diff --git a/src/master/archivist/archivist.pl b/src/master/archivist/archivist.pl index 2abda60841fcf9d286d281bf3e4e3db6208700d6..2cda66f25d6c4d2ac0348d6b431ab96e17f94110 100755 --- a/src/master/archivist/archivist.pl +++ b/src/master/archivist/archivist.pl @@ -3,6 +3,7 @@ use common::sense; use DBI; use Config::IniFiles; use List::Util qw(sum); +use Date::Format; # First connect to databases my $cfg = Config::IniFiles->new(-file => $ARGV[0]); @@ -28,8 +29,13 @@ my %config_tables = ( cert_requests => [qw(id host port starttls want_cert want_chain want_details want_params)], ); +sub tprint(@) { + my $date = time2str('%H:%M:%S', time); + print $date, "\t", @_; +} + while (my($table, $columns) = each %config_tables) { - print "Syncing config table $table\n"; + tprint "Syncing config table $table\n"; # Dump the whole table from both databases my $columns_commas = join ', ', @$columns; my $select = "SELECT $columns_commas FROM $table"; @@ -71,7 +77,7 @@ if (fork == 0) { # Migrate anomalies. # First, look for the newest one stored. They are stored in batches in transaction some time from each other, so we wont lose anything. my ($max_anom) = $destination->selectrow_array('SELECT COALESCE(MAX(timestamp), TO_TIMESTAMP(0)) FROM anomalies'); - print "Getting anomalies newer than $max_anom\n"; + tprint "Getting anomalies newer than $max_anom\n"; # Keep reading and putting it to the other DB my $store_anomaly = $destination->prepare('INSERT INTO anomalies (from_group, type, timestamp, value, relevance_count, relevance_of, strength) VALUES (?, ?, ?, ?, ?, ?, ?)'); my $get_anomalies = $source->prepare('SELECT from_group, type, timestamp, value, relevance_count, relevance_of, strength FROM anomalies WHERE timestamp > ?'); @@ -81,7 +87,7 @@ if (fork == 0) { $count ++; return $get_anomalies->fetchrow_arrayref; }); - print "Stored $count anomalies\n"; + tprint "Stored $count anomalies\n"; $destination->commit; $source->commit; exit; @@ -92,7 +98,7 @@ if (fork == 0) { my $destination = connect_db 'destination'; # Migrate the counts. We let the source database do the aggregation. my ($max_count) = $destination->selectrow_array('SELECT COALESCE(MAX(timestamp), TO_TIMESTAMP(0)) FROM count_snapshots'); - print "Getting counts newer than $max_count\n"; + tprint "Getting counts newer than $max_count\n"; # Select all the counts for snapshots and expand the snapshots. # Then join with the groups and generate all pairs count-group for the client it is in the group. # Then aggregate over the (type, time, group) and produce that. @@ -134,7 +140,7 @@ if (fork == 0) { $stat_count ++; } - print "Stored $stat_count count statistics in $snap_count snapshots\n"; + tprint "Stored $stat_count count statistics in $snap_count snapshots\n"; $destination->commit; $source->commit; exit; @@ -152,7 +158,7 @@ if (fork == 0) { # the packets won't arrive too late. my ($loc_max) = $source->selectrow_array("SELECT MAX(time) - INTERVAL '3 hours' FROM router_loggedpacket"); my ($rem_max) = $destination->selectrow_array('SELECT COALESCE(MAX(time), TO_TIMESTAMP(0)) FROM firewall_packets'); - print "Going to store firewall logs between $rem_max and $loc_max\n"; + tprint "Going to store firewall logs between $rem_max and $loc_max\n"; # Get the packets. Each packet may have multiple resulting lines, # for multiple groups it is in. Prefilter the groups, we are not # interested in the random ones. We still have the 'all' group @@ -167,7 +173,7 @@ if (fork == 0) { WHERE time > ? AND time <= ? AND groups.name NOT LIKE 'rand-%' ORDER BY id "); - print "Getting new firewall packets\n"; + tprint "Getting new firewall packets\n"; $get_packets->execute($rem_max, $loc_max); my $store_packet = $destination->prepare('INSERT INTO firewall_packets (rule_id, time, direction, port_rem, addr_rem, port_loc, protocol, count, tcp_flags) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)'); my $packet_group = $destination->prepare('INSERT INTO firewall_groups (packet, for_group) VALUES (?, ?)'); @@ -187,7 +193,7 @@ if (fork == 0) { $packet_group->execute($id_dest, $group); } } - print "Stored $count packets\n"; + tprint "Stored $count packets\n"; $destination->commit; $source->commit; exit; @@ -199,9 +205,9 @@ if (fork == 0) { # Get the newest day stored. We'll overwrite this day (we assume there's overlap from the last time we archived). my ($max_act) = $destination->selectrow_array('SELECT COALESCE(MAX(activities.date), DATE(TO_TIMESTAMP(0))) FROM activities'); - print "Dropping archived anomalies at $max_act\n"; + tprint "Dropping archived anomalies at $max_act\n"; $destination->do('DELETE FROM activities WHERE activities.date = ?', undef, $max_act); - print "Getting activities not older than $max_act\n"; + tprint "Getting activities not older than $max_act\n"; # Keep reading and putting it to the other DB my $store_activity = $destination->prepare('INSERT INTO activities (date, activity, client, count) VALUES (?, ?, ?, ?)'); my $get_activities = $source->prepare('SELECT DATE(timestamp), activity, client, COUNT(id) FROM activities WHERE DATE(timestamp) >= ? GROUP BY activity, client, DATE(timestamp)'); @@ -211,7 +217,7 @@ if (fork == 0) { $count ++; return $get_activities->fetchrow_arrayref; }); - print "Stored $count activity summaries\n"; + tprint "Stored $count activity summaries\n"; $destination->commit; $source->commit; exit; @@ -221,9 +227,9 @@ if (fork == 0) { my $source = connect_db 'source'; my $destination = connect_db 'destination'; my ($max_batch) = $destination->selectrow_array('SELECT COALESCE(MAX(ping_stats.batch), TO_TIMESTAMP(0)) FROM ping_stats'); - print "Dropping pings from batch $max_batch\n"; + tprint "Dropping pings from batch $max_batch\n"; $destination->do('DELETE FROM ping_stats WHERE ping_stats.batch = ?', undef, $max_batch); - print "Getting pings not older than $max_batch\n"; + tprint "Getting pings not older than $max_batch\n"; my $store_stat = $destination->prepare('INSERT INTO ping_stats (batch, request, from_group, received, asked, resolved, min, max, avg) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)'); my $get_stat = $source->prepare('SELECT batch, request, group_members.in_group, SUM(received), COUNT(1), COUNT(ip), MIN(min), MAX(max), SUM(received * avg) / SUM(received) FROM pings JOIN group_members ON pings.client = group_members.client WHERE batch >= ? GROUP BY batch, request, group_members.in_group'); $get_stat->execute($max_batch); @@ -232,8 +238,8 @@ if (fork == 0) { $stat_cnt ++; return $get_stat->fetchrow_arrayref; }); - print "Stored $stat_cnt ping statistics\n"; - print "Getting IP address histograms since $max_batch\n"; + tprint "Stored $stat_cnt ping statistics\n"; + tprint "Getting IP address histograms since $max_batch\n"; my $store_hist = $destination->prepare('INSERT INTO ping_ips (ping_stat, ip, count) SELECT id, ?, ? FROM ping_stats WHERE batch = ? AND request = ? AND from_group = ?'); my $get_hist = $source->prepare('SELECT ip, COUNT(DISTINCT pings.client), batch, request, group_members.in_group FROM pings JOIN group_members ON pings.client = group_members.client WHERE batch >= ? GROUP BY request, batch, group_members.in_group, ip'); my $hist_cnt = -1; @@ -242,7 +248,7 @@ if (fork == 0) { $hist_cnt ++; return $get_hist->fetchrow_arrayref; }); - print "Stored $hist_cnt ping histograms\n"; + tprint "Stored $hist_cnt ping histograms\n"; $destination->commit; $source->commit; exit; @@ -252,9 +258,9 @@ if (fork == 0) { my $source = connect_db 'source'; my $destination = connect_db 'destination'; my ($max_batch) = $destination->selectrow_array('SELECT COALESCE(MAX(cert_histograms.batch), TO_TIMESTAMP(0)) FROM cert_histograms'); - print "Dropping certificates from batch $max_batch\n"; + tprint "Dropping certificates from batch $max_batch\n"; $destination->do('DELETE FROM cert_histograms WHERE cert_histograms.batch = ?', undef, $max_batch); - print "Getting certificates not older than $max_batch\n"; + tprint "Getting certificates not older than $max_batch\n"; my $store_hist = $destination->prepare('INSERT INTO cert_histograms (batch, request, from_group, cert, count) VALUES (?, ?, ?, ?, ?)'); my $get_hist = $source->prepare('SELECT batch, request, in_group, value, count(certs.client) FROM certs JOIN cert_chains ON cert_chains.cert = certs.id JOIN group_members ON group_members.client = certs.client WHERE cert_chains.ord = 0 AND batch >= ? GROUP BY group_members.in_group, certs.request, cert_chains.value, batch'); my $hist_count = -1; @@ -263,7 +269,7 @@ if (fork == 0) { $hist_count ++; return $get_hist->fetchrow_arrayref; }); - print "Stored $hist_count certificate histograms\n"; + tprint "Stored $hist_count certificate histograms\n"; $destination->commit; $source->commit; exit; @@ -273,7 +279,7 @@ if (fork == 0) { my $source = connect_db 'source'; my $destination = connect_db 'destination'; my ($max_time) = $destination->selectrow_array('SELECT COALESCE(MAX(bandwidth.timestamp), TO_TIMESTAMP(0)) FROM bandwidth'); - print "Getting bandwidth records newer than $max_time\n"; + tprint "Getting bandwidth records newer than $max_time\n"; my $store_band = $destination->prepare('INSERT INTO bandwidth (timestamp, from_group, win_len, in_min, out_min, in_max, out_max, in_avg, out_avg, in_var, out_var) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'); my $get_band = $source->prepare('SELECT timestamp, in_group, win_len, MIN(input), MIN(output), MAX(input), MAX(output), AVG(input), AVG(output), STDDEV_POP(input), STDDEV_POP(output) FROM (SELECT timestamp, client, win_len, 1000000.0 * in_max / win_len AS input, 1000000.0 * out_max / win_len AS output FROM bandwidth) AS b JOIN group_members ON group_members.client = b.client WHERE timestamp > ? GROUP BY timestamp, in_group, win_len'); my $band_count = -1; @@ -282,9 +288,9 @@ if (fork == 0) { $band_count ++; return $get_band->fetchrow_arrayref; }); - print "Stored $band_count bandwidth records\n"; + tprint "Stored $band_count bandwidth records\n"; my ($max_time, $cur_time) = $destination->selectrow_array("SELECT COALESCE(MAX(bandwidth_avg.timestamp), TO_TIMESTAMP(0)), CURRENT_TIMESTAMP AT TIME ZONE 'UTC' FROM bandwidth_avg"); - print "Getting bandwidth stats newer than $max_time\n"; + tprint "Getting bandwidth stats newer than $max_time\n"; my $store_avg = $destination->prepare('INSERT INTO bandwidth_avg (timestamp, client, bps_in, bps_out) VALUES (?, ?, ?, ?)'); my $get_avg = $source->prepare("SELECT timestamp, client, in_time, in_bytes, out_time, out_bytes FROM bandwidth_stats WHERE timestamp > ? AND timestamp + INTERVAL '90 minutes' < ?"); $get_avg->execute($max_time, $cur_time); @@ -294,8 +300,8 @@ if (fork == 0) { $store_avg->execute($timestamp, $client, int($in_bytes / $in_time), int($out_bytes / $out_time)); $avg_cnt ++; } - print "Stored $avg_cnt bandwidth averages\n"; - print "Getting bandwidth sums newer than $max_time\n"; + tprint "Stored $avg_cnt bandwidth averages\n"; + tprint "Getting bandwidth sums newer than $max_time\n"; my $store_sum = $destination->prepare('INSERT INTO bandwidth_sums (timestamp, from_group, client_count, in_time, out_time, in_bytes, out_bytes) VALUES (?, ?, ?, ?, ?, ?, ?)'); my $get_sum = $source->prepare("SELECT timestamp, in_group, in_time, out_time, in_bytes, out_bytes FROM bandwidth_stats JOIN group_members ON bandwidth_stats.client = group_members.client WHERE timestamp > ? AND timestamp + '90 minutes' < ? ORDER BY timestamp, in_group"); $get_sum->execute($max_time, $cur_time); @@ -329,7 +335,7 @@ if (fork == 0) { $sum->(\@out_bytes, $out_bytes); } $submit->(); - print "Aggregated $record_cnt bandwidth records to $sum_cnt sums\n"; + tprint "Aggregated $record_cnt bandwidth records to $sum_cnt sums\n"; $destination->commit; $source->commit; exit; @@ -340,7 +346,7 @@ if (fork == 0) { my $destination = connect_db 'destination'; my $max_time = $destination->selectrow_array('SELECT COALESCE(MAX(biflows.tagged_on), TO_TIMESTAMP(0)) FROM biflows'); my $get_times = $source->prepare('SELECT DISTINCT tagged_on FROM biflows WHERE tagged_on > ? ORDER BY tagged_on'); - print "Getting flows times tagged after $max_time\n"; + tprint "Getting flows times tagged after $max_time\n"; $get_times->execute($max_time); my $store_flow = $destination->prepare('INSERT INTO biflows (ip_remote, port_remote, port_local, tagged_on, proto, start_in, stop_in, start_out, stop_out, size_in, count_in, size_out, count_out, tag, seen_start_in, seen_start_out) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'); my $store_group = $destination->prepare('INSERT INTO biflow_groups (biflow, from_group) VALUES (?, ?)'); @@ -385,7 +391,7 @@ if (fork == 0) { $source->commit; $destination->commit; } - print "Stored $fcount flows with $gcount group entries\n"; + tprint "Stored $fcount flows with $gcount group entries\n"; exit; } @@ -393,9 +399,9 @@ if (fork == 0) { my $source = connect_db 'source'; my $destination = connect_db 'destination'; my ($max_batch) = $destination->selectrow_array('SELECT COALESCE(MAX(batch), TO_TIMESTAMP(0)) FROM nat_counts'); - print "Dropping nats from batch $max_batch\n"; + tprint "Dropping nats from batch $max_batch\n"; $destination->do('DELETE FROM nat_counts WHERE batch = ?', undef, $max_batch); - print "Getting nat records not older than $max_batch\n"; + tprint "Getting nat records not older than $max_batch\n"; my $store_nat = $destination->prepare('INSERT INTO nat_counts (from_group, batch, v4direct, v4nat, v6direct, v6nat, total) VALUES(?, ?, ?, ?, ?, ?, ?)'); my $get_nats = $source->prepare('SELECT in_group, batch, COUNT(CASE WHEN nat_v4 = false THEN true END), COUNT(CASE WHEN nat_v4 = true THEN true END), COUNT(CASE WHEN nat_v6 = false THEN true END), COUNT(CASE WHEN nat_v6 = true THEN true END), COUNT(nats.client) FROM nats JOIN group_members ON nats.client = group_members.client WHERE batch >= ? GROUP BY batch, in_group'); my $nat_count = -1; @@ -404,7 +410,7 @@ if (fork == 0) { $nat_count ++; return $get_nats->fetchrow_arrayref; }); - print "Stored $nat_count nat counts\n"; + tprint "Stored $nat_count nat counts\n"; $destination->commit; $source->commit; exit; @@ -414,9 +420,9 @@ if (fork == 0) { my $source = connect_db 'source'; my $destination = connect_db 'destination'; my ($max_batch) = $destination->selectrow_array('SELECT COALESCE(MAX(batch), TO_TIMESTAMP(0)) FROM spoof_counts'); - print "Dropping spoofed packets from batch $max_batch\n"; + tprint "Dropping spoofed packets from batch $max_batch\n"; $destination->do('DELETE FROM spoof_counts WHERE batch = ?', undef, $max_batch); - print "Getting spoof records not older than $max_batch\n"; + tprint "Getting spoof records not older than $max_batch\n"; my $store_spoof = $destination->prepare('INSERT INTO spoof_counts (from_group, batch, reachable, spoofable) VALUES (?, ?, ?, ?)'); my $get_spoof = $source->prepare('SELECT in_group, batch, COUNT(CASE WHEN NOT spoofed THEN TRUE END), COUNT(CASE WHEN spoofed AND addr_matches THEN TRUE END) FROM spoof JOIN group_members ON group_members.client = spoof.client WHERE batch >= ? GROUP BY batch, in_group'); my $spoof_count = -1; @@ -425,7 +431,7 @@ if (fork == 0) { $spoof_count ++; return $get_spoof->fetchrow_arrayref; }); - print "Stored $spoof_count spoofed groups\n"; + tprint "Stored $spoof_count spoofed groups\n"; $destination->commit; $source->commit; exit; @@ -435,7 +441,7 @@ if (fork == 0) { my $source = connect_db 'source'; my $destination = connect_db 'destination'; my ($max_since) = $destination->selectrow_array("SELECT DATE_TRUNC('hour', COALESCE(MAX(since), TO_TIMESTAMP(0)) - INTERVAL '90 minutes') FROM refused_addrs"); - print "Dropping refused connections since $max_since\n"; + tprint "Dropping refused connections since $max_since\n"; $destination->do('DELETE FROM refused_addrs WHERE since >= ?', undef, $max_since); $destination->do('DELETE FROM refused_clients WHERE since >= ?', undef, $max_since); my $store_addr = $destination->prepare('INSERT INTO refused_addrs (addr, port, reason, since, until, conn_count, client_count) VALUES (?, ?, ?, ?, ?, ?, ?)'); @@ -454,7 +460,7 @@ if (fork == 0) { $client_count ++; return $get_client->fetchrow_arrayref; }); - print "Stored $addr_count refused addresses and $client_count clients\n"; + tprint "Stored $addr_count refused addresses and $client_count clients\n"; $destination->commit; $source->commit; exit; @@ -493,7 +499,7 @@ if (fork == 0) { } $destination->commit; $source->commit; - print "Archived $count_sessions SSH sessions and $count_commands commands\n"; + tprint "Archived $count_sessions SSH sessions and $count_commands commands\n"; exit; } @@ -510,7 +516,7 @@ if (fork == 0) { $attackers ++; return $get_attackers->fetchrow_arrayref; }); - print "Archived $attackers fake attacker stats\n"; + tprint "Archived $attackers fake attacker stats\n"; $destination->do("DELETE FROM fake_passwords WHERE timestamp >= ?", undef, $max_date); my $get_passwords = $source->prepare("SELECT timestamp, server, remote, name, password, remote_port FROM fake_logs WHERE name IS NOT NULL AND password IS NOT NULL AND event = 'login' AND timestamp >= ?"); $get_passwords->execute($max_date); @@ -520,7 +526,7 @@ if (fork == 0) { $passwords ++; return $get_passwords->fetchrow_arrayref; }); - print "Archived $passwords password attempts\n"; + tprint "Archived $passwords password attempts\n"; $destination->do("DELETE FROM fake_server_activity WHERE date >= ?", undef, $max_date); my $get_activity = $source->prepare("SELECT DATE(timestamp), server, client, COUNT(CASE WHEN event = 'login' THEN true END), COUNT(CASE WHEN event = 'connect' THEN true END) FROM fake_logs WHERE timestamp >= ? GROUP BY DATE(timestamp), server, client"); $get_activity->execute($max_date); @@ -530,7 +536,7 @@ if (fork == 0) { $activity_count ++; return $get_activity->fetchrow_arrayref; }); - print "Archived $activity_count fake server activity statistics\n"; + tprint "Archived $activity_count fake server activity statistics\n"; $destination->commit; $source->commit; exit;