Verified Commit 35cfee58 authored by Michal 'vorner' Vaner's avatar Michal 'vorner' Vaner
Browse files

Merge branch 'flow-term-server'

parents fb9a4adc 5383b63c
......@@ -362,14 +362,14 @@ if (fork == 0) {
my $get_times = $source->prepare('SELECT DISTINCT tagged_on FROM biflows WHERE tagged_on > ? ORDER BY tagged_on');
tprint "Getting flows times tagged after $max_time\n";
$get_times->execute($max_time);
my $store_flow = $destination->prepare('INSERT INTO biflows (ip_remote, port_remote, port_local, tagged_on, proto, start_in, stop_in, start_out, stop_out, size_in, count_in, size_out, count_out, tag, seen_start_in, seen_start_out) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)');
my $store_flow = $destination->prepare('INSERT INTO biflows (ip_remote, port_remote, port_local, tagged_on, proto, start_in, stop_in, start_out, stop_out, size_in, count_in, size_out, count_out, tag, seen_start_in, seen_start_out, seen_end_in, seen_end_out, seen_rst) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)');
my $store_group = $destination->prepare('INSERT INTO biflow_groups (biflow, from_group) VALUES (?, ?)');
# Pre-filter the results so they don't contain the rand-% groups. But we keep the 'all'
# group in yet. This ensures we get every biflow at least once (otherwise we would
# have to juggle with outer joins). This is simpler and less error prone, without
# too much overhead.
my $get_flows = $source->prepare("SELECT
group_members.in_group, biflows.id, ip_remote, port_remote, port_local, tagged_on, proto, start_in, stop_in, start_out, stop_out, size_in, count_in, size_out, count_out, tag, biflows.seen_start_in, biflows.seen_start_out
group_members.in_group, biflows.id, ip_remote, port_remote, port_local, tagged_on, proto, start_in, stop_in, start_out, stop_out, size_in, count_in, size_out, count_out, tag, biflows.seen_start_in, biflows.seen_start_out, biflows.seen_end_in, biflows.seen_end_out, biflows.seen_rst
FROM
biflows
JOIN
......
......@@ -261,6 +261,9 @@ CREATE TABLE biflows (
count_out INT NOT NULL,
seen_start_in BOOL NOT NULL,
seen_start_out BOOL NOT NULL,
seen_end_in BOOL,
seen_end_out BOOL,
seen_rst BOOL,
tag TEXT NOT NULL,
tagged_on TIMESTAMP NOT NULL,
CHECK(proto = 'T' OR proto = 'U'),
......
......@@ -286,6 +286,9 @@ CREATE TABLE biflows (
count_out INT NOT NULL,
seen_start_in BOOL NOT NULL,
seen_start_out BOOL NOT NULL,
seen_end_in BOOL,
seen_end_out BOOL,
seen_rst BOOL,
tag TEXT,
tagged_on TIMESTAMP,
FOREIGN KEY (client) REFERENCES clients(id),
......
......@@ -193,6 +193,9 @@ def store_flows(max_records, client, message, expect_conf_id, now):
udp = flags & 2
in_started = not not (flags & 4)
out_started = not not (flags & 8)
in_ended = not not (flags & 16)
out_ended = not not (flags & 32)
rst_seen = not not (flags & 64)
if v6:
size = 16
tp = socket.AF_INET6
......@@ -212,13 +215,13 @@ def store_flows(max_records, client, message, expect_conf_id, now):
logger.error("Time difference out of range for client %s: %s/%s", client, calib_time - v, v)
ok = False
if ok:
values.append((aloc, arem, ploc, prem, proto, now, calib_time - tbin if tbin > 0 else None, now, calib_time - tbout if tbout > 0 else None, now, calib_time - tein if tein > 0 else None, now, calib_time - teout if teout > 0 else None, cin, cout, sin, sout, in_started, out_started, client))
values.append((aloc, arem, ploc, prem, proto, now, calib_time - tbin if tbin > 0 else None, now, calib_time - tbout if tbout > 0 else None, now, calib_time - tein if tein > 0 else None, now, calib_time - teout if teout > 0 else None, cin, cout, sin, sout, in_started, out_started, in_ended, out_ended, rst_seen, client))
count += 1
if count > max_records:
logger.warn("Unexpectedly high number of flows in the message from client %s - %s connection, max expected %s. Ignoring.", client, count, max_records)
return
with database.transaction() as t:
t.executemany("INSERT INTO biflows (client, ip_local, ip_remote, port_local, port_remote, proto, start_in, start_out, stop_in, stop_out, count_in, count_out, size_in, size_out, seen_start_in, seen_start_out) SELECT clients.id, %s, %s, %s, %s, %s, %s - %s * INTERVAL '1 millisecond', %s - %s * INTERVAL '1 millisecond', %s - %s * INTERVAL '1 millisecond', %s - %s * INTERVAL '1 millisecond', %s, %s, %s, %s, %s, %s FROM clients WHERE clients.name = %s", values)
t.executemany("INSERT INTO biflows (client, ip_local, ip_remote, port_local, port_remote, proto, start_in, start_out, stop_in, stop_out, count_in, count_out, size_in, size_out, seen_start_in, seen_start_out, seen_end_in, seen_end_out, seen_rst) SELECT clients.id, %s, %s, %s, %s, %s, %s - %s * INTERVAL '1 millisecond', %s - %s * INTERVAL '1 millisecond', %s - %s * INTERVAL '1 millisecond', %s - %s * INTERVAL '1 millisecond', %s, %s, %s, %s, %s, %s, %s, %s, %s FROM clients WHERE clients.name = %s", values)
logger.debug("Stored %s flows for %s", count, client)
class FlowPlugin(plugin.Plugin, diff_addr_store.DiffAddrStore):
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment