Verified Commit a7adced0 authored by Michal 'vorner' Vaner

Merge branch 'amihacked'

parents 79367cd3 7e771c25
This directory holds the scripts that provide data to the amihacked website.
They do just that ‒ provide the data. The frontend lives elsewhere.
There are three script entry points:
* export_repu: This one is to be run on the archive.turris.cz machine. It
  generates exports of the whole history, in the form of several .csv.bz2
  files.
* repu_init: This one fills the initial data into a database. It expects to be
  run in a directory with the .csv.bz2 files from the above script. The
  database must already exist and the home directory should contain a db.ini
  file.
* repu_add: This one adds data incrementally, from files created by
  archivist.pl. It expects the database connection information to be in
  archivist.ini in the home directory.
The other scripts are helpers and are not to be called directly.
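A sketch of the intended flow (the paths here are an assumption; the scripts
only care about the files in the current directory and the two .ini files in
the home directory):

  # Once, on archive.turris.cz:
  ./export_repu    # writes the ssh, telnet, firewall and firewall_all .csv.bz2 files

  # Once, on the database machine, in a directory with those exports:
  ./repu_init      # reads ~/db.ini (an [amihacked] section, see to_db.pl)

  # Periodically, in the directory where archivist.pl drops its CSV files:
  ./repu_add       # reads ~/archivist.ini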
bzchoose
#!/bin/sh
# Run the pbzip2 compression or decompression, with -1
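# Used by repu_init as sort's --compress-prog: sort runs this program to
# compress its temporary files and runs it with -d to decompress them again.
# -1 favours speed over compression ratio, which is the right trade-off for
# short-lived temporary files.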
PROG=pbzip2
exec "$PROG" -1 "$@"
compact.pl
#!/usr/bin/perl
use common::sense;
# Sum consecutive lines with the same IP, date and kind together. Used to make
# the primary export smaller (the DB doesn't guarantee that similar lines end
# up adjacent, but they often are).
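# For example (made-up data), the consecutive input lines
#   192.0.2.1,2015-10-05,2,ssh
#   192.0.2.1,2015-10-05,3,ssh
# are merged into the single output line
#   192.0.2.1,2015-10-05,5,ssh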
my ($ip, $date, $cnt, $kind);
sub flush() {
    if ($ip) {
        print "$ip,$date,$cnt,$kind\n";
        undef $ip;
    }
}
while (<>) {
    chomp;
    my ($new_ip, $new_date, $new_cnt, $new_kind) = split /,/;
    if (($new_ip ne $ip) or ($new_date ne $date) or ($new_kind ne $kind)) {
        flush;
        ($ip, $date, $cnt, $kind) = ($new_ip, $new_date, $new_cnt, $new_kind);
    } else {
        $cnt += $new_cnt;
    }
}
flush;
export_repu
#!/bin/sh
set -e
q() {
    CMD="$1"
    NAME="$2"
    psql -q -d turris -1 -c "$CMD" | ./compact.pl | pbzip2 -5 >"$NAME".csv.bz2
    echo "Done $NAME on $(date +%X)" >&2
}
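# Four exports run in parallel: ssh sessions, telnet (fake server) attackers,
# and the firewall packets ‒ once restricted to a list of interesting ports,
# once over all ports. The condition "tcp_flags & 18 = 2" selects TCP packets
# with SYN set and ACK clear (18 = SYN|ACK), i.e. inbound connection attempts
# rather than replies; UDP packets are taken as they are.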
q "COPY (select remote, date(start_time), count(1) as attempt_count, 'ssh' from ssh_sessions where remote is not null group by date(start_time), remote) to STDOUT with CSV;" "ssh" &
q "COPY (select remote, date, attempt_count, server from fake_attackers where attempt_count > 0) to STDOUT with CSV;" "telnet" &
q "COPY (select addr_rem as remote, date(time) as date, count, 'firewall' from firewall_packets where direction = 'I' and ((protocol = 'TCP' AND tcp_flags & 18 = 2) OR protocol = 'UDP') and port_loc in (22, 2222, 8822, 22222, 23, 445, 1433, 3306, 5432, 161, 1723, 2083, 3389, 3390, 5631, 5900, 5901, 5902, 5903, 5060, 5061, 1080, 3128, 8088, 8118, 9064, 21320, 137, 128, 139, 1900, 53413, 9333, 5000, 5001, 80, 443, 8080, 8081) AND count > 0) to STDOUT with CSV;" "firewall" &
q "COPY (select addr_rem as remote, date(time) as date, count, 'firewall_all' from firewall_packets where direction = 'I' and ((protocol = 'TCP' AND tcp_flags & 18 = 2) OR protocol = 'UDP') AND count > 0) to STDOUT with CSV;" "firewall_all" &
wait
jsonize.pl
#!/usr/bin/perl
# This script takes a series of CSV lines about attacks and constructs
# corresponding JSON records for them. It expects the lines of one
# IP address to be consecutive.
#
# The columns are:
# * IP address
# * date
# * count of attacks
# * kind of attack
#
# The produced lines contain the IP address and a JSON description. The
# JSON object is indexed by the kind of attack and then by the date; each
# leaf holds the count of attacks on that day.
#
# eg:
# 192.0.2.1 {"telnet": {"2015-10-05": 5}}
#
# All IPv6 attackers are aggregated into their /64 ranges as well, in addition
# to the individual records.
use common::sense;
use JSON qw(encode_json);
use NetAddr::IP;
my $last_ip;
my $object;
my %nets;
sub flush() {
    return unless defined $last_ip;
    print $last_ip->canon(), "\t", encode_json $object, "\n";
    undef $object;
}
my $ip6_strange = NetAddr::IP->new("f000::/8");
my $ip4_strange = NetAddr::IP->new("224.0.0.0/4");
while (<>) {
    chomp;
    my ($ip, $date, $cnt, $kind) = split /,/;
    $ip = NetAddr::IP->new($ip) or die "Bad IP: $ip\n";
    # Skip addresses that are not interesting:
    # • Private IPv4 ranges (RFC 1918)
    # • Multicast IPv4 ranges
    # • f* IPv6 addresses (there are several kinds of strange addresses there,
    #   like the fe* local ones, the ff* multicast ones, etc.)
    next if $ip->is_rfc1918() or $ip->within($ip4_strange) or $ip->within($ip6_strange);
    if ($last_ip ne $ip) {
        flush;
        $last_ip = $ip;
    }
    $object->{$kind}->{$date} += $cnt;
    if ($ip->version() == 6) {
        my $net = NetAddr::IP->new($ip->canon(), 64)->network();
        # Make sure it is stringified
        $nets{$net->canon()}->{$kind}->{$date} += $cnt;
    }
}
flush;
while (my ($net, $obj) = each %nets) {
    print "$net/64\t", encode_json $obj, "\n";
}
repu_add
#!/bin/sh
# Add incidents incrementally, from all the CSV files present in the current
# directory (as produced by archivist.pl).
set -e
DIR=$(dirname "$0")
FILES=$(ls *.csv)
cat $FILES | LC_ALL=C sort -S 2G -T . >sorted
"$DIR/jsonize.pl" <sorted >jsonized
"$DIR/to_db.pl" -d "$HOME/archivist.ini" <jsonized
rm $FILES sorted jsonized
repu_init
#!/bin/sh
# Import the data into the database. The data is kept compressed all the way
# through (because there is a lot of it) and the processing uses many cores.
set -e
DIR=$(dirname "$0")
(
pbzip2 -d < telnet.csv.bz2
pbzip2 -d < ssh.csv.bz2
pbzip2 -d < firewall.csv.bz2
pbzip2 -d < firewall_all.csv.bz2
) | LC_ALL=C sort --compress-prog="$DIR/bzchoose" -T . -S 4G | "$DIR/split.pl"
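# A throwaway Makefile is generated so that make can run the jsonize step in
# parallel over the per-prefix files from split.pl; -j12 below runs twelve
# jobs at once (presumably sized for the machine at hand).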
cat >Makefile <<ENDMAKE
INPUTS:=\$(wildcard split/*.csv.gz)
OUTPUTS:=\$(patsubst %.csv.gz,%.json.gz,\$(INPUTS))
all: \$(OUTPUTS)
%.json.gz: %.csv.gz
	gunzip -c < \$< | "$DIR/jsonize.pl" | gzip -1 >\$@
ENDMAKE
make -j12
for i in split/*.json.gz ; do
    gunzip <"$i"
done | "$DIR/to_db.pl" -i -d "$HOME/db.ini"
split.pl
#!/usr/bin/perl
# Distribute the input lines into several files, according to the IP address at
# the front. This helps parallelize the jsonize step: the lines of one IP
# address must not be split between two files, and keeping all addresses with
# the same prefix in one file guarantees that.
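# The prefix is the first two characters of the line, with ':' mapped to '_',
# so for example lines for 192.0.2.1 end up in split/19.csv.gz and lines for
# 2001:db8::1 in split/20.csv.gz.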
use common::sense;
mkdir "split";
my $last;
my $f;
while (<>) {
    my ($prefix) = /^(..)/;
    $prefix =~ s/:/_/g;
    if ($last ne $prefix) {
        close $f if $f;
        open $f, '|-', "gzip -1 >split/$prefix.csv.gz" or die "Failed to open split/$prefix.csv.gz: $!\n";
        $last = $prefix;
    }
    print $f $_;
}
close $f;
to_db.pl
#!/usr/bin/perl
# This script adds the provided attacker data to the database. It expects
# the data to come in the form:
#
# ip-address JSON-data
#
# (The separator is a tab)
#
# If -i (--initial) is provided, the DB is wiped first and the new data is
# inserted. If not, the JSONs are summed together with the stored ones.
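# Usage, as invoked by repu_init and repu_add:
#   ./to_db.pl -i -d "$HOME/db.ini" <jsonized        # initial fill
#   ./to_db.pl -d "$HOME/archivist.ini" <jsonized    # incremental update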
use common::sense;
use DBI;
use JSON qw(encode_json decode_json);
use Getopt::Long;
use Config::IniFiles;
my $initial;
my $dbini = "db.ini";
GetOptions
    initial => \$initial,
    'dbini=s' => \$dbini
    or die "Error parsing parameters\n";
my $cfg = Config::IniFiles->new(-file => $dbini) or die "Failed to read config: @Config::IniFiles::errors\n";
my ($dbname, $dbuser, $dbpass, $dbhost) = map { $cfg->val('amihacked', $_) } qw(db user passwd host);
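# The config file is assumed to look like this (the values are illustrative;
# only the section and key names are prescribed by the line above):
#   [amihacked]
#   db=turris
#   user=amihacked
#   passwd=secret
#   host=localhost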
# Connect to the database with the credentials from the config (the password may be omitted when username-based authentication is used)
my $dbh = DBI->connect("dbi:Pg:dbname=$dbname" . ($dbhost ? ";host=$dbhost" : ""), $dbuser, $dbpass, {RaiseError => 1, AutoCommit => 0});
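# Note: AutoCommit is off and RaiseError is on, so any failure aborts the
# script before the final commit and the whole import is all-or-nothing.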
my $lookup = $dbh->prepare("SELECT data FROM amihacked_statistics WHERE address = ?");
my $insert = $dbh->prepare("INSERT INTO amihacked_statistics (address, data) VALUES (?, ?)");
my $update = $dbh->prepare("UPDATE amihacked_statistics SET data = ? WHERE address = ?");
# If we want to provide initial data, wipe the original
$dbh->do("TRUNCATE amihacked_statistics") if $initial;
while (<>) {
    chomp;
    my ($addr, $data) = split /\t/;
    my $previous;
    unless ($initial) {
        # Unless we fill the DB with initial data, try to look up the previous value
        $lookup->execute($addr);
        ($previous) = $lookup->fetchrow_array;
    }
    if ($previous) {
        # Decode both the old and the new
        my $json_previous = decode_json $previous;
        my $json_data = decode_json $data;
        # Sum them together, fieldwise
        while (my ($kind, $kind_data) = each %$json_data) {
            while (my ($date, $cnt) = each %$kind_data) {
                # If the field is not there yet, it gets created (including all the necessary levels above it)
                $json_previous->{$kind}->{$date} += $cnt;
            }
        }
        # Store the new value
        $update->execute(encode_json $json_previous, $addr);
    } else {
        # Insert a new value
        $insert->execute($addr, $data);
    }
}
$dbh->commit;
archivist.pl (diff)
@@ -70,6 +70,37 @@ $source->commit;
undef $destination;
undef $source;
# Export incidents for the amihacked site (this way we produce the same exports as the ones that go into the archive)
my $ifile;
my $fname;
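# The export is written into "$fname.part" and renamed to its final name only
# in incident_finish, so a concurrently running repu_add (which globs *.csv)
# never picks up a half-written file.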
sub incident_init($) {
    my ($name) = @_;
    my ($sec, $min, $hour, $day, $mon, $year) = localtime();
    $year += 1900;
    $mon += 1;
    $fname = "$year-$mon-$day-$name.csv";
    open $ifile, '>', "$fname.part" or die "Couldn't write $fname.part: $!\n";
}
my @incidents;
sub incident($$$$) {
    my ($remote, $date, $count, $name) = @_;
    push @incidents, [$remote, $date, $count, $name];
}
sub incident_flush() {
    print $ifile (join ',', @$_), "\n" for @incidents;
    @incidents = ();
}
sub incident_finish() {
    incident_flush;
    close $ifile;
    rename "$fname.part", $fname;
}
if (fork == 0) {
my $source = connect_db 'source';
my $destination = connect_db 'destination';
@@ -127,6 +158,9 @@ if (fork == 0) {
my $source = connect_db 'source';
my $destination = connect_db 'destination';
# The ports that are included in the „firewall“ category. If they change, the whole export needs to be redone from the archive; if we just updated it here, the history wouldn't match correctly.
my %interesting_ports = map { $_ => 1 } (22, 2222, 8822, 22222, 23, 445, 1433, 3306, 5432, 161, 1723, 2083, 3389, 3390, 5631, 5900, 5901, 5902, 5903, 5060, 5061, 1080, 3128, 8088, 8118, 9064, 21320, 137, 128, 139, 1900, 53413, 9333, 5000, 5001, 80, 443, 8080, 8081);
# We get the maximum time of a packet in the destination and
# read the packets in the source from that time on. We don't read
# all the way up to the present, though, only to a bit before the
# source's maximum time.
@@ -136,6 +170,7 @@ if (fork == 0) {
my ($loc_max) = $source->selectrow_array("SELECT MAX(time) - INTERVAL '3 hours' FROM router_loggedpacket");
my ($rem_max) = $destination->selectrow_array('SELECT COALESCE(MAX(time), TO_TIMESTAMP(0)) FROM firewall_packets');
tprint "Going to store firewall logs between $rem_max and $loc_max\n";
incident_init 'firewall';
# Get the packets. Each packet may produce multiple result lines,
# one for each group it is in. Prefilter the groups, as we are not
# interested in the random ones. We still have the 'all' group
@@ -143,7 +178,7 @@ if (fork == 0) {
# (we could solve it by some kind of outer join, but the condition
# at the WHERE part would get complicated, handling NULL columns).
my $get_packets = $source->prepare("
    SELECT router_loggedpacket.id, group_members.in_group, DATE(router_loggedpacket.time), router_loggedpacket.rule_id, router_loggedpacket.time, router_loggedpacket.direction, router_loggedpacket.remote_port, router_loggedpacket.remote_address, router_loggedpacket.local_port, router_loggedpacket.protocol, router_loggedpacket.count, router_loggedpacket.tcp_flags FROM router_loggedpacket
    JOIN router_router ON router_loggedpacket.router_id = router_router.id
    JOIN group_members ON router_router.client_id = group_members.client
    JOIN groups ON group_members.in_group = groups.id
@@ -156,13 +191,20 @@ if (fork == 0) {
my $packet_group = $destination->prepare('INSERT INTO firewall_groups (packet, for_group) VALUES (?, ?)');
my ($last_id, $id_dest);
my $count = 0;
while (my ($id, $group, $date, @data) = $get_packets->fetchrow_array) {
    if ($last_id != $id) {
        $count ++;
        if ($count % 100000 == 0) {
            $destination->commit;
            incident_flush;
        }
        $store_packet->execute(@data);
        my ($rule_id, $time, $direction, $remote_port, $remote_address, $local_port, $protocol, $count, $tcp_flags) = @data;
        if (($count > 0) && ($direction eq 'I') && (($protocol eq 'UDP') || (($protocol eq 'TCP') && (($tcp_flags & 18) == 2)))) {
            # The incidents are only about incoming connection attempts (TCP SYN without ACK) or UDP packets
            incident $remote_address, $date, $count, 'firewall_all';
            incident $remote_address, $date, $count, 'firewall' if $interesting_ports{$local_port};
        }
        $last_id = $id;
        $id_dest = $destination->last_insert_id(undef, undef, 'firewall_packets', undef);
    }
@@ -173,6 +215,7 @@ if (fork == 0) {
tprint "Stored $count packets\n";
$destination->commit;
$source->commit;
incident_finish;
exit;
}
@@ -446,8 +489,9 @@ if (fork == 0) {
if (fork == 0) {
my $source = connect_db 'source';
my $destination = connect_db 'destination';
incident_init 'ssh';
my %sessions;
my $get_commands = $source->prepare('SELECT ssh_commands.id, start_time, end_time, login, password, remote, remote_port, ts, success, command, DATE(start_time) FROM ssh_commands JOIN ssh_sessions ON ssh_commands.session_id = ssh_sessions.id WHERE NOT archived');
my $mark_command = $source->prepare('UPDATE ssh_commands SET archived = TRUE WHERE id = ?');
my $store_command = $destination->prepare('INSERT INTO ssh_commands (session, timestamp, success, command) VALUES (?, ?, ?, ?)');
# Make sure the params are considered the correct type.
@@ -465,7 +509,7 @@ if (fork == 0) {
$get_commands->execute;
my $count_commands = 0;
my $count_sessions = 0;
while (my ($id, $start, $end, $login, $password, $remote, $remote_port, $time, $success, $command, $date) = $get_commands->fetchrow_array) {
    my $sid = $sessions{$start}->{$login}->{$password};
    if (not defined $sid) {
        $get_session->execute($start, $login, $password);
@@ -476,6 +520,7 @@ if (fork == 0) {
        $store_session->execute($start, $end, $login, $password, $remote, $remote_port);
        ($sid) = $store_session->fetchrow_array;
        $count_sessions ++;
        incident $remote, $date, 1, 'ssh';
    }
    $sessions{$start}->{$login}->{$password} = $sid;
}
@@ -485,6 +530,7 @@ if (fork == 0) {
}
$destination->commit;
$source->commit;
incident_finish;
tprint "Archived $count_sessions SSH sessions and $count_commands commands\n";
exit;
}
@@ -492,16 +538,19 @@ if (fork == 0) {
if (fork == 0) {
my $source = connect_db 'source';
my $destination = connect_db 'destination';
incident_init 'telnet';
my ($max_date) = $destination->selectrow_array("SELECT DATE(COALESCE(MAX(date), TO_TIMESTAMP(0))) FROM fake_attackers");
$destination->do("DELETE FROM fake_attackers WHERE date >= ?", undef, $max_date);
my $get_attackers = $source->prepare("SELECT DATE(timestamp), server, remote, COUNT(CASE WHEN event = 'login' THEN true END), COUNT(CASE WHEN event = 'connect' THEN true END) FROM fake_logs WHERE DATE(timestamp) >= ? GROUP BY remote, server, DATE(timestamp)");
$get_attackers->execute($max_date);
my $put_attacker = $destination->prepare("INSERT INTO fake_attackers (date, server, remote, attempt_count, connect_count) VALUES (?, ?, ?, ?, ?)");
my $attackers = -1;
while (my @data = $get_attackers->fetchrow_array) {
    $attackers ++;
    my ($date, $server, $remote, $attempt_count) = @data;
    incident $remote, $date, $attempt_count, $server if $attempt_count > 0;
    $put_attacker->execute(@data);
}
tprint "Archived $attackers fake attacker stats\n";
$destination->do("DELETE FROM fake_passwords WHERE timestamp >= ?", undef, $max_date);
my $get_passwords = $source->prepare("SELECT timestamp, server, remote, name, password, remote_port FROM fake_logs WHERE name IS NOT NULL AND password IS NOT NULL AND event = 'login' AND timestamp >= ?");
@@ -531,6 +580,7 @@ if (fork == 0) {
tprint "Archived $activity_count fake server activity statistics\n";
$destination->commit;
$source->commit;
incident_finish;
exit;
}