Line data Source code
1 : #include <minipot_pipe_handler.h>
2 : #include <minipot_utils.h>
3 : #include <event2/event.h>
4 : #include <signal.h>
5 : #include <errno.h>
6 : #include <string.h>
7 : #include <unistd.h>
8 : #include "log.h"
9 :
// Number of bytes occupied by the length prefix that precedes each payload.
#define LEN_LEN (sizeof(size_t))
11 :
// Phases of the parser that splits the pipe stream into length-prefixed
// messages.
enum minipot_pipe_state {
	PS_BUFF_LEN = 0,  // collecting the bytes of the length prefix
	PS_BUFF_DATA = 1, // collecting the payload announced by the prefix
};
16 :
17 : struct minipot_pipe_handler {
18 : struct event_base *ev_base;
19 : struct event *read_ev;
20 : struct event *sigint_ev;
21 : struct event *sigterm_ev;
22 : enum minipot_pipe_state state;
23 : // this holds length of the data
24 : // it needs to be buffered first to determine the data length
25 : uint8_t *recv_buff;
26 : union {
27 : size_t full;
28 : uint8_t bytes[LEN_LEN];
29 : } data_len;
30 : uint8_t *data_len_write_ptr;
31 : // this holds the data itself
32 : uint8_t *data;
33 : uint8_t *data_write_ptr;
34 : size_t data_mem_len;
35 : minipot_pipe_callback callback;
36 : void *user_data;
37 : };
38 :
39 0 : static void reset_pipe_buff(minipot_pipe_handler_t self) {
40 0 : self->state = PS_BUFF_LEN;
41 0 : self->data_len_write_ptr = self->data_len.bytes;
42 0 : self->data_write_ptr = self->data;
43 0 : }
44 :
// Moves as many bytes as possible from an input window into a destination
// buffer.
//
// The number of bytes copied is the smaller of
//   - the bytes still available in the input window (*buff_len), and
//   - the free space left in the destination, i.e. the distance from *dest_wrt
//     to dest_start + dest_len.
//
// buff, buff_len: input window; *buff is advanced and *buff_len reduced by the
//   number of copied bytes.
// dest_start, dest_len: start address and capacity of the destination buffer.
// dest_wrt: current write position in the destination; *dest_wrt must lie in
//   <dest_start, dest_start + dest_len> and is advanced by the number of
//   copied bytes.
static void buffer(const uint8_t **buff, size_t *buff_len,
		const uint8_t *const dest_start, uint8_t **dest_wrt,
		const size_t dest_len) {
	// Non-negative thanks to the precondition on *dest_wrt, so the conversion
	// to size_t is exact and the comparison below is unsigned on both sides.
	const size_t room = (size_t)((dest_start + dest_len) - *dest_wrt);
	const size_t to_copy = (*buff_len < room) ? *buff_len : room;
	if (to_copy == 0)
		return; // nothing to move; do not touch the pointers at all

	memcpy(*dest_wrt, *buff, to_copy);
	// shift the input window
	*buff_len -= to_copy;
	*buff += to_copy;
	// shift the write position
	*dest_wrt += to_copy;
}
62 :
63 0 : static void proc_data(minipot_pipe_handler_t self, const uint8_t **buff,
64 : size_t *buff_len) {
65 0 : buffer(buff, buff_len, self->data, &self->data_write_ptr, self->data_len.full);
66 0 : if ((self->data_write_ptr - self->data) == self->data_len.full) {
67 0 : self->callback(self->data, self->data_len.full, self->user_data);
68 0 : reset_pipe_buff(self);
69 : }
70 0 : }
71 :
72 0 : static void proc_len(minipot_pipe_handler_t self, const uint8_t **buff,
73 : size_t *buff_len) {
74 0 : buffer(buff, buff_len, self->data_len.bytes, &self->data_len_write_ptr,
75 : LEN_LEN);
76 0 : if ((self->data_len.bytes + LEN_LEN) == self->data_len_write_ptr) {
77 0 : while (self->data_len.full > self->data_mem_len) {
78 0 : self->data_mem_len *= 2;
79 0 : self->data = realloc(self->data, self->data_mem_len);
80 0 : self->data_write_ptr = self->data;
81 : }
82 0 : self->state = PS_BUFF_DATA;
83 : }
84 0 : }
85 :
86 0 : static void proc(minipot_pipe_handler_t self, size_t buff_len) {
87 0 : const uint8_t *buff = self->recv_buff;
88 0 : while (buff_len > 0) {
89 0 : switch (self->state) {
90 0 : case PS_BUFF_LEN:
91 0 : proc_len(self, &buff, &buff_len);
92 0 : break;
93 0 : case PS_BUFF_DATA:
94 0 : proc_data(self, &buff, &buff_len);
95 0 : break;
96 0 : default:
97 0 : error("Pipe data processing has unknown state!");
98 0 : assert(minipot_pipe_handler_stop(self) == 0);
99 : break;
100 : }
101 : }
102 0 : }
103 :
104 0 : static void read_cb(evutil_socket_t fd, short events, void *arg) {
105 0 : minipot_pipe_handler_t self = arg;
106 0 : ssize_t read_cnt = read(fd, self->recv_buff, BUFSIZ);
107 0 : switch (read_cnt) {
108 0 : case -1:
109 0 : if (errno == EAGAIN)
110 : return;
111 0 : error("Reading from pipe failed!");
112 0 : assert(minipot_pipe_handler_stop(self) == 0);
113 : return;
114 0 : case 0:
115 0 : info("Pipe closed from child!");
116 0 : assert(minipot_pipe_handler_stop(self) == 0);
117 : return;
118 : }
119 0 : proc(self, read_cnt);
120 : }
121 :
122 0 : static void terminate_cb(evutil_socket_t fd, short events, void *arg) {
123 0 : assert(minipot_pipe_handler_stop((minipot_pipe_handler_t)arg) == 0);
124 0 : }
125 :
126 : minipot_pipe_handler_t minipot_pipe_handler_new(int read_end_fd,
127 : minipot_pipe_callback cb, void *data) {
128 0 : minipot_pipe_handler_t mph = malloc(sizeof(*mph));
129 :
130 0 : mph->ev_base = event_base_new();
131 0 : if (!mph->ev_base)
132 0 : goto err01;
133 0 : mph->sigint_ev = event_new(mph->ev_base, SIGINT, EV_SIGNAL, terminate_cb, mph);
134 0 : if (!mph->sigint_ev)
135 0 : goto err02;
136 0 : mph->sigterm_ev = event_new(mph->ev_base, SIGTERM, EV_SIGNAL, terminate_cb, mph);
137 0 : if (!mph->sigterm_ev)
138 0 : goto err03;
139 0 : mph->read_ev = event_new(mph->ev_base, read_end_fd, EV_READ | EV_PERSIST,
140 : read_cb, mph);
141 0 : if (!mph->read_ev)
142 0 : goto err04;
143 :
144 0 : if (event_add(mph->sigint_ev, NULL))
145 0 : goto err05;
146 0 : if (event_add(mph->sigterm_ev, NULL))
147 0 : goto err05;
148 0 : if (event_add(mph->read_ev, NULL))
149 0 : goto err05;
150 :
151 0 : mph->recv_buff = malloc(sizeof(*mph->recv_buff) * BUFSIZ);
152 :
153 0 : mph->data_mem_len = BUFSIZ;
154 0 : mph->data = malloc(mph->data_mem_len * sizeof(*mph->data));
155 :
156 0 : mph->callback = cb;
157 0 : mph->user_data = data;
158 0 : return mph;
159 :
160 0 : err05:
161 0 : event_free(mph->read_ev);
162 0 : err04:
163 0 : event_free(mph->sigterm_ev);
164 0 : err03:
165 0 : event_free(mph->sigint_ev);
166 0 : err02:
167 0 : event_base_free(mph->ev_base);
168 0 : err01:
169 0 : free(mph);
170 0 : return NULL;
171 : }
172 :
173 : void minipot_pipe_handler_free(minipot_pipe_handler_t self) {
174 0 : event_free(self->sigint_ev);
175 0 : event_free(self->sigterm_ev);
176 0 : event_free(self->read_ev);
177 0 : event_base_free(self->ev_base);
178 0 : free(self->recv_buff);
179 0 : free(self->data);
180 0 : free(self);
181 0 : }
182 :
183 : int minipot_pipe_handler_run(minipot_pipe_handler_t self) {
184 0 : reset_pipe_buff(self);
185 0 : return event_base_dispatch(self->ev_base);
186 : }
187 :
188 : int minipot_pipe_handler_stop(minipot_pipe_handler_t self) {
189 0 : return event_base_loopbreak(self->ev_base);
190 : }
|