Verified Commit e519f728 authored by Martin Petráček's avatar Martin Petráček
Browse files

abandon multiprocessing as it caused problems with twisted, use spawnProcess instead

parent 679364e7
......@@ -31,7 +31,6 @@ import time
import plugin_versions
import database
import timers
from multiprocessing import Process, Pipe, reduction
from coordinator import CoordinatorMasterFactory
logger = logging.getLogger(name='client')
......@@ -78,7 +77,8 @@ class ClientConn(twisted.protocols.basic.Int32StringReceiver):
self.__cid=cid
self.__connected = True
self.__authenticated = True
def __del__(self):
logger.debug("CLIENTCONN DELETED")
def has_plugin(self, plugin_name):
return plugin_name in self.__available_plugins
......@@ -176,16 +176,10 @@ class ClientConn(twisted.protocols.basic.Int32StringReceiver):
#select worker (based on CID hash)
worker=cid_hash % len(self.__workers)
logger.debug('MASTER Passing client %s (FD %s) to worker %s', self.__cid, self.transport.getHandle().fileno(), worker)
#send handle to worker
reduction.send_handle(self.__workers[worker][1][0], self.transport.getHandle().fileno(), self.__workers[worker][0].pid)
self.transport.stopReading()
self.transport.stopWriting()
logger.debug('MASTER Removing client %s', self.__cid)
# Replay the bufferend messages, pack them (to be sent to worker)
buffer=""
for message in self.__auth_buffer:
buffer += format_string(message)
worker_conn = self.__workers[worker][2].connect(CoordinatorMasterFactory("l"+format_string(self.__cid)+format_string(str(len(self.__auth_buffer)))+buffer))
self.__workers[worker].passClientHandle(self.__cid, self.__auth_buffer, self.transport.getHandle())
self.transport.abortConnection()
#self.transport.stopWriting()
else:
login_failure('Incorrect password')
self.__auth_buffer = None
......
......@@ -26,9 +26,8 @@ import log_extra
import logging
import logging.handlers
from client import ClientFactory
from coordinator import CoordinatorWorkerFactory
from coordinator import CoordinatorWorkerFactory, Worker
from plugin import Plugins, pool
from multiprocessing import Process, Pipe, reduction
import master_config
import socket
import activity
......@@ -41,6 +40,10 @@ import sys
# to keep up with pings.
reactor.suggestThreadPoolSize(3)
WorkerProcCnt = 2
if len(sys.argv) != 2:
raise Exception('There must be exactly 1 argument - config file name')
severity = master_config.get('log_severity')
if severity == 'TRACE':
severity = log_extra.TRACE_LEVEL
......@@ -86,43 +89,43 @@ class Socat(protocol.ProcessProtocol):
def errReceived(self, data):
logging.warn('Proxy complained: %s', data)
parent, child = Pipe()
def worker(ep,conn):
ep = UNIXServerEndpoint(reactor, ep)
ep.listen(CoordinatorWorkerFactory(conn, plugins, frozenset(master_config.get('fastpings'))))
logging.warn('child born')
reactor.run()
logging.debug('readers %s',reactor.getReaders())
sys.exit(0)
def main():
workers=[]
#endpoint = UNIXServerEndpoint(reactor, './collect-master.sock')
endpoint = TCP4ServerEndpoint(reactor, 12345)
for i in range(WorkerProcCnt):
ep = './collect-master-worker-'+str(i)+'.sock'
parent, child = Pipe()
ch = Process(target=worker, args=(ep,child,))
ep = UNIXClientEndpoint(reactor, ep)
ch.start()
while not ch.pid:
time.sleep(.25)
workers.append((ch, (parent, child), ep))
print workers
args = ['./soxy/soxy', master_config.get('cert'), master_config.get('key'), master_config.get('ca'), str(master_config.getint('port_compression')), '127.0.0.1:12345', 'compress']
logging.debug('Starting proxy with: %s', args)
reactor.spawnProcess(Socat(), './soxy/soxy', args=args, env=os.environ)
endpoint.listen(ClientFactory(plugins, frozenset(master_config.get('fastpings').split()), workers))
logging.info('Init done')
reactor.run()
main()
class WorkerProtocol(protocol.ProcessProtocol):
    """Process protocol attached to a spawned worker process.

    Logs data received from the worker and stops the reactor when the
    worker process ends.
    """

    def childDataReceived(self, fd, str):
        """Log data the worker wrote on descriptor ``fd``.

        Overriding this hook replaces the base-class dispatch to
        ``outReceived``/``errReceived``, so data from descriptor 2 (stderr)
        is forwarded to ``errReceived`` explicitly; otherwise that handler
        could never run.
        """
        if fd == 2:
            self.errReceived(str)
        else:
            logging.info('worker: %s', str)

    def processEnded(self, status):
        """Log the worker's exit and stop the reactor."""
        # Imported locally so the class does not rely on file-level imports.
        from twisted.internet.error import ReactorNotRunning
        logging.fatal('worker ended')
        try:
            reactor.stop()
        except ReactorNotRunning:
            # The reactor was already stopped, e.g. by another worker's exit.
            pass

    def errReceived(self, data):
        """Log data the worker wrote to its standard error."""
        logging.warn('worker complained: %s', data)
import time
from twisted.internet.error import ProcessExitedAlready

# Worker handles passed to ClientFactory below, one per spawned process.
workers = []
# Process transports of the spawned workers, kept so they can be terminated
# once the reactor has finished.
workers_prot = []
#endpoint = UNIXServerEndpoint(reactor, './collect-master.sock')
endpoint = TCP4ServerEndpoint(reactor, 12345)
for i in range(WorkerProcCnt):
    worker_sock = './collect-master-worker-' + str(i) + '.sock'
    # The child's end of the pair is handed to the worker as descriptor 3.
    parent_sock, child_sock = socket.socketpair()
    args = ['./collect-worker.py', sys.argv[1], worker_sock]
    worker_prot = reactor.spawnProcess(WorkerProtocol(), './collect-worker.py', args=args, env=os.environ, childFDs={0: 0, 1: 1, 2: 2, 3: child_sock.fileno()})
    # Wait until the child has a PID (presumably already set when
    # spawnProcess returns; kept as a safeguard).
    while not worker_prot.pid:
        time.sleep(0.25)
    # Remember the transport; without this the TERM loop below is a no-op.
    workers_prot.append(worker_prot)
    ep = UNIXServerEndpoint(reactor, worker_sock)
    workers.append(Worker(parent_sock, ep))
print(workers)
args = ['./soxy/soxy', master_config.get('cert'), master_config.get('key'), master_config.get('ca'), str(master_config.getint('port_compression')), '127.0.0.1:12345', 'compress']
logging.debug('Starting proxy with: %s', args)
reactor.spawnProcess(Socat(), './soxy/soxy', args=args, env=os.environ)
endpoint.listen(ClientFactory(plugins, frozenset(master_config.get('fastpings').split()), workers))
logging.info('Init done')
reactor.run()
# The reactor has stopped; ask every worker to terminate.
for w in workers_prot:
    try:
        w.signalProcess('TERM')
    except ProcessExitedAlready:
        # This worker is already gone (its exit may be what stopped us).
        pass
logging.info('Finishing up')
pool.stop()
......
#!/usr/bin/python2
#
# Ucollect - small utility for real-time analysis of network data
# Copyright (C) 2013 CZ.NIC, z.s.p.o. (http://www.nic.cz/)
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
from twisted.internet import reactor, protocol
from twisted.internet.endpoints import UNIXServerEndpoint, UNIXClientEndpoint, TCP4ServerEndpoint
from twisted.internet.error import ReactorNotRunning
from subprocess import Popen
import log_extra
import logging
import logging.handlers
from client import ClientFactory
from coordinator import CoordinatorWorkerFactory, Worker
from plugin import Plugins, pool
import master_config
import socket
import activity
import importlib
import os
import sys
# Keep the background thread pool small: with too many threads the GIL
# starves the main thread, pings are answered late and clients start
# dropping.
reactor.suggestThreadPoolSize(3)

if len(sys.argv) != 3:
    raise Exception('There must be 2 arguments - config file name an path to socket (for communicating with master)')

# Translate the configured severity name into a numeric logging level;
# 'TRACE' is the extra level provided by log_extra.
severity_name = master_config.get('log_severity')
severity = log_extra.TRACE_LEVEL if severity_name == 'TRACE' else getattr(logging, severity_name)

log_file = master_config.get('log_file')
logging.basicConfig(level=severity, format=master_config.get('log_format'))

# A log file of '-' means no rotating file handler is added.
if log_file != '-':
    max_bytes = int(master_config.get('log_file_size'))
    backup_count = int(master_config.get('log_file_count'))
    handler = logging.handlers.RotatingFileHandler(log_file, maxBytes=max_bytes, backupCount=backup_count)
    handler.setFormatter(logging.Formatter(fmt=master_config.get('log_format')))
    logging.getLogger().addHandler(handler)
# Instantiate every configured plugin. Each key of master_config.plugins()
# is a dotted "module.ClassName" path; the value is passed to the plugin
# constructor as its configuration.
plugins = Plugins()
loaded_plugins = {}
for plugin_path, plugin_config in master_config.plugins().items():
    module_name, class_name = plugin_path.rsplit('.', 1)
    plugin_class = getattr(importlib.import_module(module_name), class_name)
    plugin_instance = plugin_class(plugins, plugin_config)
    loaded_plugins[plugin_path] = plugin_instance
    logging.info('Loaded plugin %s from %s', plugin_instance.name(), plugin_path)
# Connect to the master over the UNIX socket given as the second argument.
ep = UNIXClientEndpoint(reactor, sys.argv[2])
logging.debug('readers %s', reactor.getReaders())
# 'fastpings' is split on whitespace, as the master does; without the split
# the frozenset would contain the individual characters of the option.
d = ep.connect(CoordinatorWorkerFactory(plugins, frozenset(master_config.get('fastpings').split())))


def cant_connect(failure):
    """Errback for the connection attempt: log the failure and stop the reactor."""
    logging.fatal("Can't connect to master: %s", failure)
    reactor.stop()


d.addErrback(cant_connect)
reactor.run()
# The reactor has stopped; shut the remaining services down.
logging.info('Finishing up')
pool.stop()
activity.shutdown()
logging.info('Shutdown done')
......@@ -18,7 +18,8 @@
#
from twisted.internet.task import LoopingCall
from twisted.python.sendmsg import getsockfam
from socket import SOL_SOCKET, socketpair
from twisted.python.sendmsg import SCM_RIGHTS, send1msg, recv1msg
from twisted.internet import reactor
import twisted.internet.protocol
import twisted.protocols.basic
......@@ -26,8 +27,9 @@ import logging
import traceback
import socket
from protocol import extract_string, format_string
from multiprocessing import reduction
import client
from struct import unpack, pack
logger = logging.getLogger(name='timers')
......@@ -43,62 +45,100 @@ def timer(callback, time, startnow=False):
class CoordinatorWorkerConn(twisted.protocols.basic.Int32StringReceiver):
MAX_LENGTH = 1024 ** 3 # A gigabyte should be enough
def __init__(self, parent_pipe, plugins, fastpings):
self.__parent_pipe = parent_pipe
def __init__(self, plugins, fastpings):
self.__plugins=plugins
self.__fastpings = fastpings
def connectionMade(self):
logger.debug("Connected to master")
return
def connectionLost(self, reason):
logger.fatal("Lost connection to master")
reactor.stop()
return
def stringReceived(self, string):
logger.trace("WORKER Received from MSTER: %s", repr(string))
logger.trace("WORKER Received from MASTER: %s", repr(string))
(msg, params) = (string[0], string[1:])
if msg == 'l':
# Passing client from master
s = socket.fromfd(reduction.recv_handle(self.__parent_pipe), socket.AF_UNIX, socket.SOCK_STREAM)
data, flags, ancillary = recv1msg(3, 1024)
s = unpack("i", ancillary[0][2])[0]
logging.debug('received socket: %s', s)
(cid, params) = extract_string(params)
(replay_msgs,params)=extract_string(params)
replay_msgs=int(replay_msgs)
clientObj=client.ClientFactory(self.__plugins, self.__fastpings, None, True, cid)
reactor.adoptStreamConnection(s.fileno(), socket.AF_INET, clientObj)
reactor.adoptStreamConnection(s, socket.AF_INET, clientObj)
for i in range(replay_msgs):
(msg,params)=extract_string(params)
clientObj.stringReceived(msg)
logger.debug(" WORKER Got client (fd %s) from master: CID %s msgs %s", s.fileno(), cid, replay_msgs)
logger.debug(" WORKER Got client (fd %s) from master: CID %s msgs %s", s, cid, replay_msgs)
return
else:
logger.warn("Unknown message from coordinator: %s", msg)
class CoordinatorWorkerFactory(twisted.internet.protocol.Factory):
def __init__(self, parent_pipe, plugins, fastpings):
self.__parent_pipe=parent_pipe
def __init__(self, plugins, fastpings):
self.__plugins=plugins
self.__fastpings = fastpings
def buildProtocol(self, addr):
return CoordinatorWorkerConn(self.__parent_pipe, self.__plugins, self.__fastpings)
return CoordinatorWorkerConn(self.__plugins, self.__fastpings)
class CoordinatorMasterConn(twisted.protocols.basic.Int32StringReceiver):
MAX_LENGTH = 1024 ** 3 # A gigabyte should be enough
def __init__(self, addr, string):
self.__string=string
def __init__(self, addr):
return
def connectionMade(self):
logger.trace("Connected to worker")
self.sendString(self.__string)
logger.debug("Connection to worker")
def submit(self, string):
self.sendString(string)
def connectionLost(self, reason):
logger.trace("Lost connection to worker")
logger.fatal("Lost connection to worker")
class CoordinatorMasterFactory(twisted.internet.protocol.Factory):
def __init__(self, string):
self.__string = string
def __init__(self):
self.conn = None
def buildProtocol(self, addr):
return CoordinatorMasterConn(addr, self.__string)
self.conn = CoordinatorMasterConn(addr)
return self.conn
class Worker():
    """Master-side handle for one worker process.

    Listens for the worker's coordinator connection, forwards strings to it
    and passes client file descriptors to the worker.
    """

    def __init__(self, pipe, sock):
        """Start listening for the worker's connection.

        pipe -- object with fileno(); descriptors are sent over it with
                SCM_RIGHTS (presumably one end of a socketpair; confirm
                against the caller)
        sock -- object with listen(factory), e.g. a server endpoint
        """
        self.__pipe = pipe
        self.__sock = sock
        # Strings submitted before the worker connected, oldest first.
        self.__queue = []
        self.__conn = None
        self.__listen_factory = CoordinatorMasterFactory()
        self.__sock.listen(self.__listen_factory)

    def submit(self, string):
        """Send string to the worker, or queue it until the worker connects.

        Once a connection exists, anything queued earlier is flushed first,
        in submission order, so no message is lost or reordered.
        """
        if self.__conn is None:
            # The factory records the connection once the worker connects.
            self.__conn = self.__listen_factory.conn
        if self.__conn is None:
            self.__queue.append(string)
            logger.warn("Tried writing to worker while it's not connected.")
            return
        pending, self.__queue = self.__queue, []
        for queued in pending:
            self.__conn.submit(queued)
        self.__conn.submit(string)

    def passClientHandle(self, cid, messages, fd):
        """Hand a client connection over to the worker.

        The descriptor of fd is sent over the pipe as SCM_RIGHTS ancillary
        data, then an 'l' message carrying the client ID, the number of
        buffered messages and the messages themselves is submitted.

        cid      -- client ID string
        messages -- buffered messages to be replayed by the worker
        fd       -- object with fileno() for the client's connection
        """
        send1msg(self.__pipe.fileno(), "\x00", 0, [(SOL_SOCKET, SCM_RIGHTS, pack("i", fd.fileno()))])
        # Pack the buffered messages so the worker can replay them.
        replay = "".join([format_string(message) for message in messages])
        self.submit("l" + format_string(cid) + format_string(str(len(messages))) + replay)
......@@ -20,8 +20,8 @@
import ConfigParser
import sys
if len(sys.argv) != 2:
raise Exception('There must be exactly 1 argument - config file name')
#if len(sys.argv) != 2:
#raise Exception('There must be exactly 1 argument - config file name')
config_data = ConfigParser.RawConfigParser()
with open(sys.argv[1]) as f:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment