#!/usr/bin/env python3

import os
import sys
import time
import datetime
import time
import sqlite3
import signal
import errno
import re
import json
import glob
14
import socketserver
15
16

def build_filter(query):
    """Translate a parsed request into a time window and an SQL WHERE fragment.

    Recognised keys of *query* (all optional):

    * ``start`` / ``end`` -- bounds of the time window, converted with
      ``int()``.  ``start`` defaults to 0 and ``end`` to ``time.time()``.
    * ``mac`` / ``hostname`` -- a single string or an iterable of values that
      the ``src_mac`` / ``app_hostname`` column must match.

    Returns the tuple ``(time_from, time_to, where_clause, where_parameters)``.
    *where_clause* contains only ``?`` placeholders and *where_parameters*
    lists the values to bind to them, in order.

    Raises ``ValueError`` or ``TypeError`` when ``start`` or ``end`` cannot be
    converted by ``int()``.
    """
    time_from = int(query["start"]) if "start" in query else 0
    time_to = int(query["end"]) if "end" in query else time.time()

    # A flow overlaps the window when it starts inside it, ends inside it, or
    # starts before it and ends after it.  The last case is needed for flows
    # longer than the window itself.
    where_clause = ("(start BETWEEN ? AND ? OR (start+duration) BETWEEN ? AND ?"
                    " OR (start < ? AND (start+duration) > ?))")
    where_parameters = [time_from, time_to, time_from, time_to, time_from, time_to]

    for key, column in (("mac", "src_mac"), ("hostname", "app_hostname")):
        if key not in query:
            continue
        values = query[key]
        # A bare string is one value, not a sequence of characters.
        values = [values] if isinstance(values, str) else list(values)
        where_clause += " AND " + column + " IN (" + ",".join("?" * len(values)) + ")"
        where_parameters += values
    return (time_from, time_to, where_clause, where_parameters)


def load_ignores(pattern="/usr/share/pakon-light/domains_ignore/*.txt"):
    """Load the ignored domains from every file matching *pattern*.

    Each file holds one domain per line.  Surrounding whitespace is removed;
    blank lines and lines whose first non-blank character is ``#`` are
    skipped.

    Returns a dict whose keys are the domains (every value is ``1``).  When a
    file cannot be read a message is printed and the entries collected so far
    are returned.
    """
    ignored = {}
    try:
        for fn in glob.glob(pattern):
            with open(fn) as f:
                for line in f:
                    # Strip before testing: lines read from a file still end
                    # with "\n", so an unstripped blank line is never empty.
                    entry = line.strip()
                    if not entry or entry.startswith('#'):
                        continue
                    ignored[entry] = 1
    except IOError:
        print("can't load domains_ignore file")
    return ignored

# Ignore table built once at import time; is_ignored() looks hostnames up in it.
ignored=load_ignores()

54
def is_ignored(hostname, ignore_table=None):
    """Return True when *hostname* or one of its parent domains is ignored.

    The name is tested as given and then with leading labels removed one at a
    time, so ``a.b.example.com`` matches an entry for ``example.com``.  A
    trailing root dot (``example.com.``) is disregarded.  An empty or missing
    hostname is never ignored.

    *ignore_table* is the container of ignored domains to consult; when it is
    omitted the module-level ``ignored`` table is used.
    """
    if not hostname:
        return False
    table = ignored if ignore_table is None else ignore_table
    name = hostname.rstrip('.')
    if not name:
        return False
    parts = name.split('.')
    while parts:
        if ".".join(parts) in table:
            return True
        parts = parts[1:]
    return False

66
def query(query):
    """Answer one JSON request with a JSON list of traffic records.

    *query* is the request text.  Besides the keys understood by
    ``build_filter`` it may contain:

    * ``aggregate`` (default false) -- when true, return one record per
      (src_mac, hostname, port) with summed bytes, ordered by total bytes;
      otherwise return individual flows, with flows to the same hostname and
      port that follow each other within one second merged, ordered by start.
    * ``filter`` (default true) -- drop hostnames for which ``is_ignored``
      returns true.

    Every record is the list ``[start, duration, src_mac, app_hostname,
    dest_port, app_proto, bytes_send, bytes_received]``; ``start`` is
    formatted as ``YYYY-MM-DD HH:MM:SS`` and well-known ``port/proto`` pairs
    are replaced by a service name.

    Returns ``'[]'`` when the request is not valid JSON, is not a JSON object,
    or has a ``start``/``end`` that cannot be converted to an integer.
    Database errors propagate as ``sqlite3.Error``.
    """
    # Validate the request before touching the database so that a malformed
    # request costs nothing and cannot leak a connection.
    try:
        request = json.loads(query)
    except ValueError:
        return '[]'
    if not isinstance(request, dict):
        return '[]'
    try:
        (time_from, time_to, where_clause, where_parameters) = build_filter(request)
    except (TypeError, ValueError):
        return '[]'
    aggregate = request.get("aggregate", False)
    apply_filter = request.get("filter", True)
    skip = is_ignored if apply_filter else (lambda hostname: False)

    con = sqlite3.connect('/var/lib/pakon.db')
    try:
        c = con.cursor()
        c.execute('ATTACH DATABASE "/srv/pakon/pakon-archive.db" AS archive')
        # The WHERE fragment appears once per SELECT of the UNION, hence the
        # parameters are bound twice.
        if aggregate:
            sql = ("select start,duration,src_mac,app_hostname,(dest_port || '/' || lower(proto)) as dest_port,app_proto,bytes_send,bytes_received"
                   " from traffic where flow_id IS NULL AND " + where_clause +
                   " UNION ALL"
                   " select start,duration,src_mac,app_hostname,(dest_port || '/' || lower(proto)) as dest_port,app_proto,bytes_send,bytes_received"
                   " from archive.traffic where " + where_clause +
                   " ORDER BY src_mac,app_hostname,dest_port,start")
            rows = c.execute(sql, where_parameters + where_parameters)
            domains = _aggregate_rows(rows, time_from, time_to, skip)
        else:
            sql = ("select start,duration,src_mac,coalesce(app_hostname,dest_ip) as app_hostname,(dest_port || '/' || lower(proto)) as dest_port,app_proto,bytes_send,bytes_received"
                   " from traffic where flow_id IS NULL AND " + where_clause +
                   " UNION ALL"
                   " select start,duration,src_mac,app_hostname,(dest_port || '/' || lower(proto)) as dest_port,app_proto,bytes_send,bytes_received"
                   " from archive.traffic where " + where_clause +
                   " ORDER BY app_hostname,app_proto,start")
            rows = c.execute(sql, where_parameters + where_parameters)
            domains = _merge_adjacent_rows(rows, skip)
    finally:
        con.close()
    return json.dumps(_present_rows(domains))


def _merged_app_proto(known, new):
    """Return the application protocol of a merged record.

    *new* wins when it equals *known* or when *known* is ``'?'``; any other
    combination yields ``'?'``.
    """
    return new if new == known or known == '?' else "?"


def _clip_to_window(row, time_from, time_to):
    """Shrink *row* in place to the part of the flow inside the time window.

    The duration (index 1) loses the seconds before *time_from* and after
    *time_to*; both byte counters (indexes 6 and 7) are scaled by the share of
    the flow that remains.  The start (index 0) is left untouched.
    """
    start, duration = row[0], row[1]
    head = max(time_from - start, 0)
    tail = max(start + duration - time_to, 0)
    if not (head or tail) or not duration:
        # Fully inside the window, or zero-length (nothing to scale by).
        return
    share = max(duration - head - tail, 0) / float(duration)
    row[1] = max(duration - int(head) - int(tail), 0)
    row[6] = int(share * row[6])
    row[7] = int(share * row[7])


def _finish_group(record, span, domains):
    """Add the pending interval to *record* and keep it if it moved any bytes."""
    if record is None:
        return
    record[1] += int(span[1] - span[0])
    if record[6] + record[7] > 0:
        domains.append(record)


def _aggregate_rows(rows, time_from, time_to, skip):
    """Collapse rows ordered by (src_mac, hostname, port, start) into groups.

    Each group's duration is the length of the union of its flows' intervals
    (overlapping flows are counted once) and its byte counters are the sums
    over the group.  Rows whose hostname satisfies *skip* are dropped, and
    groups without any traffic are omitted.  The result is ordered by total
    bytes, ascending.
    """
    domains = []
    current = None  # record of the group being built
    span = None     # [begin, end] of the merged interval not yet counted
    for raw in rows:
        row = list(raw)
        if skip(row[3]):
            continue
        _clip_to_window(row, time_from, time_to)
        begin = max(row[0], time_from)
        end = begin + row[1]
        if current is not None and current[2:5] == row[2:5]:
            if begin > span[1]:
                # Gap: the previous interval is complete, start a new one.
                current[1] += int(span[1] - span[0])
                span = [begin, end]
            else:
                span[1] = max(span[1], end)
            current[5] = _merged_app_proto(current[5], row[5])
            current[6] += int(row[6])
            current[7] += int(row[7])
        else:
            _finish_group(current, span, domains)
            current = row
            current[1] = 0  # rebuilt from the merged intervals
            span = [begin, end]
    _finish_group(current, span, domains)
    return sorted(domains, key=lambda record: record[6] + record[7])


def _merge_adjacent_rows(rows, skip):
    """Merge consecutive flows to the same hostname and port.

    A row is folded into the previous record when hostname and port match and
    it starts no later than one second after that record ends.  Rows without
    a hostname, or whose hostname satisfies *skip*, are dropped.  The result
    is ordered by start.
    """
    # NOTE(review): src_mac is not part of the merge key and the rows arrive
    # ordered by app_proto before start -- confirm both are intended.
    domains = []
    current = None
    for raw in rows:
        if not raw[3] or skip(raw[3]):
            continue
        row = list(raw)
        if (current is not None and current[3] == row[3] and current[4] == row[4]
                and row[0] <= current[0] + current[1] + 1):
            current[1] = max(current[1], int(row[0] - current[0] + row[1]))
            current[5] = _merged_app_proto(current[5], row[5])
            current[6] += row[6]
            current[7] += row[7]
            continue
        if current is not None:
            domains.append(current)
        current = row
    if current is not None:
        domains.append(current)
    return sorted(domains, key=lambda record: record[0])


def _present_rows(domains):
    """Format start times and replace well-known ports by service names, in place."""
    proto_ports = {'22/tcp': 'ssh', '80/tcp': 'http', '443/tcp': 'https', '53/tcp': 'dns', '53/udp': 'dns', '143/tcp': 'imap', '993/tcp': 'imaps', '587/tcp': 'smtp', '995/tcp': 'pop3s', '25/tcp': 'smtp', '465/tcp': 'smtps', '110/tcp': 'pop3'}
    # This is an ugly hack for missing values (due to aggregation). This should disappear in the future.
    proto_ports['/tcp'] = ''
    proto_ports['/udp'] = ''
    for d in domains:
        d[0] = datetime.datetime.fromtimestamp(d[0]).strftime('%Y-%m-%d %H:%M:%S')
        if d[4] in proto_ports:
            d[4] = proto_ports[d[4]]
    return domains
162

163
164
165

class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
    """Serve one request per connection: one line in, one line out."""

    def handle(self):
        """Read a single request line, run it through query() and send the reply.

        The request is decoded as UTF-8, matching the encoding of the reply,
        instead of depending on the locale's default encoding.
        """
        with self.request.makefile(encoding="utf-8") as f:
            data = f.readline().strip()
        self.request.sendall((query(data) + "\n").encode("utf-8"))

def main():
    """Serve queries on the UNIX socket /var/run/pakon-query.sock until stopped."""
    socket_path = "/var/run/pakon-query.sock"
    # Remove a stale socket left by a previous run.  Only a missing file is
    # expected here; any other failure would make the bind below fail anyway,
    # so let it surface with its real cause.
    try:
        os.unlink(socket_path)
    except FileNotFoundError:
        pass
    server = socketserver.UnixStreamServer(socket_path, ThreadedTCPRequestHandler)
    try:
        server.serve_forever()
    finally:
        # serve_forever() has already stopped when control gets here, so no
        # shutdown() is needed; just release the listening socket.
        server.server_close()

# Start the server only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()