Commit 3e85d0a1 authored by Robin Obůrka's avatar Robin Obůrka Committed by Robin Obůrka
Browse files

network: Refactoring of SN and auxiliary classes

parent 752772b4
import zmq
import re
from collections import namedtuple
import zmq
from .argparser import get_arg_parser
from .exceptions import *
def resource_parser(config_list):
""" Gets a tuple of command line arguments - each for one socket connection
in the form {sockname,[conn/bind],SOCK_TYPE,IP,PORT}.
Returns a dictionary filled with zmq socket configs in the form
{name:[connection1, connection2,...]} as each ZMQ socket can handle
multiple connections. Each connection is a namedtuple.
"""
Connection = namedtuple(
'Connection',
['direction', 'sock_type', 'address', 'port']
)
resources = dict()
for config in config_list:
config = config[0]
splitted = config.split(",")
if len(splitted) == 5:
if not splitted[0] in resources:
resources[splitted[0]] = list()
resources[splitted[0]].append(Connection(*splitted[1:]))
else:
raise SockConfigError("Resource {} is invalid.".format(config))
return resources
class Resource:
NAME = re.compile("[a-z0-9_-]+")
ADDRESS = re.compile("[a-z0-9_-]+") #TODO Use something better
DIRECTIONS = [
"connect",
"bind"
]
SOCK_TYPES = [
"REQ",
"REP",
"DEALER",
"ROUTER",
"PUB",
"SUB",
"PUSH",
"PULL",
"PAIR",
]
class SN:
""" This class serves as a container for all resources. This class provides
an API-like interface for requesting ZMQ sockets based on available
resources.
"""
def __init__(self, ctx, argparser=None, **options):
""" Gets a list of command line arguments - each for one socket
connection and creates a dict of ZMQ socket configs.
"""
self.context = ctx
self.sock_configs = dict()
if argparser:
self.args = argparser.parse_args(options["args"] if "args" in options else None)
else:
self.args = get_arg_parser().parse_args(options["args"] if "args" in options else None)
def __init__(self, name, direction, sock_type, address, port):
if not Resource.NAME.match(name):
raise SockConfigError("Inadmissible characters in resource name")
res_avail = resource_parser(self.args.resource)
for res in res_avail:
sc = None
for connection in res_avail[res]:
if sc:
sc.add_connection(
connection.sock_type,
connection.direction,
connection.address,
connection.port
)
else:
sc = SockConfig(
self.context,
connection.sock_type,
connection.direction,
connection.address,
connection.port,
ipv6=not self.args.disable_ipv6
)
self.sock_configs[res] = sc
def get_socket(self, *sockets):
""" Gets multiple socket names in 'get_socket(name1, name2,...)'
or 'get_socket((name1, TYPE1), name2, (name3,TYPE3),...)' or any of
their combinations. Returns list of all available ZMQ sockets with the
required names. Exception is risen when there is no socket with the
desired name or when the socket is of another type.
"""
ret = list()
for socket in sockets:
if type(socket) == tuple:
sock_name = socket[0]
else:
sock_name = socket
if sock_name in self.sock_configs:
if (
type(socket) == tuple
and not self.sock_configs[sock_name].is_type(socket[1])
):
raise SockConfigError("Socket type does not match required value!")
if not self.sock_configs[sock_name].socket:
self.sock_configs[sock_name].connect()
ret.append(self.sock_configs[sock_name].socket)
else:
raise SockConfigError("Resource {} not provided.".format(sock_name))
if len(ret) == 1:
return ret[0]
else:
return ret
if direction not in Resource.DIRECTIONS:
raise SockConfigError("Inadmissible or empty value for direction (use connect or bind)")
if sock_type not in Resource.SOCK_TYPES:
raise SockConfigError("Inadmissible or empty socket type")
if not Resource.ADDRESS.match(address) and address != "*":
raise SockConfigError("Inadmissible characters in resource address")
try:
port_number = int(port)
except ValueError:
raise SockConfigError("Port must be a number")
if port_number < 1 or port_number > 65535:
raise SockConfigError("Port number is out of range (0-65535)")
# This is a little bit higher logic
if address == "*" and direction == "connect":
raise SockConfigError("On '*' is only bind operation permitted")
self.name = name
self.direction = direction
self.sock_type = sock_type
self.address = address
self.port = port_number
class SockConfig:
# a ZMQ feature: one socket can have a multiple connections
class ZMQConnection:
def __init__(self, addr, port):
self.addr = addr
self.port = port
self.connection = self.get_connection_string()
def get_connection_string(self):
return "tcp://{}:{}".format(self.address, self.port)
def get_connection_string(self):
return "tcp://{}:{}".format(self.addr, self.port)
@classmethod
def from_string(cls, arg):
splitted = arg.split(",")
if len(splitted) != 5:
raise SockConfigError("Bad count of resource string items")
return cls(*splitted)
def __eq__(self, other):
if self.name == other.name and \
self.direction == other.direction and \
self.sock_type == other.sock_type and \
self.address == other.address and \
self.port == other.port:
return True
return False
def __ne__(self, other):
return not self.__eq__(other)
class Socket:
SOCKET_TYPE_MAP = {
"REQ": zmq.REQ,
"REP": zmq.REP,
......@@ -121,90 +99,125 @@ class SockConfig:
"PAIR": zmq.PAIR,
}
DIRECTIONS = [
"connect",
"bind",
]
def __init__(self, name, **configuration):
self.name = name
self.resources = []
self.my_type = None
self.my_direction = None
self.configuration = configuration
self.setup_done = False
def __init__(self, context, socktype, direction, addr, port, ipv6):
""" Adds socket configuruation. List
of all connection is stored for further checking of duplicate
connections.
"""
self.check_params_validity(socktype, direction, addr, port)
self.socktype = SockConfig.SOCKET_TYPE_MAP[socktype]
self.direction = direction
def add_resource(self, resource):
if self.name != resource.name:
raise SockConfigError("Putting bad resource to socket")
zmq_connection = self.ZMQConnection(addr, port)
self.connections = list()
self.connections.append(zmq_connection)
self.context = context
self.socket = None
self.ipv6 = ipv6
if not self.my_type:
self.my_type = resource.sock_type
def add_connection(self, socktype, direction, addr, port):
""" Adds another ZMQ connection to an existing ZMQ socket.
"""
self.check_params_validity(socktype, direction, addr, port)
if self.my_type != resource.sock_type:
raise SockConfigError("New resource is different type than current Socket type")
if self.socktype != SockConfig.SOCKET_TYPE_MAP[socktype]:
raise SockConfigError("Socket type does not match")
if not self.my_direction:
self.my_direction = resource.direction
if self.direction == "bind" or direction == "bind":
raise SockConfigError("Socket direction mismatch")
if self.setup_done and self.my_direction == "bind":
raise SockConfigError("Socket can have only one bind operation")
for con in self.connections:
if con.addr == addr and con.port == port:
raise SockConfigError("Creating duplicate connection")
if resource in self.resources:
raise SockConfigError("Resource duplication")
zmq_connection = self.ZMQConnection(addr, port)
self.connections.append(zmq_connection)
self.resources.append(resource)
def check_params_validity(self, socktype, direction, addr, port):
""" Checks whether all the params are present and ZMQ-compliant
"""
if not socktype:
raise SockConfigError("Missing socket type")
if not direction:
raise SockConfigError("Missing socket direction")
if not addr:
raise SockConfigError("Missing address")
if not port:
raise SockConfigError("Missing port")
if socktype not in SockConfig.SOCKET_TYPE_MAP:
raise SockConfigError("Unknown socket option", socktype)
if direction not in SockConfig.DIRECTIONS:
raise SockConfigError("Unknown direction option", direction)
if int(port) < 1 or int(port) > 65535:
raise SockConfigError("Port number out of range", port)
def is_type(self, socktype):
""" Checks whether the socket type of this socket is equal to
'socktype' string argument.
"""
return (
socktype in SockConfig.SOCKET_TYPE_MAP
and self.socktype == SockConfig.SOCKET_TYPE_MAP[socktype]
)
def connect(self):
""" Connects or binds unconnected/unbound zmq socket. An exception
is risen when the socket is already connected.
self.setup_done = True
def build(self, ctx, name, sock_type=None):
if self.name != name:
raise SockConfigError("Name of requested resource is invalid")
if sock_type and self.my_type != sock_type:
raise SockConfigError("Unmatched socket type with requested one")
socket = ctx.socket(Socket.SOCKET_TYPE_MAP[self.my_type])
if self.my_direction == "bind":
socket.bind(self.resources[0].get_connection_string())
else:
for resource in self.resources:
socket.connect(resource.get_connection_string())
self.configure(socket)
return socket
def configure(self, socket):
if "ipv6" in self.configuration:
socket.ipv6 = self.configuration["ipv6"]
class SN:
""" This class serves as a container for all resources. This class provides
an API-like interface for requesting ZMQ sockets based on available
resources.
"""
def __init__(self, ctx, argparser=None, **options):
## Gather data
self.context = ctx
if argparser:
self.args = argparser.parse_args(options["args"] if "args" in options else None)
else:
self.args = get_arg_parser().parse_args(options["args"] if "args" in options else None)
## Build all necessary configuration
self.build_global_configuration()
self.parse_resources()
self.build_sockets()
def build_global_configuration(self):
self.global_configuration = {
"ipv6": not self.args.disable_ipv6,
}
def parse_resources(self):
## Currently I don't know why but resources is array of arrays
self.resources = [ Resource.from_string(res[0]) for res in self.args.resource ]
def build_sockets(self):
self.sockets = {}
for resource in self.resources:
if resource.name not in self.sockets:
self.sockets[resource.name] = Socket(resource.name, **self.global_configuration)
self.sockets[resource.name].add_resource(resource)
def get_socket(self, *requested_sockets):
""" Gets multiple socket names in 'get_socket(name1, name2,...)'
or 'get_socket((name1, TYPE1), name2, (name3,TYPE3),...)' or any of
their combinations. Returns list of all available ZMQ sockets with the
required names. Exception is risen when there is no socket with the
desired name or when the socket is of another type.
"""
if not self.socket:
self.socket = self.context.socket(self.socktype)
self.socket.ipv6 = self.ipv6
for zmq_connection in self.connections:
if self.direction == "bind":
self.socket.bind(zmq_connection.connection)
elif self.direction == "connect":
self.socket.connect(zmq_connection.connection)
else:
raise SockConfigError("Wrong socket direction")
ret = []
for request in requested_sockets:
if type(request) == tuple:
sock_name, sock_type = request
else:
sock_name, sock_type = request, None
socket = self.sockets[sock_name]
ret.append(socket.build(self.context, sock_name, sock_type))
if len(ret) == 1:
return ret[0]
else:
raise SockConfigError("Socket already connected")
return ret
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment