Verified Commit 5383b63c authored by Michal 'vorner' Vaner's avatar Michal 'vorner' Vaner
Browse files

Store termination flags of TCP flows

This is the server part of storing the ends of biflows as well as their
starts (which we did from the very beginning).
parent 5d263480
......@@ -362,14 +362,14 @@ if (fork == 0) {
my $get_times = $source->prepare('SELECT DISTINCT tagged_on FROM biflows WHERE tagged_on > ? ORDER BY tagged_on');
tprint "Getting flows times tagged after $max_time\n";
$get_times->execute($max_time);
my $store_flow = $destination->prepare('INSERT INTO biflows (ip_remote, port_remote, port_local, tagged_on, proto, start_in, stop_in, start_out, stop_out, size_in, count_in, size_out, count_out, tag, seen_start_in, seen_start_out) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)');
my $store_flow = $destination->prepare('INSERT INTO biflows (ip_remote, port_remote, port_local, tagged_on, proto, start_in, stop_in, start_out, stop_out, size_in, count_in, size_out, count_out, tag, seen_start_in, seen_start_out, seen_end_in, seen_end_out, seen_rst) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)');
my $store_group = $destination->prepare('INSERT INTO biflow_groups (biflow, from_group) VALUES (?, ?)');
# Pre-filter the results so they don't contain the rand-% groups. But we still keep
# the 'all' group in. This ensures we get every biflow at least once (otherwise we
# would have to juggle with outer joins). This is simpler and less error prone,
# without too much overhead.
my $get_flows = $source->prepare("SELECT
group_members.in_group, biflows.id, ip_remote, port_remote, port_local, tagged_on, proto, start_in, stop_in, start_out, stop_out, size_in, count_in, size_out, count_out, tag, biflows.seen_start_in, biflows.seen_start_out
group_members.in_group, biflows.id, ip_remote, port_remote, port_local, tagged_on, proto, start_in, stop_in, start_out, stop_out, size_in, count_in, size_out, count_out, tag, biflows.seen_start_in, biflows.seen_start_out, biflows.seen_end_in, biflows.seen_end_out, biflows.seen_rst
FROM
biflows
JOIN
......
......@@ -261,6 +261,9 @@ CREATE TABLE biflows (
count_out INT NOT NULL,
seen_start_in BOOL NOT NULL,
seen_start_out BOOL NOT NULL,
seen_end_in BOOL,
seen_end_out BOOL,
seen_rst BOOL,
tag TEXT NOT NULL,
tagged_on TIMESTAMP NOT NULL,
CHECK(proto = 'T' OR proto = 'U'),
......
......@@ -287,6 +287,9 @@ CREATE TABLE biflows (
count_out INT NOT NULL,
seen_start_in BOOL NOT NULL,
seen_start_out BOOL NOT NULL,
seen_end_in BOOL,
seen_end_out BOOL,
seen_rst BOOL,
tag TEXT,
tagged_on TIMESTAMP,
FOREIGN KEY (client) REFERENCES clients(id),
......
......@@ -193,6 +193,9 @@ def store_flows(max_records, client, message, expect_conf_id, now):
udp = flags & 2
in_started = not not (flags & 4)
out_started = not not (flags & 8)
in_ended = not not (flags & 16)
out_ended = not not (flags & 32)
rst_seen = not not (flags & 64)
if v6:
size = 16
tp = socket.AF_INET6
......@@ -212,13 +215,13 @@ def store_flows(max_records, client, message, expect_conf_id, now):
logger.error("Time difference out of range for client %s: %s/%s", client, calib_time - v, v)
ok = False
if ok:
values.append((aloc, arem, ploc, prem, proto, now, calib_time - tbin if tbin > 0 else None, now, calib_time - tbout if tbout > 0 else None, now, calib_time - tein if tein > 0 else None, now, calib_time - teout if teout > 0 else None, cin, cout, sin, sout, in_started, out_started, client))
values.append((aloc, arem, ploc, prem, proto, now, calib_time - tbin if tbin > 0 else None, now, calib_time - tbout if tbout > 0 else None, now, calib_time - tein if tein > 0 else None, now, calib_time - teout if teout > 0 else None, cin, cout, sin, sout, in_started, out_started, in_ended, out_ended, rst_seen, client))
count += 1
if count > max_records:
logger.warn("Unexpectedly high number of flows in the message from client %s - %s connection, max expected %s. Ignoring.", client, count, max_records)
return
with database.transaction() as t:
t.executemany("INSERT INTO biflows (client, ip_local, ip_remote, port_local, port_remote, proto, start_in, start_out, stop_in, stop_out, count_in, count_out, size_in, size_out, seen_start_in, seen_start_out) SELECT clients.id, %s, %s, %s, %s, %s, %s - %s * INTERVAL '1 millisecond', %s - %s * INTERVAL '1 millisecond', %s - %s * INTERVAL '1 millisecond', %s - %s * INTERVAL '1 millisecond', %s, %s, %s, %s, %s, %s FROM clients WHERE clients.name = %s", values)
t.executemany("INSERT INTO biflows (client, ip_local, ip_remote, port_local, port_remote, proto, start_in, start_out, stop_in, stop_out, count_in, count_out, size_in, size_out, seen_start_in, seen_start_out, seen_end_in, seen_end_out, seen_rst) SELECT clients.id, %s, %s, %s, %s, %s, %s - %s * INTERVAL '1 millisecond', %s - %s * INTERVAL '1 millisecond', %s - %s * INTERVAL '1 millisecond', %s - %s * INTERVAL '1 millisecond', %s, %s, %s, %s, %s, %s, %s, %s, %s FROM clients WHERE clients.name = %s", values)
logger.debug("Stored %s flows for %s", count, client)
class FlowPlugin(plugin.Plugin, diff_addr_store.DiffAddrStore):
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment