Verified Commit 9fee7c1d authored by Martin Petráček's avatar Martin Petráček
Browse files

monitor: split event handling into multiple functions

parent 3eac6e0e
......@@ -29,21 +29,66 @@ def timestamp2unixtime(timestamp):
timestamp = timestamp*1.0 + dt.microsecond*1.0/1000000
return timestamp
if not os.path.isfile('/var/lib/pakon.db'):
subprocess.call(['/usr/bin/python3', '/usr/libexec/pakon-light/create_db.py'])
con = sqlite3.connect('/var/lib/pakon.db')
def handle_dns(data, c):
if data['dns']['type'] == 'answer' and 'rrtype' in data['dns'].keys() and data['dns']['rrtype'] in ('A', 'AAAA', 'CNAME'):
logging.debug('Saving DNS data')
c.execute('INSERT INTO dns VALUES (?,?,?,?,?)',
(timestamp2unixtime(data['timestamp']),
data['dest_ip'], data['dns']['rrname'], data['dns']['rrtype'],
data['dns']['rdata']))
def handle_flow(data, c):
if data['proto'] not in ['TCP', 'UDP']:
return
if 'app_proto' not in data.keys():
data['app_proto'] = '?'
if data['app_proto'] in ['failed', 'dns'] or int(data['flow']['bytes_toserver'])==0 or int(data['flow']['bytes_toclient'])==0:
c.execute('DELETE FROM traffic WHERE flow_id = ?', (data['flow_id'],))
if c.rowcount!=1:
logging.debug("Can't delete flow")
else:
c.execute('UPDATE traffic SET duration = ?, app_proto = ?, bytes_send = ?, bytes_received = ?, flow_id = NULL WHERE flow_id = ?', (int(timestamp2unixtime(data['flow']['end'])-timestamp2unixtime(data['flow']['start'])), data['app_proto'], data['flow']['bytes_toserver'], data['flow']['bytes_toclient'], data['flow_id']))
if c.rowcount!=1:
logging.debug("Can't update flow")
def handle_tls(data, c):
hostname = ''
if 'sni' in data['tls'].keys():
hostname = data['tls']['sni']
elif 'subject' in data['tls'].keys():
hostname = data['tls']['subject']
#get only CN from suject
m = re.search('(?<=CN=)[^,]*', hostname)
if m:
hostname = m.group(0)
if not hostname:
return
c.execute('UPDATE traffic SET app_hostname = ?, app_proto = "tls" WHERE flow_id = ?', (hostname, data['flow_id']))
if c.rowcount!=1:
logging.debug("Can't update flow")
c = con.cursor()
# flow_ids are only unique (and meaningful) during one run of this script
try:
c.execute('UPDATE traffic SET flow_id = NULL, duration = 0, bytes_send = 0, bytes_received = 0 WHERE flow_id IS NOT NULL')
con.commit()
except:
logging.debug('Error cleaning flow_id')
def handle_http(data, c):
if 'hostname' not in data['http'].keys():
return
c.execute('UPDATE traffic SET app_hostname = ?, app_proto = "http" WHERE flow_id = ?', (data['http']['hostname'], data['flow_id']))
if c.rowcount!=1:
logging.debug("Can't update flow")
def handle_flow_start(data, c):
if data['proto'] not in ['TCP', 'UDP']:
return
if 'app_proto' not in data.keys():
data['app_proto'] = '?'
if data['app_proto'] in ['failed', 'dns']:
return
c.execute('INSERT INTO traffic (flow_id, start, src_mac, src_ip, src_port, dest_ip, dest_port, proto, app_proto) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)',
(data['flow_id'], timestamp2unixtime(data['flow']['start']),
data['ether']['src'], data['src_ip'],
data['src_port'], data['dest_ip'], data['dest_port'],
data['proto'], data['app_proto']))
def exit_gracefully(signum, frame):
global c, con
global con, conntrack
conntrack.terminate()
time.sleep(1)
if not con:
......@@ -54,6 +99,18 @@ def exit_gracefully(signum, frame):
conntrack.kill()
sys.exit(0)
if not os.path.isfile('/var/lib/pakon.db'):
subprocess.call(['/usr/bin/python3', '/usr/libexec/pakon-light/create_db.py'])
con = sqlite3.connect('/var/lib/pakon.db')
c = con.cursor()
# flow_ids are only unique (and meaningful) during one run of this script
try:
c.execute('UPDATE traffic SET flow_id = NULL, duration = 0, bytes_send = 0, bytes_received = 0 WHERE flow_id IS NOT NULL')
con.commit()
except:
logging.debug('Error cleaning flow_id')
signal.signal(signal.SIGINT, exit_gracefully)
signal.signal(signal.SIGTERM, exit_gracefully)
conntrack=None
......@@ -89,66 +146,17 @@ while True:
data['ether']={}
data['ether']['src']=''
if data['event_type'] == 'dns' and data['dns']:
logging.debug('Got dns!')
if data['dns']['type'] == 'answer' and 'rrtype' in data['dns'].keys() and data['dns']['rrtype'] in ('A', 'AAAA', 'CNAME'):
logging.debug('Saving DNS data')
c.execute('INSERT INTO dns VALUES (?,?,?,?,?)',
(timestamp2unixtime(data['timestamp']),
data['dest_ip'], data['dns']['rrname'], data['dns']['rrtype'],
data['dns']['rdata']))
# Store final counters of flow - UPDATE flow, set duration, counters, app_proto and erase flow_id
if data['event_type'] == 'flow' and data['flow'] and data['proto'] in ['TCP', 'UDP']:
logging.debug('Got flow!')
if 'app_proto' not in data.keys():
data['app_proto'] = 'unknown'
if data['app_proto'] in ['failed', 'dns'] or int(data['flow']['bytes_toserver'])==0 or int(data['flow']['bytes_toclient'])==0:
c.execute('DELETE FROM traffic WHERE flow_id = ?', (data['flow_id'],))
if c.rowcount!=1:
logging.debug("Can't delete flow")
else:
c.execute('UPDATE traffic SET duration = ?, app_proto = ?, bytes_send = ?, bytes_received = ?, flow_id = NULL WHERE flow_id = ?', (int(timestamp2unixtime(data['flow']['end'])-timestamp2unixtime(data['flow']['start'])), data['app_proto'], data['flow']['bytes_toserver'], data['flow']['bytes_toclient'], data['flow_id']))
if c.rowcount!=1:
logging.debug("Can't update flow")
# Store TLS details of flow - UPDATE flow, set hostname and app_proto
if data['event_type'] == 'tls' and data['tls']:
logging.debug('Got tls!')
hostname = ''
if 'sni' in data['tls'].keys():
hostname = data['tls']['sni']
elif 'subject' in data['tls'].keys():
hostname = data['tls']['subject']
#get only CN from suject
m = re.search('(?<=CN=)[^,]*', hostname)
if m:
hostname = m.group(0)
if not hostname:
continue
c.execute('UPDATE traffic SET app_hostname = ?, app_proto = "tls" WHERE flow_id = ?', (hostname, data['flow_id']))
if c.rowcount!=1:
logging.debug("Can't update flow")
# Store HTTP details of flow - UPDATE flow, set hostname and app_proto
if data['event_type'] == 'http' and data['http']:
if 'hostname' not in data['http'].keys():
continue
c.execute('UPDATE traffic SET app_hostname = ?, app_proto = "http" WHERE flow_id = ?', (data['http']['hostname'], data['flow_id']))
if c.rowcount!=1:
logging.debug("Can't update flow")
# Store flow - INSERT with many fiels NULL/zeros
if data['event_type'] == 'flow_start' and data['flow'] and data['proto'] in ['TCP', 'UDP']:
if 'app_proto' not in data.keys():
data['app_proto'] = 'unknown'
if data['app_proto'] in ['failed', 'dns']:
continue
c.execute('INSERT INTO traffic (flow_id, start, src_mac, src_ip, src_port, dest_ip, dest_port, proto, app_proto) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)',
(data['flow_id'], timestamp2unixtime(data['flow']['start']),
data['ether']['src'], data['src_ip'],
data['src_port'], data['dest_ip'], data['dest_port'],
data['proto'], data['app_proto']))
# Commit everything
handle_dns(data, c)
elif data['event_type'] == 'flow' and data['flow']:
handle_flow(data, c)
elif data['event_type'] == 'tls' and data['tls']:
handle_tls(data, c)
elif data['event_type'] == 'http' and data['http']:
handle_http(data, c)
elif data['event_type'] == 'flow_start' and data['flow']:
handle_flow_start(data, c)
else:
logging.warn("Unknown event type")
con.commit()
except KeyboardInterrupt:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment