#!/usr/bin/env python3

import fileinput
import os, os.path
import string
import socket
import sys
import subprocess
import re
import time
import datetime
import sqlite3
import signal
import errno
import logging

logging.basicConfig(stream=sys.stderr, level=logging.INFO)
18
#logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
#TODO: replace with uci bindings - once available
Michal Hrusecky's avatar
Michal Hrusecky committed
21
def uci_get(opt):
22
    delimiter = '__uci__delimiter__'
Michal Hrusecky's avatar
Michal Hrusecky committed
23
24
25
    chld = subprocess.Popen(['/sbin/uci', '-d', delimiter, '-q', 'get', opt],
                            stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    out, err = chld.communicate()
26
    out = out.strip().decode('ascii','ignore')
Michal Hrusecky's avatar
Michal Hrusecky committed
27
28
29
30
31
    if out.find(delimiter) != -1:
        return out.split(delimiter)
    else:
        return out

def uci_get_time(opt, default = None):
33
34
35
36
    ret = 0
    text = uci_get(opt)
    if not text:
        text = default
37
    if text[-1:].upper() == 'M':
38
        ret = int(text[:-1]) * 60
39
    elif text[-1:].upper() == 'H':
40
        ret = int(text[:-1]) * 3600
41
    elif text[-1:].upper() == 'D':
42
        ret = int(text[:-1]) * 24 * 3600
43
44
    elif text[-1:].upper() == 'W':
        ret = int(text[:-1]) * 7 * 24 * 3600
45
46
47
48
    else:
        ret = int(text)
    return ret

archive_path = uci_get('pakon.archive.path') or '/srv/pakon/pakon-archive.db'
50
con = sqlite3.connect(archive_path, timeout = 300.0)
51
con.row_factory = sqlite3.Row
def squash(from_details, to_details, up_to, window, size_threshold):
Michal Hrusecky's avatar
Michal Hrusecky committed
54
    global con
55
56
    now = int(time.mktime(datetime.datetime.utcnow().timetuple()))
    start = now - up_to
Martin Petráček's avatar
Martin Petráček committed
57
    c = con.cursor()
Martin Petráček's avatar
Martin Petráček committed
58
    logging.debug("Squashing flows - from detail_level {} to detail_level {}".format(from_details, to_details))
Martin Petráček's avatar
Martin Petráček committed
59
60
    to_be_deleted = []
    for row in c.execute('SELECT rowid, start, (start+duration) AS end, duration, src_mac, src_ip, src_port, dest_ip, dest_port, proto, app_proto, bytes_send, bytes_received, app_hostname FROM traffic WHERE details = ? AND start < ? ORDER BY start', (from_details, start,)):
61
        if row['rowid'] in to_be_deleted:
Martin Petráček's avatar
Martin Petráček committed
62
            continue
Martin Petráček's avatar
Martin Petráček committed
63
        logging.debug("trying:")
64
        logging.debug(tuple(row))
65
66
67
68
69
70
71
72
73
        current_start = float(row['start'])
        current_end = float(row['end'])
        current_bytes_send = int(row['bytes_send'])
        current_bytes_received = int(row['bytes_received'])
        src_ip = row['src_ip']
        src_port = row['src_port']
        dest_ip = row['dest_ip']
        app_proto = row['app_proto']
        app_hostname = row['app_hostname']
Martin Petráček's avatar
Martin Petráček committed
74
        tmp = con.cursor()
75
76
77
78
79
        first = True
        for entry in tmp.execute('SELECT rowid, start, (start+duration) AS end, duration, src_mac, src_ip, src_port, dest_ip, dest_port, proto, app_proto, bytes_send, bytes_received, app_hostname FROM traffic WHERE details = ? AND start > ? AND start <= ? AND src_mac = ? AND dest_port = ? AND proto = ? ORDER BY start', (from_details, current_start, current_start+window, row['src_mac'], row['dest_port'], row['proto'])):
            #hostname comparison done here (not in SQL query) because of None values
            if entry['app_hostname']!=row['app_hostname']:
                continue
80
81
82
83
            #if hostname is Null, we only want to merge flows with equal dest_ip
            if not entry['app_hostname'] and entry['dest_ip']!=row['dest_ip']:
                continue
            logging.debug("merging with:")
84
            logging.debug(tuple(entry))
85
86
87
88
            current_end = max(current_end, float(entry['end']))
            current_bytes_send += int(entry['bytes_send'])
            current_bytes_received += int(entry['bytes_received'])
            if src_ip != entry['src_ip']:
Martin Petráček's avatar
Martin Petráček committed
89
                src_ip = ''
90
            if src_port != entry['src_port']:
Martin Petráček's avatar
Martin Petráček committed
91
                src_port = ''
92
            if dest_ip != entry['dest_ip']:
Martin Petráček's avatar
Martin Petráček committed
93
                dest_ip = ''
94
            if app_proto != entry['app_proto']:
Martin Petráček's avatar
Martin Petráček committed
95
                app_proto = ''
96
            if app_hostname != entry['app_hostname']:
Martin Petráček's avatar
Martin Petráček committed
97
                app_hostname = ''
98
            to_be_deleted.append(entry['rowid'])
99
100
101
102
        if current_bytes_send + current_bytes_received > size_threshold:
            tmp.execute('UPDATE traffic SET details = ?, duration = ?, src_ip = ?, src_port = ?, dest_ip = ?, app_proto = ?, bytes_send = ?, bytes_received = ?, app_hostname = ? WHERE rowid = ?', (to_details, int(current_end-current_start), src_ip, src_port, dest_ip, app_proto, current_bytes_send, current_bytes_received, app_hostname, row['rowid']))
        else:
            to_be_deleted.append(row['rowid'])
Martin Petráček's avatar
Martin Petráček committed
103
104
    for tbd in to_be_deleted:
        c.execute('DELETE FROM traffic WHERE rowid = ?', (tbd,))
105
    con.commit()
Martin Petráček's avatar
Martin Petráček committed
106
107
    return len(to_be_deleted)

def load_archive_rules():
    rules = []
    i = 0
    while uci_get("pakon.@archive_rule[{}].up_to".format(i)):
        up_to = uci_get_time("pakon.@archive_rule[{}].up_to".format(i))
        window = uci_get_time("pakon.@archive_rule[{}].window".format(i))
114
115
        size_threshold = int(uci_get("pakon.@archive_rule[{}].size_threshold".format(i)) or 0)
        rules.append( { "up_to": up_to, "window": window, "size_threshold": size_threshold })
116
        i = i + 1
117
118
119
    if not rules: #if there is no rule (old configuration?) - add one default rule
        rules.append( { "up_to": 86400, "window": 60, "size_threshold": 4096 })
        logging.info('no rules in configuration - using default {}'.format(str(rules[0])))
120
121
122
    sorted(rules, key=lambda r: r["up_to"])
    return rules

# Create database if it was empty
c = con.cursor()
Martin Petráček's avatar
Martin Petráček committed
125
now = int(time.mktime(datetime.datetime.utcnow().timetuple()))
Martin Petráček's avatar
Martin Petráček committed
126
start = now-3600*24 #move flows from live DB to archive after 24hours
Martin Petráček's avatar
Martin Petráček committed
127

128
c.execute('ATTACH DATABASE "/var/lib/pakon.db" AS live')
129
c.execute('INSERT INTO traffic SELECT start, duration, 0, src_mac, src_ip, src_port, dest_ip, dest_port, proto, app_proto, bytes_send, bytes_received, app_hostname FROM live.traffic WHERE start < ? AND flow_id IS NULL', (start,))
130
131
132
logging.info("moved {} flows from live to archive".format(c.rowcount))
c.execute('DELETE FROM live.traffic WHERE start < ? AND flow_id IS NULL', (start,))
con.commit()
133
134
135
136
137
138
139

#workaround for a bug in Python 3.6
#https://bugs.python.org/issue28518
con.isolation_level = None
con.execute('VACUUM live')
con.isolation_level = ''

rules = load_archive_rules()

#if the rules changed (there is detail level that can't be generated using current rules)
#reset everything to detail level 0 -> perform the whole archivation again
c.execute('SELECT DISTINCT(details) FROM traffic WHERE details > ?', (len(rules),))
if c.fetchall():
    logging.info('resetting all detail levels to 0')
    c.execute('UPDATE traffic SET details = 0')

for i in range(len(rules)):
150
151
    deleted = squash(i, i+1, rules[i]["up_to"], rules[i]["window"], rules[i]["size_threshold"])
    logging.info("squashed from {} to {} - deleted {}".format(i, i+1, deleted))
152
c.execute('DELETE FROM traffic WHERE start < ?', (now - uci_get_time('pakon.archive.keep', '4w'),))
153
154
155
156
157

#c.execute('VACUUM')
#performing it every time is bad - it causes the whole database file to be rewritten
#TODO: think about when to do it, perform it once in a while?

158
159
160
con.commit()
c.execute('SELECT COUNT(*) FROM live.traffic')
logging.info("{} flows in live database".format(c.fetchone()[0]))
161
162
163
for i in range(len(rules)+1):
    c.execute('SELECT COUNT(*) FROM traffic WHERE details = ?', (i,))
    logging.info("{} flows in archive on details level {}".format(c.fetchone()[0], i))