Verified Commit 04e1addf authored by Martin Petráček's avatar Martin Petráček
Browse files

get data from suricata

As suricata doesn't provide notion whether src is local/remote, we consider 'local' only input devices passed as argv.
Devices are taken from suricata configuration and passed from init script
parent 7f28890d
Pipeline #10666 passed with stage
in 33 seconds
......@@ -22,6 +22,7 @@ import sys
import json
import time
import logging
import os
from dev_detect.mac_addr_store import MacAddrStore
logger = logging.getLogger('devdetect')
......@@ -32,70 +33,31 @@ formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
PAKOND_SOCKET='/tmp/pakon/socket'
SURICATA_SOCKET='/tmp/suricata/dev_detect.sock'
class PakondReader():
"""basically line reader, takes care of splitting stream from pakond into single json messages
also takes cares of connecting/reconnecting
"""
def __init__(self, socket_path):
self.socket_path = socket_path
self.__get_connection()
def __get_connection(self):
"""get connection to pakond (keep trying forever, wait 5 second before each next attempt"""
self.__buffer = b''
while True:
try:
self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.socket.connect(self.socket_path)
logger.debug("connected to pakond")
return
except OSError:
logger.debug("can't connect to pakond")
time.sleep(5)
def __reconnect(self):
"""close connection, connect again"""
logger.debug("reconnecting to pakond")
self.socket.close()
self.__get_connection()
def read_line(self):
"""keeps reading (blocking) until it gets (at least one) complete line, then it returns it"""
while b'\n' not in self.__buffer:
try:
data = self.socket.recv(2048)
if data:
self.__buffer += data
else:
logger.debug("connection to pakond closed")
self.__reconnect()
except InterruptedError: #EINTR
pass
except OSError:
logger.debug("connection to pakond lost")
self.__reconnect()
(line, self.__buffer) = self.__buffer.split(b'\n', 1)
return line.decode('utf-8', 'ignore')
def main():
reader = PakondReader(PAKOND_SOCKET)
mac_store = MacAddrStore()
def main(argv = sys.argv):
try:
os.unlink(SURICATA_SOCKET) #remove stray socket
except OSError:
pass
if len(argv) < 2:
logger.error("usage: {} interface [interface] ...".format(argv[0]))
return 1
interfaces = argv[1:]
suricata_sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
suricata_sock.bind(SURICATA_SOCKET)
mac_store = MacAddrStore(interfaces)
while True:
try:
raw_data = reader.read_line()
raw_data = suricata_sock.recv(2048).decode('utf-8', 'ignore')
data = json.loads(raw_data)
if data["method"] == "pakon.flow-update":
mac_store.update(data["params"]["local"]["mac"], data["params"]["flags"]["src"])
elif data["method"] == "pakon.proto-version":
if data["params"]["version"] != 1:
logger.error("Incompatible version of pakond protocol")
sys.exit(1)
if data["event_type"] == "flow_start":
mac_store.update(data["ether"]["src"], data["in_dev"])
except ValueError:
logger.warn("received malformed record (invalid JSON) from pakond: {}".format(raw_data))
except KeyError as err:
if err.args[0] != "mac":
#We want to ignore missing keys "ether", "src" or "in_dev" - they may be missing, but we don't want warn about that
if err.args[0] != "ether" and err.args[0] != "src" and err.args[0] != "in_dev":
logger.warn("received malformed report (missing expected key) from pakond: {}".format(raw_data))
if __name__ == "__main__":
......
......@@ -32,11 +32,12 @@ STORE_FILE = '/usr/share/pakon-dev-detect/known_macs'
class MacAddrStore():
"""keeps MAC addresses as dictionary, MAC address is the key, timestamp is the value"""
def __init__(self):
def __init__(self, interfaces):
self.__load()
self.save_interval = 30*60 #TODO: move to configuration
self.mac_timeout = 60*60*24*30 #TODO: move to configuration
self.__last_save = int(time.time())
self.__interfaces = interfaces
def update(self, mac, device):
"""update MAC address (its timestamp).
......@@ -49,6 +50,8 @@ class MacAddrStore():
if mac in self.__known:
self.__known[mac] = max(int(time.time()), self.__known[mac])
else:
if device not in self.__interfaces:
return
t = threading.Thread(target=self.__new_device, args=(mac,device,))
t.daemon = True
t.start()
......
......@@ -12,7 +12,7 @@ classifiers = ['Development Status :: 4 - Beta',
setup(
name = 'dev_detect',
version = '1.1',
version = '1.2',
license = 'GPLv2+',
url = 'https://gitlab.labs.nic.cz/turris/devdetect',
author = 'Martin Petracek',
......
......@@ -27,37 +27,34 @@ import threading
import dev_detect.__main__ as main
import dev_detect.mac_addr_store as mac_addr_store
REPORT_VERSION_1 = b'{"jsonrpc":"2.0","method":"pakon.proto-version","params":{"version":1}}\n'
REPORT_FLOW_WITHOUT_MAC = b'{"jsonrpc":"2.0","method":"pakon.flow-update","params":{"verdict":{"action":"ACCEPT"},"id":"1493131200949-5982-3066682092-1","status":"new","starttime":1493131200949,"starttime_monotonic":5982711,"lasttime":1493131200949,"lasttime_monotonic":5982711,"direction":"OUT","family":"IPv4","local":{"ip":"192.168.1.123","name":{},"port":52282},"remote":{"ip":"123.123.123.123","name":{},"port":443},"ip_proto":"TCP","ip_proto_raw":6,"in":{"packets":0,"payload":0,"total":0,"closed":false},"out":{"packets":1,"payload":46,"total":98,"closed":false},"flags":{"src":"br-lan","dir":"out"}}}\n'
REPORT_FLOW_WITH_MAC = b'{"jsonrpc":"2.0","method":"pakon.flow-update","params":{"verdict":{"action":"ACCEPT"},"id":"1493131200949-5982-3066682092-1","status":"new","starttime":1493131200949,"starttime_monotonic":5982711,"lasttime":1493131200949,"lasttime_monotonic":5982711,"direction":"OUT","family":"IPv4","local":{"ip":"192.168.1.123","name":{},"mac":"d4:81:d7:00:00:01","port":52282},"remote":{"ip":"123.123.123.123","name":{},"port":443},"ip_proto":"TCP","ip_proto_raw":6,"in":{"packets":0,"payload":0,"total":0,"closed":false},"out":{"packets":1,"payload":46,"total":98,"closed":false},"flags":{"src":"br-lan","dir":"out"}}}\n'
REPORT_FLOW_WITHOUT_MAC = b'{"timestamp":"2017-07-24T12:56:58.236099+0200","flow_id":1249701575432771,"event_type":"flow_start","src_ip":"fe80:0000:0000:0000:0000:0000:0000:002c","dest_ip":"ff02:0000:0000:0000:0000:0000:0000:0002","proto":"IPv6-ICMP","icmp_type":131,"icmp_code":0}\n'
REPORT_FLOW_WITH_MAC = b'{"timestamp":"2017-07-24T12:57:45.400006+0200","flow_id":2164877504879238,"event_type":"flow_start","src_ip":"192.168.1.126","src_port":39592,"dest_ip":"192.168.123.123","dest_port":443,"proto":"TCP","ether":{"src":"d4:81:d7:00:01:02"},"in_dev":"br-lan"}\n'
def fake_pakond(sock_path):
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.settimeout(10)
sock.bind(sock_path)
sock.listen(1)
try:
conn, addr = sock.accept()
except socket.timeout:
print("client didn't connect")
sock.close()
os.remove(sock_path)
sys.exit(1)
conn.send(REPORT_VERSION_1)
conn.send(REPORT_FLOW_WITHOUT_MAC)
conn.send(REPORT_FLOW_WITH_MAC)
conn.makefile().flush()
def fake_suricata(sock_path):
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
retry = 10
time.sleep(1)
while retry > 0:
try:
sock.connect(sock_path)
break
except:
time.sleep(1)
retry -= 1
sock.send(REPORT_FLOW_WITHOUT_MAC)
sock.send(REPORT_FLOW_WITH_MAC)
sock.makefile().flush()
time.sleep(1)
sock.close()
conn.close()
os.remove(sock_path)
def test():
mac_addr_store.STORE_FILE='./known_macs'
main.PAKOND_SOCKET='./pakond-dev-detect.sock'
p = Process(target = main.main)
main.SURICATA_SOCKET='./pakond-dev-detect.sock'
argv=["dev-detect", "br-lan"]
p = Process(target = main.main, args=(argv,))
p.start()
fake_pakond(main.PAKOND_SOCKET)
fake_suricata(main.SURICATA_SOCKET)
p.terminate() #from doc: "Terminate the process. On Unix this is done using the SIGTERM signal;"
p.join()
if os.path.exists(mac_addr_store.STORE_FILE):
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment