Skip to content
Snippets Groups Projects
Verified Commit b17a3bf3 authored by Martin Petráček's avatar Martin Petráček
Browse files

suricata-monitor: split traffic related stuff to pakon-light package

parent d763abdd
Branches
Tags
No related merge requests found
......@@ -9,7 +9,7 @@ include $(TOPDIR)/rules.mk
PKG_NAME:=suricata-monitor
PKG_VERSION:=1.0
PKG_RELEASE:=4
PKG_RELEASE:=5
PKG_SOURCE_VERSION:=v$(PKG_VERSION)
PKG_BUILD_DIR:=$(BUILD_DIR)/$(PKG_NAME)
......@@ -59,9 +59,6 @@ define Package/$(PKG_NAME)/install
$(INSTALL_DATA) ./files/monitor-uci $(1)/etc/config/suricata-monitor
$(INSTALL_DIR) $(1)/etc/cron.d/
$(INSTALL_DATA) ./files/monitor-cron $(1)/etc/cron.d/suricata-monitor
$(INSTALL_DIR) $(1)/usr/libexec/
$(INSTALL_DATA) ./files/fill_dns.py $(1)/usr/libexec/suricata-fill-dns.py
$(INSTALL_DATA) ./files/squash_flows.py $(1)/usr/libexec/suricata-squash_flows.py
endef
......
#!/usr/bin/env python
import os
import sys
import time
import datetime
import time
import sqlite3
import signal
import errno
from multiprocessing import Process, Queue
interval = 3600
dns_threads = 8
dns_timeout = 5
def timeout_handler(signum, frame):
    """SIGALRM handler: abort a stalled reverse DNS lookup.

    Python invokes signal handlers with (signum, frame); the original
    zero-argument signature would raise TypeError inside the handler
    instead of the intended timeout exception.
    """
    raise Exception("Timeout!")
def reverse_lookup(q_in, q_out):
    """Worker process: consume IPs from q_in, emit (ip, name) pairs on q_out.

    Reads IP addresses from q_in until a None sentinel arrives.  Each
    lookup is bounded by dns_timeout seconds via SIGALRM (the alarm fires
    timeout_handler, whose exception is caught below).  Addresses that do
    not resolve are skipped silently.
    """
    # The module never imports socket (only os/sys/time/datetime/sqlite3/
    # signal/errno/multiprocessing), so the original worker died with
    # NameError on its first lookup.  Import it here, in the worker.
    import socket

    signal.signal(signal.SIGALRM, timeout_handler)
    while True:
        ip = q_in.get()
        if ip is None:  # sentinel: no more work for this worker
            break
        signal.alarm(dns_timeout)  # hard upper bound on the lookup
        try:
            name = socket.gethostbyaddr(ip)[0]
            q_out.put((ip, name))
        except Exception:
            # Unresolvable address, or the SIGALRM timeout fired: skip.
            pass
        signal.alarm(0)  # cancel any pending alarm before the next get()
# Module-level handle on the shared suricata-monitor database; the
# cache/reverse-lookup passes below reuse this single connection.
con = sqlite3.connect('/var/lib/suricata-monitor.db')
c = con.cursor()
def get_name_from_cache(time, client, ip):
    """Resolve *ip* to a hostname from captured DNS answers in the dns table.

    Finds the most recent answer record (data == ip) seen by *client* at or
    before *time*, then walks CNAME records backwards (data == current name)
    to recover the name the client originally queried.  Returns the resolved
    name, or None when no matching DNS traffic was captured.

    Note: the parameter name 'time' shadows the time module inside this
    function; kept for interface compatibility.
    """
    global con
    t = con.cursor()
    t.execute('SELECT name FROM dns WHERE time <= ? AND data = ? AND client = ? ORDER BY time DESC LIMIT 1', (time, ip, client))
    dns_entry = t.fetchone()
    name = None
    seen = set()  # guard: a CNAME cycle in captured data would loop forever
    while dns_entry:
        name = dns_entry[0]
        if name in seen:
            break
        seen.add(name)
        print(ip+" -> "+name)
        t.execute('SELECT name FROM dns WHERE time <= ? AND data = ? AND client = ? AND type = "CNAME" ORDER BY time DESC LIMIT 1', (time, name, client))
        dns_entry = t.fetchone()
    return name
now = int(time.mktime(datetime.datetime.utcnow().timetuple()))
start = now-interval*2

print("Solving DNS")
print("Using DNS cache...")
cache = 0
reverse = 0
# Update cursor for both passes.  Previously it was created only inside
# the cache loop's "if name:" branch, so when the cache pass filled
# nothing the reverse-lookup pass crashed with NameError on 't'.
t = con.cursor()

# Pass 1: label traffic rows from captured DNS answers (type 2).
for row in c.execute('SELECT start, src_ip, dest_ip FROM traffic WHERE start >= ? AND app_hostname_type = 0', (start,)):
    name = get_name_from_cache(row[0], row[1], row[2])
    if name:
        t.execute('UPDATE traffic SET app_hostname = ?, app_hostname_type = 2 WHERE start = ?', (name, row[0]))
        cache = cache + 1
print(str(cache)+" records filled from DNS cache")
print("Trying reverse lookups...")
con.commit()

# Pass 2: reverse-resolve the remaining destination IPs in dns_threads
# worker processes (type 3).
q_in = Queue()
q_out = Queue()
for row in c.execute('SELECT DISTINCT(dest_ip) FROM traffic WHERE start > ? AND app_hostname_type = 0', (start, )):
    q_in.put(row[0])
p = []
for i in range(0, dns_threads):
    q_in.put(None)  # one sentinel per worker so each terminates
    tp = Process(target=reverse_lookup, args=(q_in, q_out))
    p.append(tp)
    tp.start()
for tp in p:
    tp.join()
while not q_out.empty():
    res = q_out.get()
    t.execute('UPDATE traffic SET app_hostname = ?, app_hostname_type = 3 WHERE start > ? AND dest_ip = ? AND app_hostname = ""', (res[1], start, res[0]))
    reverse = reverse + 1
print(str(reverse)+" records filled from reverse lookups")
con.commit()

# Expire captured DNS answers older than 12 hours.
c.execute('DELETE FROM dns WHERE time < ?', (now-12*3600, ))
con.commit()
......@@ -125,57 +125,27 @@ if con:
'category text, signature text, hostname text)')
except:
logging.debug('Table "alerts" already exists')
# NOTE(review): reconstructed from a diff scrape; indentation is
# best-effort and these CREATE TABLE statements appear to be the ones
# this commit removes (moved to the pakon-light package) -- verify
# against the real file.
try:
    # Per-flow traffic log.  app_hostname_type encodes how the hostname
    # was learned: 0 - unknown, 1 - tls/http(app level), 2 - dns,
    # 3 - reverse lookup.
    c.execute('CREATE TABLE traffic '
        '(start real primary key, duration integer, connections integer,'
        'src_mac text, src_ip text, src_port integer, dest_ip text, dest_port integer, '
        'proto text, app_proto text, bytes_send integer, '
        'bytes_received integer, app_hostname text, app_hostname_type integer)')
    #app_hostname_type: 0 - unknown, 1 - tls/http(app level), 2 - dns, 3 - reverse lookup
except:
    # bare except kept as-is: CREATE TABLE raises when the table exists
    logging.debug('Table "traffic" already exists')
try:
    # Captured DNS answers; consumed later by the hostname-filling passes.
    c.execute('CREATE TABLE dns '
        '(time integer, client text, name text, type text, data text)')
except:
    logging.debug('Table "dns" already exists')
try:
    # Key/value settings; seeds the schema version on first run.
    c.execute('CREATE TABLE settings '
        '(key text, value integer)')
    c.execute('INSERT INTO settings VALUES (?, ?)', ('db_schema_version', 1))
except:
    logging.debug('Table "settings" already exists')
# Main loop
def exit_gracefully(signum, frame):
    """SIGINT/SIGTERM handler: flush still-active flows and shut down.

    NOTE(review): this span comes from a diff scrape and appears to
    interleave removed and added lines (both conntrack.* calls and
    server.close() are present, plus a second 'global' statement that
    would be a syntax error in one function) -- verify against the
    real pre/post-commit files before relying on it.
    """
    global c, con, active_flows
    conntrack.terminate()
    time.sleep(1)
    if not con:
        return
    # Persist flows still open at shutdown; hostname_type 1 marks the
    # hostname as learned at app level (tls/http).
    for flow in active_flows.itervalues():
        c.execute('INSERT INTO traffic VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)', (flow[0], int(time.time()-flow[0]), 1, flow[1], flow[2], flow[3], flow[4], flow[5], flow[6], flow[7], 0, 0, flow[8], 1))
    con.commit()
    global server, con
    if con:
        con.close()
    conntrack.kill()
    server.close()
    sys.exit(0)
signal.signal(signal.SIGINT, exit_gracefully)
signal.signal(signal.SIGTERM, exit_gracefully)

conntrack=None
devnull = open(os.devnull, 'w')
# A stale socket left by a previous run would make bind() below fail.
if os.path.exists( "/var/run/suricata_monitor.sock" ):
    os.remove( "/var/run/suricata_monitor.sock" )
# flow_id -> (start_time, src_mac, src_ip, src_port, dest_ip, dest_port,
#             proto, app_proto, hostname); filled by the tls/http handlers.
active_flows={}
logging.debug("Opening socket...")
# NOTE(review): diff scrape -- the subprocess spawn is most likely the
# removed (pre-commit) input path and the AF_UNIX datagram socket the
# added one; verify before relying on either.
try:
    conntrack = subprocess.Popen(["/usr/bin/python3","/usr/bin/suricata_conntrack_flows.py","/var/run/suricata_monitor.sock"], shell=False, stdout=subprocess.PIPE, stderr=devnull)
except Exception as e:
    logging.error("Can't run flows_conntrack.py")
    logging.error(e)
    sys.exit(1)
server = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
server.bind("/var/run/suricata_monitor.sock")
logging.debug("Listening...")
......@@ -183,9 +153,9 @@ logging.debug("Listening...")
while True:
try:
logging.debug('Getting data...')
line = conntrack.stdout.readline()
line = server.recv(4092)
if not line:
break
continue
line = string.strip(line)
logging.debug(line)
if not line:
......@@ -226,73 +196,6 @@ while True:
data['alert']['category'], data['alert']['signature'],
data['hostname']))
# Handle all other traffic
# NOTE(review): these handlers sit inside the main receive loop (the
# 'continue' statements target it); the loop header was elided by the
# diff scrape and indentation here is reconstructed best-effort.

# Record captured DNS answers (A/AAAA/CNAME) for the later
# hostname-filling passes; dest_ip is the querying client.
if data['event_type'] == 'dns' and con:
    logging.debug('Got dns!')
    if data['dns'] and data['dns']['type'] == 'answer' and 'rrtype' in data['dns'].keys() and data['dns']['rrtype'] in ('A', 'AAAA', 'CNAME') and con:
        # Store an answer only when it differs from the last one seen
        # for this client+name, keeping the dns table compact.
        c.execute('SELECT data FROM dns WHERE client = ? AND name = ? ORDER BY time LIMIT 1',
            (data['dest_ip'], data['dns']['rrname']))
        row = c.fetchone()
        if row is None or row[0] != data['dns']['rdata']:
            logging.debug('Saving DNS data')
            if row:
                logging.debug(' -> ' + row[0] + ' != ' + data['dns']['rdata'])
            c.execute('INSERT INTO dns VALUES (?,?,?,?,?)',
                (timestamp2unixtime(data['timestamp']),
                data['dest_ip'], data['dns']['rrname'], data['dns']['rrtype'],
                data['dns']['rdata']))

# Insert or update flow - it might already exist from TLS connection
if data['event_type'] == 'flow' and data['flow'] and con and data['proto'] in ['TCP', 'UDP']:
    logging.debug('Got flow!')
    if 'app_proto' not in data.keys():
        data['app_proto'] = 'unknown'
    if data['app_proto'] not in ['failed', 'dns']:
        if data['flow_id'] in active_flows.keys():
            # Hostname already learned from a tls/http event for this flow.
            hostname = active_flows[data['flow_id']][8]
            del active_flows[data['flow_id']]
            hostname_type = 1
        else:
            hostname = ''
            hostname_type = 0
        if 'src_port' not in data.keys():
            data['src_port'] = ''
        # Skip one-sided flows with no payload in one direction.
        if int(data['flow']['bytes_toserver'])==0 or int(data['flow']['bytes_toclient'])==0:
            continue
        try:
            c.execute('INSERT INTO traffic VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)',
                (timestamp2unixtime(data['flow']['start']),
                int(timestamp2unixtime(data['flow']['end'])-timestamp2unixtime(data['flow']['start'])), 1, data['ether']['src'], data['src_ip'],
                data['src_port'], data['dest_ip'], data['dest_port'],
                data['proto'], data['app_proto'], data['flow']['bytes_toserver'],
                data['flow']['bytes_toclient'], hostname, hostname_type))
            if c.rowcount!=1:
                logging.error("Can't insert flow")
        except Exception as e:
            print(e)

# Store TLS details of flow
if data['event_type'] == 'tls' and data['tls'] and con:
    logging.debug('Got tls!')
    hostname = ''
    if 'sni' in data['tls'].keys():
        hostname = data['tls']['sni']
    elif 'subject' in data['tls'].keys():
        hostname = data['tls']['subject']
        # get only CN from subject
        m = re.search('(?<=CN=)[^,]*', hostname)
        if m:
            hostname = m.group(0)
    if not hostname:
        continue
    active_flows[data['flow_id']]=(time.time(), data['ether']['src'], data['src_ip'], data['src_port'], data['dest_ip'], data['dest_port'], data['proto'], 'tls', hostname)

# Store HTTP details of flow
if data['event_type'] == 'http' and data['http'] and con:
    if 'hostname' not in data['http'].keys():
        continue
    active_flows[data['flow_id']]=(time.time(), data['ether']['src'], data['src_ip'], data['src_port'],data['dest_ip'], data['dest_port'], data['proto'], 'http', data['http']['hostname'])

# Commit everything
if con:
    con.commit()
......@@ -303,6 +206,3 @@ while True:
except IOError as e:
if e.errno != errno.EINTR:
raise
logging.error("End of data?")
logging.error("This may mean that suricata_conntrack_flows.py doesn't exist/is broken...")
#!/usr/bin/env python
import os
import sys
import time
import datetime
import sqlite3
import signal
import errno
from multiprocessing import Process, Queue
# Flows starting within max_delay seconds of a matching group's running
# end are merged into that group.
max_delay = 5
# Look back over the last two hourly runs when selecting rows to squash.
interval = 3600

# Shared handle on the suricata-monitor database.
con = sqlite3.connect('/var/lib/suricata-monitor.db')
c = con.cursor()
def squash(start):
    """Merge near-adjacent traffic rows describing the same logical flow.

    Walks rows newer than *start* in start order.  Rows with identical
    (src_mac, src_ip, dest_ip, app_proto, app_hostname) whose start lies
    within max_delay seconds of the running end of the current group are
    folded into the group's first row: the duration is extended, the
    connection and byte counters are summed, and src_port/dest_port/
    app_hostname_type are blanked when the merged rows disagree.  The
    absorbed rows are then deleted.

    Returns the number of deleted (absorbed) rows.
    """
    global con
    c = con.cursor()
    print("Squashing flows...")
    # Set instead of list: membership is tested once per examined row,
    # so a list made this pass O(n^2) on busy databases.
    to_be_deleted = set()
    for row in c.execute('SELECT start, (start+duration) AS end, duration, connections, src_mac, src_ip, src_port, dest_ip, dest_port, proto, app_proto, bytes_send, bytes_received, app_hostname, app_hostname_type FROM traffic WHERE start >= ? ORDER BY start', (start,)):
        if row[0] in to_be_deleted:
            continue  # already absorbed into an earlier group
        print("trying:")
        print(row)
        current_start = float(row[0])
        current_end = float(row[1])
        current_connections = int(row[3])
        current_bytes_send = int(row[11])
        current_bytes_received = int(row[12])
        mac = row[4]
        src_port = row[6]
        dest_port = row[8]
        app_hostname_type = row[14]
        count = 0
        tmp = con.cursor()
        # Candidates: later rows for the same endpoint/proto/hostname tuple.
        for entry in tmp.execute('SELECT start, (start+duration) AS end, duration, connections, src_mac, src_ip, src_port, dest_ip, dest_port, proto, app_proto, bytes_send, bytes_received, app_hostname, app_hostname_type FROM traffic WHERE start > ? AND src_mac = ? AND src_ip = ? AND dest_ip = ? AND app_proto = ? AND app_hostname = ? ORDER BY start', (current_start, mac, row[5], row[7], row[10], row[13])):
            if float(entry[0]) - max_delay > current_end:
                break  # gap too large; later rows start even later
            print("joining with:")
            print(entry)
            current_end = max(current_end, float(entry[1]))
            current_connections += int(entry[3])
            current_bytes_send += int(entry[11])
            current_bytes_received += int(entry[12])
            # Blank fields on which the merged rows disagree.
            if src_port!=entry[6]:
                src_port = ''
            if dest_port!=entry[8]:
                dest_port = ''
            if app_hostname_type!=entry[14]:
                app_hostname_type = ''
            count += 1
            to_be_deleted.add(entry[0])
        if count>0:
            # NOTE(review): updating/committing while the outer SELECT
            # cursor is still live is kept from the original; SQLite
            # tolerates it but the iteration order is then unspecified.
            tmp.execute('UPDATE traffic SET duration = ?, connections = ?, src_port = ?, dest_port = ?, bytes_send = ?, bytes_received = ?, app_hostname_type = ? WHERE start = ?', (int(current_end-current_start), current_connections, src_port, dest_port, current_bytes_send, current_bytes_received, app_hostname_type, row[0]))
            con.commit()
    for tbd in to_be_deleted:
        c.execute('DELETE FROM traffic WHERE start = ?', (tbd,))
    con.commit()
    return len(to_be_deleted)
now = int(time.mktime(datetime.datetime.utcnow().timetuple()))
start = now-interval*2

c = con.cursor()
c.execute('SELECT COUNT(*) FROM traffic WHERE start >= ?', (start, ))
count_entry = c.fetchone()
count=count_entry[0]
print("Squashing flows...")
deleted = squash(start)
print("Squashed {} entries (out of {} examined).".format(deleted, count))
# Reclaim the space freed by the deletes.  SQLite's statement is plain
# VACUUM; 'VACUUM FULL' is PostgreSQL syntax and made every run die
# here with sqlite3.OperationalError.
c.execute('VACUUM')
con.commit()
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment