Line data Source code
1 : #include <minipot_zmq_sender.h> 2 : #include <czmq.h> 3 : #include "log.h" 4 : 5 : struct minipot_zmq_sender { 6 : char *topic; 7 : zsock_t *sock; 8 : }; 9 : 10 : minipot_zmq_sender_t minipot_zmq_sender_new(const char *endpoint, 11 : const char *topic) { 12 0 : minipot_zmq_sender_t sender = malloc(sizeof(*sender)); 13 0 : sender->sock = zsock_new_push(endpoint); 14 0 : if (!sender->sock) 15 0 : goto err01; 16 : // The linger period determines how long pending messages which have yet to 17 : // be sent to a peer shall linger in memory after a socket is disconnected. 18 : // 0 -> Pending messages shall be discarded immediately. 19 : // >> Without this the socket close and application termination would block 20 : // until all pending messages are sent. 21 0 : zsock_set_linger(sender->sock, 0); 22 : // Sets the timeout for send operation on the socket. 23 : // 0 -> Return immediately with an error if the message cannot be sent. 24 : // >> non-blocking send 25 0 : zsock_set_sndtimeo(sender->sock, 0); 26 : // Hard limit on the maximum number of outstanding messages ØMQ shall queue 27 : // in memory for any single peer that the specified socket is communicating 28 : // with. If the limit has been reached the socket shall enter an exceptional 29 : // state and depending on the socket type, ØMQ shall take appropriate 30 : // action such as blocking or dropping sent messages. 31 : // >> After the previous setup when HWM is reached sends starts to fail. 32 : // >> 1000 is default value of ØMQ library 33 0 : zsock_set_sndhwm(sender->sock, 1000); 34 : // Send welcome message to identify ourself to Sentinel Proxy 35 0 : zmsg_t *msg = zmsg_new(); 36 0 : if (!msg) 37 0 : goto err02; 38 0 : if (zmsg_addstr(msg, topic) || zmsg_send(&msg, sender->sock)) 39 0 : goto err03; 40 0 : sender->topic = strdup(topic); 41 0 : return sender; 42 : 43 0 : err03: 44 0 : zmsg_destroy(&msg); 45 0 : err02: 46 0 : zsock_destroy(&sender->sock); 47 0 : error("Send of welcome message failed!"); 48 0 : err01: 49 0 : free(sender); 50 0 : return NULL; 51 : } 52 : 53 : void minipot_zmq_sender_free(minipot_zmq_sender_t self) { 54 0 : zsock_destroy(&self->sock); 55 0 : free(self->topic); 56 0 : free(self); 57 0 : } 58 : 59 : int minipot_zmq_sender_send(minipot_zmq_sender_t self, const void *data, 60 : size_t data_len) { 61 0 : zmsg_t *msg = zmsg_new(); 62 0 : if (!msg) 63 : return -1; 64 0 : if (zmsg_addstr(msg, self->topic) 65 0 : || zmsg_addmem(msg, data, data_len) 66 0 : || zmsg_send(&msg, self->sock)) { 67 0 : zmsg_destroy(&msg); 68 0 : return -1; 69 : } 70 : return 0; 71 : }