Verified Commit d89b6d00 authored by Martin Petráček's avatar Martin Petráček
Browse files

globally synchronized timers

parent 1085c2c6
......@@ -185,7 +185,7 @@ class BandwidthPlugin(plugin.Plugin):
plugin.Plugin.__init__(self, plugins)
self.__interval = int(config['interval'])
self.__aggregate_delay = int(config['aggregate_delay'])
self.__downloader = timers.timer(self.__init_download, self.__interval, False)
self.__downloader = timers.global_timer("bandwidth/1", self.__init_download, self.__interval, False)
self.__data = {}
self.__last = self.__current = int(time.time())
......
......@@ -32,7 +32,8 @@ fastpings:
; The plugins to load follow. Each name is the class to load and instantiate.
[spoof_plugin.SpoofPlugin]
; Spoof plugin is not supported now (it tries to bind to the same port in each worker, it obviously fails).
; [spoof_plugin.SpoofPlugin]
[count_plugin.CountPlugin]
; The plugin that counts some stuff (packets of various properties, amount of data, ...)
......
......@@ -73,7 +73,7 @@ class CountPlugin(plugin.Plugin):
plugin.Plugin.__init__(self, plugins)
self.__interval = int(config['interval'])
self.__aggregate_delay = int(config['aggregate_delay'])
self.__downloader = timers.timer(self.__init_download, self.__interval, False)
self.__downloader = timers.global_timer("count/1", self.__init_download, self.__interval, False)
self.__data = {}
self.__stats = {}
self.__last = int(time.time())
......
......@@ -28,10 +28,12 @@ import traceback
import socket
import sys
from protocol import extract_string, format_string
from struct import unpack, pack
import struct
logger = logging.getLogger(name='workerConn')
global_timers={}
class Gatekeeper2WorkerConn(twisted.protocols.basic.Int32StringReceiver):
"""
Connection from gatekeeper to worker.
......@@ -51,6 +53,31 @@ class Gatekeeper2WorkerConn(twisted.protocols.basic.Int32StringReceiver):
def stringReceived(self, string):
logger.trace("Gatekeeper received from worker: %s", repr(string))
(msg, params) = (string[0], string[1:])
if msg == 'T':
# Request to set globally synchronized timer.
# Only the first one with unique id will actually set the timer (start LoopingCall), following ones are just registered.
# When LoopingCall calls the callback, notification will be send to all workers that requested this timer.
def timer_tick(id):
global global_timers
try:
for w in global_timers[id]:
w.sendString("t" + format_string(id))
except Exception as e:
logger.warn("Exception while handling global timer: %s\n", e)
(time, ) = struct.unpack('!L', params[:4])
(id, params) = extract_string(params[4:])
global global_timers
if id in global_timers:
global_timers[id].append(self)
else:
logger.info("Registered new global timer: %s, interval %s", id, time)
global_timers[id] = [self]
result = LoopingCall(timer_tick, id)
result.start(int(time), now=False)
else:
logger.warn("Unknown message from worker: %s", msg)
return
class Gatekeeper2WorkerConnFactory(twisted.internet.protocol.Factory):
def __init__(self, worker):
......@@ -97,7 +124,7 @@ class Worker():
"""
Pass (already established) connection with client to worker. Also client CID and buffered messages are send.
"""
sent = send1msg(self.__pipe.fileno(), "\x00", 0, [(SOL_SOCKET, SCM_RIGHTS, pack("i", fd.fileno()))])
sent = send1msg(self.__pipe.fileno(), "\x00", 0, [(SOL_SOCKET, SCM_RIGHTS, struct.pack("i", fd.fileno()))])
# Replay the bufferend messages, pack them (to be sent to worker)
buffer = ""
for message in messages:
......
......@@ -20,9 +20,15 @@
from twisted.internet.task import LoopingCall
import logging
import traceback
import inspect
import hashlib
from protocol import extract_string, format_string
import struct
import worker2gatekeeper
logger = logging.getLogger(name='timers')
def timer(callback, time, startnow=False):
def protected():
try:
......@@ -32,3 +38,29 @@ def timer(callback, time, startnow=False):
result = LoopingCall(protected)
result.start(time, startnow)
return result
global_timer_map = {}
def global_timer(name, callback, time, startnow=False):
"""
Sets timer (that will be synchronized between all workers).
It just saves it's identifier and callback internally and requests setting timer on master.
"""
global global_timer_map
global_timer_map[name] = callback
worker2gatekeeper.send_to_master("T" + struct.pack('!L', time) + format_string(name))
if startnow:
callback()
def global_timer_cb(id):
"""
Callback for globally synchronized timer.
It's called upon receiving notification about global timer from master.
"""
global global_timer_map
try:
global_timer_map[id]()
except Exception as e:
logger.error("Error calling: ", e)
......@@ -32,6 +32,7 @@ from protocol import extract_string, format_string
import client_worker
from struct import unpack, pack
import sys
import timers
#worker's file descriptor no on which worker'll get client's handle (for recvn1msg)
#this is ugly, but spawnProcess wants directly FD numbers...
......@@ -53,6 +54,10 @@ class Worker2GatekeeperConn(twisted.protocols.basic.Int32StringReceiver):
self.__fastpings = fastpings
def connectionMade(self):
global master_conn, send_queue
master_conn = self
for m in send_queue:
self.sendString(m)
logger.debug("Connected to gatekeeper")
return
......@@ -86,6 +91,11 @@ class Worker2GatekeeperConn(twisted.protocols.basic.Int32StringReceiver):
reactor.adoptStreamConnection(s, socket.AF_INET, client_worker.ClientWorkerFactory(self.__plugins, self.__fastpings, cid, replay))
logger.debug("Got client (fd %s) from master: CID %s msgs %s", s, cid, replay_msgs)
return
if msg == 't':
# Timer tick notification from master. Just extract id and call requested callback.
(id, params) = extract_string(params)
logger.debug("Global timer %s tick", id)
timers.global_timer_cb(id)
else:
logger.warn("Unknown message from gatekeeper: %s", msg)
......@@ -96,3 +106,13 @@ class Worker2GatekeeperConnFactory(twisted.internet.protocol.Factory):
def buildProtocol(self, addr):
return Worker2GatekeeperConn(self.__plugins, self.__fastpings)
master_conn = None
send_queue=[]
def send_to_master(string):
global master_conn
if not master_conn:
send_queue.append(string)
else:
master_conn.sendString(string)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment