Verified Commit ca02c20d authored by Martin Petráček's avatar Martin Petráček
Browse files

monitor: switch sqlite to autocommit mode

parent eb98306c
......@@ -178,27 +178,23 @@ def new_device_notify(mac, iface):
thread.daemon = True
thread.start()
def handle_dns(data, c):
def handle_dns(data, con):
if data['dns']['type'] == 'answer' and 'rrtype' in data['dns'].keys() and data['dns']['rrtype'] in ('A', 'AAAA', 'CNAME'):
logging.debug('Saving DNS data')
dev, mac=get_dev_mac(data['dest_ip'])
dns_cache.set(mac, data['dns']['rrname'], data['dns']['rdata'])
def handle_flow(data, c):
def handle_flow(data, con):
if data['proto'] not in ['TCP', 'UDP']:
return
if 'app_proto' not in data.keys() or data['app_proto'] == 'failed':
data['app_proto'] = '?'
if data['app_proto'] == 'dns' or int(data['flow']['bytes_toserver'])==0 or int(data['flow']['bytes_toclient'])==0:
c.execute('DELETE FROM traffic WHERE flow_id = ?', (data['flow_id'],))
if c.rowcount!=1:
logging.debug("Can't delete flow")
con.execute('DELETE FROM traffic WHERE flow_id = ?', (data['flow_id'],))
else:
c.execute('UPDATE traffic SET duration = ?, app_proto = ?, bytes_send = ?, bytes_received = ?, flow_id = NULL WHERE flow_id = ?', (int(timestamp2unixtime(data['flow']['end'])-timestamp2unixtime(data['flow']['start'])), data['app_proto'], data['flow']['bytes_toserver'], data['flow']['bytes_toclient'], data['flow_id']))
if c.rowcount!=1:
logging.debug("Can't update flow")
con.execute('UPDATE traffic SET duration = ?, app_proto = ?, bytes_send = ?, bytes_received = ?, flow_id = NULL WHERE flow_id = ?', (int(timestamp2unixtime(data['flow']['end'])-timestamp2unixtime(data['flow']['start'])), data['app_proto'], data['flow']['bytes_toserver'], data['flow']['bytes_toclient'], data['flow_id']))
def handle_tls(data, c):
def handle_tls(data, con):
hostname = ''
if 'sni' in data['tls'].keys():
hostname = data['tls']['sni']
......@@ -209,18 +205,14 @@ def handle_tls(data, c):
hostname = m.group(0)
if not hostname:
return
c.execute('UPDATE traffic SET app_hostname = ?, app_proto = "tls" WHERE flow_id = ?', (domain_replace.replace(hostname), data['flow_id']))
if c.rowcount!=1:
logging.debug("Can't update flow")
con.execute('UPDATE traffic SET app_hostname = ?, app_proto = "tls" WHERE flow_id = ?', (domain_replace.replace(hostname), data['flow_id']))
def handle_http(data, c):
def handle_http(data, con):
if 'hostname' not in data['http'].keys():
return
c.execute('UPDATE traffic SET app_hostname = ?, app_proto = "http" WHERE flow_id = ?', (domain_replace.replace(data['http']['hostname']), data['flow_id']))
if c.rowcount!=1:
logging.debug("Can't update flow")
con.execute('UPDATE traffic SET app_hostname = ?, app_proto = "http" WHERE flow_id = ?', (domain_replace.replace(data['http']['hostname']), data['flow_id']))
def handle_flow_start(data, notify_new_devices, c):
def handle_flow_start(data, notify_new_devices, con):
dev, mac=get_dev_mac(data['src_ip'])
if data['proto'] not in ['TCP', 'UDP']:
return
......@@ -238,10 +230,8 @@ def handle_flow_start(data, notify_new_devices, c):
if hostname:
logging.debug('Got hostname from cached DNS: {}'.format(hostname))
hostname = domain_replace.replace(hostname.lower())
c.execute('INSERT INTO traffic (flow_id, start, src_mac, src_ip, src_port, dest_ip, dest_port, proto, app_proto, app_hostname) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)',
(data['flow_id'], timestamp2unixtime(data['flow']['start']),
mac, data['src_ip'],
data['src_port'], data['dest_ip'], data['dest_port'],
con.execute('INSERT INTO traffic (flow_id, start, src_mac, src_ip, src_port, dest_ip, dest_port, proto, app_proto, app_hostname) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)',
(data['flow_id'], timestamp2unixtime(data['flow']['start']), mac, data['src_ip'], data['src_port'], data['dest_ip'], data['dest_port'],
data['proto'], data['app_proto'], hostname))
def exit_gracefully(signum, frame):
......@@ -315,21 +305,20 @@ def main():
global allowed_interfaces, data_source
archive_path = uci_get('pakon.archive.path') or '/srv/pakon/pakon-archive.db'
dns_cache.try_load()
con = sqlite3.connect('/var/lib/pakon.db')
c = con.cursor()
# isolation_level=None for autocommit mode - we dont want long-lasting transactions
con = sqlite3.connect('/var/lib/pakon.db', isolation_level=None)
# flow_ids are only unique (and meaningful) during one run of this script
# flows with flow_id are incomplete, delete them
try:
c.execute('DELETE FROM traffic WHERE flow_id IS NOT NULL')
con.commit()
con.execute('DELETE FROM traffic WHERE flow_id IS NOT NULL')
except:
logging.debug('Error cleaning flow_id')
notify_new_devices = int(uci_get('pakon.monitor.notify_new_devices'))
if notify_new_devices:
c.execute('ATTACH ? AS archive', (archive_path,))
for row in c.execute('SELECT DISTINCT(src_mac) FROM traffic UNION SELECT DISTINCT(src_mac) FROM archive.traffic'):
con.execute('ATTACH ? AS archive', (archive_path,))
for row in con.execute('SELECT DISTINCT(src_mac) FROM traffic UNION SELECT DISTINCT(src_mac) FROM archive.traffic'):
known_devices.add(row[0])
c.execute('DETACH archive')
con.execute('DETACH archive')
if uci_get('pakon.monitor.mode').strip() == 'filter':
data_source = ConntrackScriptSource()
else:
......@@ -354,18 +343,17 @@ def main():
data['ether']={}
data['ether']['src']=''
if data['event_type'] == 'dns' and data['dns']:
handle_dns(data, c)
handle_dns(data, con)
elif data['event_type'] == 'flow' and data['flow']:
handle_flow(data, c)
handle_flow(data, con)
elif data['event_type'] == 'tls' and data['tls']:
handle_tls(data, c)
handle_tls(data, con)
elif data['event_type'] == 'http' and data['http']:
handle_http(data, c)
handle_http(data, con)
elif data['event_type'] == 'flow_start' and data['flow']:
handle_flow_start(data, notify_new_devices, c)
handle_flow_start(data, notify_new_devices, con)
else:
logging.warn("Unknown event type")
con.commit()
except KeyboardInterrupt:
exit_gracefully()
except IOError as e:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment