collect-master.py 3.83 KB
Newer Older
Michal 'vorner' Vaner's avatar
Michal 'vorner' Vaner committed
1
#!/usr/bin/python2
Michal 'vorner' Vaner's avatar
Michal 'vorner' Vaner committed
2
3
#
#    Ucollect - small utility for real-time analysis of network data
4
#    Copyright (C) 2013 CZ.NIC, z.s.p.o. (http://www.nic.cz/)
Michal 'vorner' Vaner's avatar
Michal 'vorner' Vaner committed
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#
#    This program is free software; you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation; either version 2 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License along
#    with this program; if not, write to the Free Software Foundation, Inc.,
#    51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#

21
from twisted.internet import reactor, protocol
22
23
24
from twisted.internet.endpoints import UNIXServerEndpoint
from twisted.internet.error import ReactorNotRunning
from subprocess import Popen
25
import log_extra
Michal 'vorner' Vaner's avatar
Michal 'vorner' Vaner committed
26
import logging
Michal 'vorner' Vaner's avatar
Michal 'vorner' Vaner committed
27
import logging.handlers
28
from client import ClientFactory
29
from plugin import Plugins, pool
30
import master_config
31
import activity
32
import importlib
33
import os
34

35
36
37
38
# If we have too many background threads, the GIL slows down the
# main thread and cleants start dropping because we are not able
# to keep up with pings.
reactor.suggestThreadPoolSize(3)
39
40
41
42
43
severity = master_config.get('log_severity')
if severity == 'TRACE':
	severity = log_extra.TRACE_LEVEL
else:
	severity = getattr(logging, severity)
Michal 'vorner' Vaner's avatar
Michal 'vorner' Vaner committed
44
log_file = master_config.get('log_file')
Michal 'vorner' Vaner's avatar
Michal 'vorner' Vaner committed
45
46
logging.basicConfig(level=severity, format=master_config.get('log_format'))
if log_file != '-':
47
48
49
	handler = logging.handlers.RotatingFileHandler(log_file, maxBytes=int(master_config.get('log_file_size')), backupCount=int(master_config.get('log_file_count')))
	handler.setFormatter(logging.Formatter(fmt=master_config.get('log_format')))
	logging.getLogger().addHandler(handler)
50
51

loaded_plugins = {}
52
plugins = Plugins()
53
54
55
56
57
58
for (plugin, config) in master_config.plugins().items():
	(modulename, classname) = plugin.rsplit('.', 1)
	module = importlib.import_module(modulename)
	constructor = getattr(module, classname)
	loaded_plugins[plugin] = constructor(plugins, config)
	logging.info('Loaded plugin %s from %s', loaded_plugins[plugin].name(), plugin)
59
# Some configuration, to load the port from?
60
61
62
63
64
65
66
67
endpoint = UNIXServerEndpoint(reactor, './collect-master.sock')

socat = None

class Socat(protocol.ProcessProtocol):
	def connectionMade(self):
		global socat
		socat = self.transport
68
		logging.info('Started proxy')
69
70
71
72
73
74
75

	def processEnded(self, status):
		global socat
		if socat:
			socat = None
			try:
				reactor.stop()
76
77
				# Don't report lost proxy if we're already terminating
				logging.fatal('Lost proxy, terminating')
78
79
80
81
			except ReactorNotRunning:
				pass

	def errReceived(self, data):
82
		logging.warn('Proxy complained: %s', data)
83

Michal 'vorner' Vaner's avatar
Michal 'vorner' Vaner committed
84
85
86
87
# Disable running the uncompressed soxy, it is no longer needed.
#args = ['./soxy/soxy', master_config.get('cert'), master_config.get('key'), str(master_config.getint('port')), os.getcwd() + '/collect-master.sock']
#logging.debug('Starting proxy with: %s', args)
#reactor.spawnProcess(Socat(), './soxy/soxy', args=args, env=os.environ)
88
args = ['./soxy/soxy', master_config.get('cert'), master_config.get('key'), master_config.get('ca'), str(master_config.getint('port_compression')), os.getcwd() + '/collect-master.sock', 'compress']
89
90
91
logging.debug('Starting proxy with: %s', args)
reactor.spawnProcess(Socat(), './soxy/soxy', args=args, env=os.environ)

Michal 'vorner' Vaner's avatar
Michal 'vorner' Vaner committed
92
endpoint.listen(ClientFactory(plugins, frozenset(master_config.get('fastpings').split())))
Michal 'vorner' Vaner's avatar
Michal 'vorner' Vaner committed
93
logging.info('Init done')
94

95
reactor.run()
96
97

logging.info('Finishing up')
98
pool.stop()
99
100
101
102
if socat:
	soc = socat
	socat = None
	soc.signalProcess('TERM')
103
104
activity.shutdown()
logging.info('Shutdown done')