network.py 6.56 KB
Newer Older
1
import re
Martin Prudek's avatar
Martin Prudek committed
2

3
import zmq
4

Robin Obůrka's avatar
Robin Obůrka committed
5
from .argparser import get_arg_parser
6
from .exceptions import *
Martin Prudek's avatar
Martin Prudek committed
7
8


9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class Resource:
    NAME = re.compile("[a-z0-9_-]+")
    ADDRESS = re.compile("[a-z0-9_-]+") #TODO Use something better
    DIRECTIONS = [
        "connect",
        "bind"
    ]
    SOCK_TYPES = [
        "REQ",
        "REP",
        "DEALER",
        "ROUTER",
        "PUB",
        "SUB",
        "PUSH",
        "PULL",
        "PAIR",
    ]
Martin Prudek's avatar
Martin Prudek committed
27

28
29
30
    def __init__(self, name, direction, sock_type, address, port):
        if not Resource.NAME.match(name):
            raise SockConfigError("Inadmissible characters in resource name")
31

32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
        if direction not in Resource.DIRECTIONS:
            raise SockConfigError("Inadmissible or empty value for direction (use connect or bind)")

        if sock_type not in Resource.SOCK_TYPES:
            raise SockConfigError("Inadmissible or empty socket type")

        if not Resource.ADDRESS.match(address) and address != "*":
            raise SockConfigError("Inadmissible characters in resource address")

        try:
            port_number = int(port)
        except ValueError:
            raise SockConfigError("Port must be a number")

        if port_number < 1 or port_number > 65535:
            raise SockConfigError("Port number is out of range (0-65535)")

        # This is a little bit higher logic
        if address == "*" and direction == "connect":
            raise SockConfigError("On '*' is only bind operation permitted")

        self.name = name
        self.direction = direction
        self.sock_type = sock_type
        self.address = address
        self.port = port_number
58
59


60
61
    def get_connection_string(self):
        return "tcp://{}:{}".format(self.address, self.port)
Martin Prudek's avatar
Martin Prudek committed
62
63


64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
    @classmethod
    def from_string(cls, arg):
        splitted = arg.split(",")

        if len(splitted) != 5:
            raise SockConfigError("Bad count of resource string items")

        return cls(*splitted)


    def __eq__(self, other):
        if self.name == other.name and \
               self.direction == other.direction and \
               self.sock_type == other.sock_type and \
               self.address == other.address and \
               self.port == other.port:
            return True

        return False


    def __ne__(self, other):
        return not self.__eq__(other)


class Socket:
Martin Prudek's avatar
Martin Prudek committed
90
91
92
93
94
95
96
97
98
99
100
101
    SOCKET_TYPE_MAP = {
        "REQ": zmq.REQ,
        "REP": zmq.REP,
        "DEALER": zmq.DEALER,
        "ROUTER": zmq.ROUTER,
        "PUB": zmq.PUB,
        "SUB": zmq.SUB,
        "PUSH": zmq.PUSH,
        "PULL": zmq.PULL,
        "PAIR": zmq.PAIR,
    }

102
103
104
105
106
107
108
    def __init__(self, name, **configuration):
        self.name = name
        self.resources = []
        self.my_type = None
        self.my_direction = None
        self.configuration = configuration
        self.setup_done = False
Martin Prudek's avatar
Martin Prudek committed
109
110


111
    def check_resource(self, resource):
112
113
        if self.name != resource.name:
            raise SockConfigError("Putting bad resource to socket")
Martin Prudek's avatar
Martin Prudek committed
114

115
116
        if not self.my_type:
            self.my_type = resource.sock_type
Martin Prudek's avatar
Martin Prudek committed
117

118
119
        if self.my_type != resource.sock_type:
            raise SockConfigError("New resource is different type than current Socket type")
Martin Prudek's avatar
Martin Prudek committed
120

121
122
        if not self.my_direction:
            self.my_direction = resource.direction
Martin Prudek's avatar
Martin Prudek committed
123

124
125
        if self.setup_done and self.my_direction == "bind":
            raise SockConfigError("Socket can have only one bind operation")
Martin Prudek's avatar
Martin Prudek committed
126

127
128
        if resource in self.resources:
            raise SockConfigError("Resource duplication")
Martin Prudek's avatar
Martin Prudek committed
129

130
131
132
133

    def add_resource(self, resource):
        self.check_resource(resource)

134
        self.resources.append(resource)
Martin Prudek's avatar
Martin Prudek committed
135

136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
        self.setup_done = True


    def build(self, ctx, name, sock_type=None):
        if self.name != name:
            raise SockConfigError("Name of requested resource is invalid")

        if sock_type and self.my_type != sock_type:
            raise SockConfigError("Unmatched socket type with requested one")

        socket = ctx.socket(Socket.SOCKET_TYPE_MAP[self.my_type])

        if self.my_direction == "bind":
            socket.bind(self.resources[0].get_connection_string())

        else:
            for resource in self.resources:
                socket.connect(resource.get_connection_string())

        self.configure(socket)

        return socket


    def configure(self, socket):
        if "ipv6" in self.configuration:
            socket.ipv6 = self.configuration["ipv6"]
163
        socket.setsockopt(zmq.LINGER, 1*1000)  # In msec
164
165
166
167
168
169
170


class SN:
    """ This class serves as a container for all resources. This class provides
    an API-like interface for requesting ZMQ sockets based on available
    resources.
    """
171
    def __init__(self, ctx, argparser=None):
172
173
174
175
        ## Gather data
        self.context = ctx

        if argparser:
176
            self.args = argparser.parse_args()
177
        else:
178
            self.args = get_arg_parser().parse_args()
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212

        ## Build all necessary configuration
        self.build_global_configuration()
        self.parse_resources()
        self.build_sockets()


    def build_global_configuration(self):
        self.global_configuration = {
            "ipv6": not self.args.disable_ipv6,
        }


    def parse_resources(self):
        ## Currently I don't know why but resources is array of arrays
        self.resources = [ Resource.from_string(res[0]) for res in self.args.resource ]


    def build_sockets(self):
        self.sockets = {}

        for resource in self.resources:
            if resource.name not in self.sockets:
                self.sockets[resource.name] = Socket(resource.name, **self.global_configuration)

            self.sockets[resource.name].add_resource(resource)


    def get_socket(self, *requested_sockets):
        """ Gets multiple socket names in 'get_socket(name1, name2,...)'
        or 'get_socket((name1, TYPE1), name2, (name3,TYPE3),...)' or any of
        their combinations. Returns list of all available ZMQ sockets with the
        required names. Exception is risen when there is no socket with the
        desired name or when the socket is of another type.
Martin Prudek's avatar
Martin Prudek committed
213
        """
214
215
216
217
218
219
220
221
        ret = []

        for request in requested_sockets:
            if type(request) == tuple:
                sock_name, sock_type = request
            else:
                sock_name, sock_type = request, None

222
            if sock_name not in self.sockets:
223
                raise UndefinedSocketError("Requesting undefined socket")
224

225
226
227
228
229
            socket = self.sockets[sock_name]
            ret.append(socket.build(self.context, sock_name, sock_type))

        if len(ret) == 1:
            return ret[0]
230
        else:
231
            return ret