Verified Commit 248fe80d authored by Martin Petráček

refactor archivation

parent ca02c20d
@@ -50,60 +50,90 @@ archive_path = uci_get('pakon.archive.path') or '/srv/pakon/pakon-archive.db'
 con = sqlite3.connect(archive_path, isolation_level = None, timeout = 300.0)
 con.row_factory = sqlite3.Row
-def squash(from_details, to_details, up_to, window, size_threshold):
+def squash(from_details, to_details, rules):
     global con
     c = con.cursor()
     now = int(time.mktime(datetime.datetime.utcnow().timetuple()))
-    start = now - up_to
+    start = now - rules['up_to']
+    inserted = 0
+    deleted = 0
+    if from_details == 'live':
+        for row_mac in c.execute('SELECT DISTINCT src_mac FROM live.traffic WHERE start < ? AND flow_id IS NULL', (start,)):
+            src_mac = row_mac['src_mac']
+            c2 = con.cursor()
+            for row_hostname in c2.execute('SELECT DISTINCT COALESCE(app_hostname,dest_ip) AS hostname FROM live.traffic WHERE src_mac = ? AND start < ? AND flow_id IS NULL', (src_mac, start)):
+                hostname = row_hostname['hostname']
+                (i, d) = squash_for_mac_and_hostname(src_mac, hostname, from_details, to_details, start, rules)
+                inserted += i
+                deleted += d
+    else:
+        for row_mac in c.execute('SELECT DISTINCT src_mac FROM traffic WHERE details = ? AND start < ?', (from_details,start,)):
+            src_mac = row_mac['src_mac']
+            c2 = con.cursor()
+            for row_hostname in c2.execute('SELECT DISTINCT COALESCE(app_hostname,dest_ip) AS hostname FROM traffic WHERE details = ? AND src_mac = ? AND start < ?', (from_details, src_mac, start)):
+                hostname = row_hostname['hostname']
+                (i, d) = squash_for_mac_and_hostname(src_mac, hostname, from_details, to_details, start, rules)
+                inserted += i
+                deleted += d
+    logging.info('deleted %d flows from detail level %s, inserted %d flows to detail level %d', deleted, str(from_details), inserted, to_details)
+
+def squash_for_mac_and_hostname(src_mac, hostname, from_details, to_details, start, rules):
+    def add_to_insert(flow):
+        if flow['bytes_send'] + flow['bytes_received'] < rules['size_threshold']:
+            return
+        flow['detail'] = to_details
+        to_insert.append(flow)
+
+    def merge_flows(flow1, flow2):
+        if flow2['end'] > flow1['end']:
+            flow1['end'] = flow2['end']
+            flow1['duration'] = int(flow1['end'] - flow1['start'])
+        flow1['bytes_send'] += flow2['bytes_send']
+        flow1['bytes_received'] += flow2['bytes_received']
+        if flow1['src_ip'] != flow2['src_ip']:
+            flow1['src_ip'] = ''
+        if flow1['dest_ip'] != flow2['dest_ip']:
+            flow1['dest_ip'] = ''
+        if flow1['app_proto'] != flow2['app_proto']:
+            flow1['app_proto'] = ''
+        if flow1['app_hostname'] != flow2['app_hostname']:
+            flow1['app_hostname'] = ''
+    global con
+    c = con.cursor()
-    logging.debug("Squashing flows - from detail_level {} to detail_level {}".format(from_details, to_details))
-    to_be_deleted = []
-    for row in c.execute('SELECT rowid, start, (start+duration) AS end, duration, src_mac, src_ip, src_port, dest_ip, dest_port, proto, app_proto, bytes_send, bytes_received, app_hostname FROM traffic WHERE details = ? AND start < ? ORDER BY start', (from_details, start,)):
-        if row['rowid'] in to_be_deleted:
-            continue
-        logging.debug("trying:")
-        logging.debug(tuple(row))
-        current_start = float(row['start'])
-        current_end = float(row['end'])
-        current_bytes_send = int(row['bytes_send'])
-        current_bytes_received = int(row['bytes_received'])
-        src_ip = row['src_ip']
-        src_port = row['src_port']
-        dest_ip = row['dest_ip']
-        app_proto = row['app_proto']
-        app_hostname = row['app_hostname']
-        tmp = con.cursor()
-        first = True
-        for entry in tmp.execute('SELECT rowid, start, (start+duration) AS end, duration, src_mac, src_ip, src_port, dest_ip, dest_port, proto, app_proto, bytes_send, bytes_received, app_hostname FROM traffic WHERE details = ? AND start > ? AND start <= ? AND src_mac = ? AND dest_port = ? AND proto = ? ORDER BY start', (from_details, current_start, current_start+window, row['src_mac'], row['dest_port'], row['proto'])):
-            #hostname comparison done here (not in SQL query) because of None values
-            if entry['app_hostname']!=row['app_hostname']:
-                continue
-            #if hostname is Null, we only want to merge flows with equal dest_ip
-            if not entry['app_hostname'] and entry['dest_ip']!=row['dest_ip']:
-                continue
-            logging.debug("merging with:")
-            logging.debug(tuple(entry))
-            current_end = max(current_end, float(entry['end']))
-            current_bytes_send += int(entry['bytes_send'])
-            current_bytes_received += int(entry['bytes_received'])
-            if src_ip != entry['src_ip']:
-                src_ip = ''
-            if src_port != entry['src_port']:
-                src_port = ''
-            if dest_ip != entry['dest_ip']:
-                dest_ip = ''
-            if app_proto != entry['app_proto']:
-                app_proto = ''
-            if app_hostname != entry['app_hostname']:
-                app_hostname = ''
-            to_be_deleted.append(entry['rowid'])
-        if current_bytes_send + current_bytes_received > size_threshold:
-            tmp.execute('UPDATE traffic SET details = ?, duration = ?, src_ip = ?, src_port = ?, dest_ip = ?, app_proto = ?, bytes_send = ?, bytes_received = ?, app_hostname = ? WHERE rowid = ?', (to_details, int(current_end-current_start), src_ip, src_port, dest_ip, app_proto, current_bytes_send, current_bytes_received, app_hostname, row['rowid']))
+    if from_details == 'live':
+        result = c.execute('SELECT rowid, (start+duration) AS end, * FROM live.traffic WHERE src_mac = ? AND COALESCE(app_hostname,dest_ip) = ? AND start < ? AND flow_id IS NULL ORDER BY dest_port, start', (src_mac, hostname, start))
+    else:
+        result = c.execute('SELECT rowid, (start+duration) AS end, * FROM traffic WHERE details = ? AND src_mac = ? AND COALESCE(app_hostname,dest_ip) = ? AND start < ? ORDER BY dest_port, start', (from_details, src_mac, hostname, start))
+    to_delete = []
+    to_insert = []
+    current_flow = None
+    for row in result:
+        to_delete.append((row['rowid'],))
+        row = dict(row)
+        row['start'] = float(row['start'])
+        row['end'] = float(row['end'])
+        row['bytes_send'] = int(row['bytes_send'])
+        row['bytes_received'] = int(row['bytes_received'])
+        if not current_flow:
+            current_flow = row
+        elif current_flow['dest_port'] == row['dest_port'] and current_flow['end'] + int(rules['window']) > row['start']:
+            merge_flows(current_flow, row)
         else:
-            to_be_deleted.append(row['rowid'])
-    for tbd in to_be_deleted:
-        c.execute('DELETE FROM traffic WHERE rowid = ?', (tbd,))
-    con.commit()
-    return len(to_be_deleted)
+            add_to_insert(current_flow)
+            current_flow = row
+    add_to_insert(current_flow)
+    con.execute('BEGIN')
+    con.executemany('INSERT INTO traffic (start, duration, details, src_mac, src_ip, src_port, dest_ip, dest_port, proto, app_proto, bytes_send, bytes_received, app_hostname)'
+                    'VALUES (:start, :duration, :detail, :src_mac, :src_ip, :src_port, :dest_ip, :dest_port, :proto, :app_proto, :bytes_send, :bytes_received, :app_hostname)',
+                    to_insert)
+    if from_details == 'live':
+        con.executemany('DELETE FROM live.traffic WHERE rowid = ?', to_delete)
+    else:
+        con.executemany('DELETE FROM traffic WHERE rowid = ?', to_delete)
+    con.execute('COMMIT')
+    return (len(to_insert), len(to_delete))
 
 def load_archive_rules():
     rules = []
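For reference: after this refactor the per-level thresholds travel as a single rules dict with the keys up_to, window and size_threshold, consumed by squash() and squash_for_mac_and_hostname() above. A minimal sketch of one aggregation pass; the numeric values are illustrative, not taken from the project's defaults:

# Hypothetical rule; the real values come from UCI via load_archive_rules().
rules = {
    "up_to": 3600 * 24,      # only touch flows older than 24 hours
    "window": 60,            # merge flows starting within 60 s of the previous flow's end
    "size_threshold": 4096,  # drop aggregates carrying fewer than 4096 bytes in total
}
squash(1, 2, rules)          # squash archive detail level 1 into level 2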
@@ -120,30 +150,17 @@ def load_archive_rules():
     sorted(rules, key=lambda r: r["up_to"])
     return rules
 
 # Create database if it was empty
 c = con.cursor()
-now = int(time.mktime(datetime.datetime.utcnow().timetuple()))
-start = now-3600*24 #move flows from live DB to archive after 24hours
-c.execute('ATTACH DATABASE "/var/lib/pakon.db" AS live')
-c.execute('BEGIN')
-try:
-    c.execute('INSERT INTO traffic SELECT start, duration, 0, src_mac, src_ip, src_port, dest_ip, dest_port, proto, app_proto, bytes_send, bytes_received, app_hostname FROM live.traffic WHERE start < ? AND flow_id IS NULL', (start,))
-    logging.info("moved {} flows from live to archive".format(c.rowcount))
-    c.execute('DELETE FROM live.traffic WHERE start < ? AND flow_id IS NULL', (start,))
-    c.execute('COMMIT')
-except sqlite3.Error:
-    c.execute('ROLLBACK')
-    logging.warn('could not move flows from live database to archive')
-c.execute('SELECT COUNT(*) FROM live.traffic')
-logging.info("{} flows remaining in live database".format(c.fetchone()[0]))
+con.execute('ATTACH DATABASE "/var/lib/pakon.db" AS live')
+start = 3600*24 #move flows from live DB to archive after 24hours
+squash('live', 0, { "up_to": start, "window": 1, "size_threshold": 0 })
+con.execute('VACUUM live')
-c.execute('DETACH DATABASE live')
+c.execute('SELECT COUNT(*) FROM live.traffic')
+logging.info("{} flows remaining in live database".format(c.fetchone()[0]))
+con.isolation_level = ''
+con.execute('DETACH DATABASE live')
 
 rules = load_archive_rules()
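The live-to-archive move now reuses the same squash() path: the live database at /var/lib/pakon.db is ATTACHed into the archive connection, then squash('live', 0, ...) runs with size_threshold 0 (so nothing is discarded) and a 1-second window (so only back-to-back flows are merged). A standalone sketch of the ATTACH pattern, using the paths and table names from this script; the count query is only for illustration:

import sqlite3

# One connection can address both databases once the second one is attached.
con = sqlite3.connect('/srv/pakon/pakon-archive.db', isolation_level=None)
con.execute('ATTACH DATABASE "/var/lib/pakon.db" AS live')
# Tables of the attached database are referenced with the "live." prefix.
remaining = con.execute('SELECT COUNT(*) FROM live.traffic').fetchone()[0]
print('{} flows remaining in live database'.format(remaining))
con.execute('DETACH DATABASE live')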
@@ -155,15 +172,14 @@ if c.fetchall():
     c.execute('UPDATE traffic SET details = 0')
 
 for i in range(len(rules)):
-    deleted = squash(i, i+1, rules[i]["up_to"], rules[i]["window"], rules[i]["size_threshold"])
-    logging.info("squashed from {} to {} - deleted {}".format(i, i+1, deleted))
+    squash(i, i+1, rules[i])
 
 now = int(time.mktime(datetime.datetime.utcnow().timetuple()))
 c.execute('DELETE FROM traffic WHERE start < ?', (now - uci_get_time('pakon.archive.keep', '4w'),))
 #c.execute('VACUUM')
 #performing it every time is bad - it causes the whole database file to be rewritten
 #TODO: think about when to do it, perform it once in a while?
 con.commit()
 for i in range(len(rules)+1):
     c.execute('SELECT COUNT(*) FROM traffic WHERE details = ?', (i,))
-    logging.info("{} flows in archive on details level {}".format(c.fetchone()[0], i))
+    logging.info("{} flows remaining in archive on details level {}".format(c.fetchone()[0], i))