Verified Commit 3c90fb84 authored by Michal 'vorner' Vaner's avatar Michal 'vorner' Vaner
Browse files

Merge branch 'master' of gitlab.labs.nic.cz:turris/ucollect

parents e077409a f45aad3e
......@@ -905,31 +905,23 @@ struct uplink *uplink_create(struct loop *loop) {
// NOTE(review): this is a merge-diff view; both the removed (old) and the
// added (new) halves of the hunk appear below, so the code as shown is not
// compilable as a single function.
ulog(LLOG_INFO, "Creating uplink\n");
struct mem_pool *permanent_pool = loop_permanent_pool(loop);
struct uplink *result = mem_pool_alloc(permanent_pool, sizeof *result);
// --- removed by the merge: stack-local z_streams were initialized first and
// then copied into the struct by the designated initializer below ---
z_stream strm_compress;
strm_compress.zalloc = Z_NULL;
strm_compress.zfree = Z_NULL;
strm_compress.opaque = Z_NULL;
if (deflateInit(&strm_compress, COMPRESSION_LEVEL) != Z_OK)
die("Could not initialize zlib (compression stream)\n");
z_stream strm_decompress;
strm_decompress.zalloc = Z_NULL;
strm_decompress.zfree = Z_NULL;
strm_decompress.opaque = Z_NULL;
strm_decompress.avail_in = 0;
if (inflateInit(&strm_decompress) != Z_OK)
die("Could not initialize zlib (decompression stream)\n");
// --- added by the merge: allocate the receive buffer, then build the struct
// with a designated initializer; fields not named (including both z_streams)
// are zero-initialized, which satisfies zlib's pre-*Init requirements ---
unsigned char *incoming_buffer = mem_pool_alloc(permanent_pool, COMPRESSION_BUFFSIZE);
*result = (struct uplink) {
.uplink_read = uplink_read,
.loop = loop,
.buffer_pool = loop_pool_create(loop, NULL, mem_pool_printf(loop_temp_pool(loop), "Buffer pool for uplink")),
.fd = -1,
.zstrm_send = strm_compress,
.zstrm_recv = strm_decompress,
.inc_buffer = incoming_buffer,
.inc_buffer_size = COMPRESSION_BUFFSIZE
};
// Fields zalloc, zfree and opaque, which must be set before calling
// deflateInit (resp. inflateInit), were set to zero (Z_NULL) by the
// designated initializer above.
if (deflateInit(&(result->zstrm_send), COMPRESSION_LEVEL) != Z_OK)
die("Could not initialize zlib (compression stream)\n");
// For zstrm_recv it is also necessary to set zstrm_recv.avail_in to 0;
// this too was done by the initializer.
if (inflateInit(&(result->zstrm_recv)) != Z_OK)
die("Could not initialize zlib (decompression stream)\n");
loop_uplink_set(loop, result);
return result;
}
......
......@@ -35,7 +35,7 @@ import os
# If we have too many background threads, the GIL slows down the
# main thread and clients start dropping because we are not able
# to keep up with pings.
reactor.suggestThreadPoolSize(3)
reactor.suggestThreadPoolSize(5)
severity = master_config.get('log_severity')
if severity == 'TRACE':
severity = log_extra.TRACE_LEVEL
......
#
# Ucollect - small utility for real-time analysis of network data
# Copyright (C) 2013,2015 CZ.NIC, z.s.p.o. (http://www.nic.cz/)
# Copyright (C) 2013-2017 CZ.NIC, z.s.p.o. (http://www.nic.cz/)
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
......@@ -22,8 +22,13 @@ import logging
import threading
import traceback
import time
import datetime
import monotonic
from master_config import get
CACHE_TIME = 600 # We cache the database calibration for 10 minutes
LONG_TRANSACTION_TRESHOLD = 10 # Transactions here should not take longer than 10 seconds, we warn for longer
logger = logging.getLogger(name='database')
class __CursorContext:
......@@ -34,6 +39,7 @@ class __CursorContext:
self.__connection = connection
self.__depth = 0
self.reuse()
self.__start_time = None
def reuse(self):
self._cursor = self.__connection.cursor()
......@@ -41,6 +47,7 @@ class __CursorContext:
def __enter__(self):
# NOTE(review): original indentation was lost in this scraped diff view.
# On the first (outermost) entry into the transaction context, log it and
# record a monotonic start time so __exit__ can warn about long transactions
# (the __start_time line is the one added by this merge).
if not self.__depth:
logger.debug('Entering transaction %s', self)
self.__start_time = monotonic.monotonic()
self.__depth += 1
# Hand the shared cursor to the `with` body.
return self._cursor
......@@ -48,6 +55,10 @@ class __CursorContext:
# NOTE(review): the `def __exit__(...)` line lies outside this diff hunk;
# the body below handles leaving one nesting level of the context.
self.__depth -= 1
if self.__depth:
return # Didn't exit all the contexts yet
# Added by the merge: on leaving the outermost level, warn (with the current
# stack) when the transaction exceeded LONG_TRANSACTION_TRESHOLD seconds.
duration = monotonic.monotonic() - self.__start_time
if duration > LONG_TRANSACTION_TRESHOLD:
logger.warn('The transaction took a long time (%s seconds): %s', duration, traceback.format_stack())
self.__start_time = None
# An exception inside the `with` body rolls the transaction back.
if exc_type:
logger.error('Rollback of transaction %s:%s/%s/%s', self, exc_type, exc_val, traceback.format_tb(exc_tb))
self.__connection.rollback()
......@@ -112,12 +123,22 @@ __time_update = 0
__time_db = 0
def now():
"""
Return the current database timestamp.

To minimise the number of accesses to the database (because it can be blocking
and it is on a different server), we cache the result for some time and adjust
it by local clock. We re-request the database timestamp from time to time, so
the database stays the authoritative source of time.
"""
global __time_update
global __time_db
# --- removed by the merge: wall-clock cache with a fixed 2-second lifetime,
# returning the raw cached value ---
t = time.time()
if __time_update + 2 < t:
# --- added by the merge: monotonic clock, CACHE_TIME lifetime, and the cached
# value is corrected by the locally elapsed time before being returned ---
t = monotonic.monotonic()
diff = t - __time_update
if diff > CACHE_TIME: # More than 10 minutes since the last update, request a new one
__time_update = t
diff = 0 # We request it now, so it is in sync
with transaction() as t:
t.execute("SELECT CURRENT_TIMESTAMP AT TIME ZONE 'UTC'");
(__time_db,) = t.fetchone()
# Old return (uncorrected cache) vs. new return (cache + elapsed time):
return __time_db
return __time_db + datetime.timedelta(seconds=diff)
......@@ -15,6 +15,7 @@ DROP VIEW IF EXISTS fake_blacklist_concat;
-- Drop dependent views before the tables they read from.
DROP VIEW IF EXISTS fake_blacklist_uncached;
DROP VIEW IF EXISTS plugin_activity;
DROP VIEW IF EXISTS fake_blacklist_cache_fill;
-- Added by the merge: remove the ssh_blacklist_scores view as well.
DROP VIEW IF EXISTS ssh_blacklist_scores;
DROP TABLE IF EXISTS fake_blacklist_cache;
DROP TABLE IF EXISTS fwup_addresses;
DROP TABLE IF EXISTS fwup_sets;
......
......@@ -412,32 +412,33 @@ void init(struct context *context) {
// NOTE(review): merge-diff view — old and new lines of each hunk are both
// present below; the function signature (init) is in the @@ header above.
// Windows settings
// Parameter count should be a number such that windows_count*window_length is at least 1 second
// WARNING: Minimal value of windows_count is 2!
// --- removed: a single `init` counter was reused for both the windows and the
// buckets below, so the bucket indices continued past the window count instead
// of starting at 0 ---
size_t init = 0;
context->user_data->windows[init++] = init_window(context->permanent_pool, 500, 12, common_start_timestamp);
context->user_data->windows[init++] = init_window(context->permanent_pool, 1000, 6, common_start_timestamp);
context->user_data->windows[init++] = init_window(context->permanent_pool, 2000, 3, common_start_timestamp);
context->user_data->windows[init++] = init_window(context->permanent_pool, 5000, 2, common_start_timestamp);
context->user_data->windows[init++] = init_window(context->permanent_pool, 10000, 2, common_start_timestamp);
// --- added: separate counters for windows and buckets fix the indexing ---
size_t init_windows = 0;
context->user_data->windows[init_windows++] = init_window(context->permanent_pool, 500, 12, common_start_timestamp);
context->user_data->windows[init_windows++] = init_window(context->permanent_pool, 1000, 6, common_start_timestamp);
context->user_data->windows[init_windows++] = init_window(context->permanent_pool, 2000, 3, common_start_timestamp);
context->user_data->windows[init_windows++] = init_window(context->permanent_pool, 5000, 2, common_start_timestamp);
context->user_data->windows[init_windows++] = init_window(context->permanent_pool, 10000, 2, common_start_timestamp);
size_t init_buckets = 0;
// Bucket boundaries: 0-1000 in steps of 250.
for (size_t i = 0; i < 1000; i += 250) {
// old:
context->user_data->in_buckets[init] = init_bucket(i);
context->user_data->out_buckets[init] = init_bucket(i);
init++;
// new:
context->user_data->in_buckets[init_buckets] = init_bucket(i);
context->user_data->out_buckets[init_buckets] = init_bucket(i);
init_buckets++;
}
// Bucket boundaries: 1000-20000 in steps of 1000.
for (size_t i = 1000; i <= 20000; i += 1000) {
// old:
context->user_data->in_buckets[init] = init_bucket(i);
context->user_data->out_buckets[init] = init_bucket(i);
init++;
// new:
context->user_data->in_buckets[init_buckets] = init_bucket(i);
context->user_data->out_buckets[init_buckets] = init_bucket(i);
init_buckets++;
}
// Bucket boundaries: 30000-100000 in steps of 10000.
for (size_t i = 30000; i <= 100000; i += 10000) {
// old:
context->user_data->in_buckets[init] = init_bucket(i);
context->user_data->out_buckets[init] = init_bucket(i);
init++;
// new:
context->user_data->in_buckets[init_buckets] = init_bucket(i);
context->user_data->out_buckets[init_buckets] = init_bucket(i);
init_buckets++;
}
// Bucket boundaries: 200000-1000000 in steps of 100000.
for (size_t i = 200000; i <= 1000000; i += 100000) {
// old:
context->user_data->in_buckets[init] = init_bucket(i);
context->user_data->out_buckets[init] = init_bucket(i);
init++;
// new:
context->user_data->in_buckets[init_buckets] = init_bucket(i);
context->user_data->out_buckets[init_buckets] = init_bucket(i);
init_buckets++;
}
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment