/*  Copyright (C) 2014 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include <uv.h>
#include <lua.h>
#include <libknot/packet/pkt.h>
#include <libknot/descriptor.h>
#include <contrib/ucw/lib.h>
#include <contrib/ucw/mempool.h>
#include <contrib/wire.h>
#if defined(__GLIBC__) && defined(_GNU_SOURCE)
#include <malloc.h>
#endif
#include <assert.h>
#include "lib/utils.h"
#include "daemon/worker.h"
#include "daemon/engine.h"
#include "daemon/io.h"

/* @internal IO request entry. */
struct ioreq
{
	union {
		uv_udp_t      udp;
		uv_tcp_t      tcp;
		uv_udp_send_t send;
		uv_write_t    write;
		uv_connect_t  connect;
	} as;
};

/** @internal Number of requests within the timeout window. */
#define MAX_PENDING (KR_NSREP_MAXADDR + (KR_NSREP_MAXADDR / 2))

/** @internal Debugging facility. */
#ifdef DEBUG
#define DEBUG_MSG(fmt...) printf("[daem] " fmt)
#else
#define DEBUG_MSG(fmt...)
#endif

/** @internal Query resolution task. */
struct qr_task
{
	struct kr_request req;
	struct worker_ctx *worker;
	knot_pkt_t *pktbuf;
	array_t(struct qr_task *) waiting;
	uv_handle_t *pending[MAX_PENDING];
	uint16_t pending_count;
	uint16_t addrlist_count;
	uint16_t addrlist_turn;
	struct sockaddr *addrlist;
	uv_timer_t retry, timeout;
	worker_cb_t on_complete;
	void *baton;
	struct {
		union {
			struct sockaddr_in ip4;
			struct sockaddr_in6 ip6;
		} addr;
		uv_handle_t *handle;
	} source;
	uint16_t iter_count;
	uint16_t refs;
	uint16_t bytes_remaining;
	bool finished;
	bool leading;
};

/* Convenience macros */
#define qr_task_ref(task) \
	do { ++(task)->refs; } while(0)
#define qr_task_unref(task) \
	do { if (--(task)->refs == 0) { qr_task_free(task); } } while (0)
#define qr_valid_handle(task, checked) \
	(!uv_is_closing((checked)) || (task)->source.handle == (checked))

/* Forward decls */
static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *packet);

/** @internal Get singleton worker. */
static inline struct worker_ctx *get_worker(void)
{
	return uv_default_loop()->data;
}

static inline struct ioreq *ioreq_take(struct worker_ctx *worker)
{
	struct ioreq *req = NULL;
	if (worker->ioreqs.len > 0) {
		req = array_tail(worker->ioreqs);
		array_pop(worker->ioreqs);
	} else {
		req = malloc(sizeof(*req));
	}
	kr_asan_unpoison(req, sizeof(*req));
	return req;
}

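/** @internal Return an I/O request to the worker freelist, or free it when the freelist is full. */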
static inline void ioreq_release(struct worker_ctx *worker, struct ioreq *req)
{
	kr_asan_poison(req, sizeof(*req));
	if (!req || worker->ioreqs.len < 4 * MP_FREELIST_SIZE) {
		array_push(worker->ioreqs, req);
	} else {
		free(req);
	}
}

static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype)
{
	if (task->pending_count >= MAX_PENDING) {
		return NULL;
	}
	/* Create connection for iterative query */
	uv_handle_t *req = (uv_handle_t *)ioreq_take(task->worker);
	if (!req) {
		return NULL;
	}
	io_create(task->worker->loop, req, socktype);
	req->data = task;
	/* Connect or issue query datagram */
	task->pending[task->pending_count] = req;
	task->pending_count += 1;
	return req;
}

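/* Handle-close callback, recycles the I/O request. */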
static void ioreq_on_close(uv_handle_t *handle)
{
	struct worker_ctx *worker = get_worker();
	ioreq_release(worker, (struct ioreq *)handle);
}

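/* Stop reading and close a pending I/O request (if not already closing). */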
static void ioreq_kill(uv_handle_t *req)
{
	assert(req);
	if (!uv_is_closing(req)) {
		io_stop_read(req);
		uv_close(req, ioreq_on_close);
	}
}

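/* Close all pending I/O requests of the task. */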
static void ioreq_killall(struct qr_task *task)
{
	for (size_t i = 0; i < task->pending_count; ++i) {
		ioreq_kill(task->pending[i]);
	}
	task->pending_count = 0;
}

/** @cond This memory layout is internal to mempool.c, use only for debugging. */
#if defined(__SANITIZE_ADDRESS__)
struct mempool_chunk {
  struct mempool_chunk *next;
  size_t size;
};
static void mp_poison(struct mempool *mp, bool poison)
{
	if (!poison) { /* @note mempool is part of the first chunk, unpoison it first */
		kr_asan_unpoison(mp, sizeof(*mp));
	}
	struct mempool_chunk *chunk = mp->state.last[0];
	void *chunk_off = (void *)chunk - chunk->size;
	if (poison) {
		kr_asan_poison(chunk_off, chunk->size);
	} else {
		kr_asan_unpoison(chunk_off, chunk->size);
	}
}
#else
#define mp_poison(mp, enable)
#endif
/** @endcond */

static inline struct mempool *pool_take(struct worker_ctx *worker)
{
	/* Recycle available mempool if possible */
	struct mempool *mp = NULL;
	if (worker->pools.len > 0) {
		mp = array_tail(worker->pools);
		array_pop(worker->pools);
	} else { /* No mempool on the freelist, create new one */
		mp = mp_new (4 * CPU_PAGE_SIZE);
	}
	mp_poison(mp, 0);
	return mp;
}

static inline void pool_release(struct worker_ctx *worker, struct mempool *mp)
{
	/* Return mempool to ring or free it if it's full */
	if (worker->pools.len < MP_FREELIST_SIZE) {
		mp_flush(mp);
		array_push(worker->pools, mp);
		mp_poison(mp, 1);
	} else {
		mp_delete(mp);
	}
}

static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr *addr)
{
	/* How much can client handle? */
	struct engine *engine = worker->engine;
	size_t answer_max = KNOT_WIRE_MIN_PKTSIZE;
	size_t pktbuf_max = KR_EDNS_PAYLOAD;
	if (engine->resolver.opt_rr) {
		pktbuf_max = MAX(knot_edns_get_payload(engine->resolver.opt_rr), pktbuf_max);
	}
	if (!addr && handle) { /* TCP */
		answer_max = KNOT_WIRE_MAX_PKTSIZE;
		pktbuf_max = KNOT_WIRE_MAX_PKTSIZE;
	} else if (knot_pkt_has_edns(query)) { /* EDNS */
		answer_max = MAX(knot_edns_get_payload(query->opt_rr), KNOT_WIRE_MIN_PKTSIZE);
	}

	/* Recycle available mempool if possible */
	knot_mm_t pool = {
		.ctx = pool_take(worker),
		.alloc = (knot_mm_alloc_t) mp_alloc
	};

	/* Create resolution task */
	struct qr_task *task = mm_alloc(&pool, sizeof(*task));
	if (!task) {
		mp_delete(pool.ctx);
		return NULL;
	}
	/* Create packet buffers for answer and subrequests */
	task->req.pool = pool;
	knot_pkt_t *pktbuf = knot_pkt_new(NULL, pktbuf_max, &task->req.pool);
	knot_pkt_t *answer = knot_pkt_new(NULL, answer_max, &task->req.pool);
	if (!pktbuf || !answer) {
		mp_delete(pool.ctx);
		return NULL;
	}
	task->req.answer = answer;
	task->pktbuf = pktbuf;
	array_init(task->waiting);
	task->addrlist = NULL;
	task->pending_count = 0;
	task->bytes_remaining = 0;
	task->iter_count = 0;
	task->refs = 1;
	task->finished = false;
	task->leading = false;
	task->worker = worker;
	task->source.handle = handle;
	uv_timer_init(worker->loop, &task->retry);
	uv_timer_init(worker->loop, &task->timeout);
	task->retry.data = task;
	task->timeout.data = task;
	task->on_complete = NULL;
	task->req.qsource.key = NULL;
	task->req.qsource.addr = NULL;
	/* Remember query source addr */
	if (addr) {
		size_t addr_len = sizeof(struct sockaddr_in);
		if (addr->sa_family == AF_INET6)
			addr_len = sizeof(struct sockaddr_in6);
		memcpy(&task->source.addr, addr, addr_len);
		task->req.qsource.addr = (const struct sockaddr *)&task->source.addr;
	} else {
		task->source.addr.ip4.sin_family = AF_UNSPEC;
	}
	/* Remember query source TSIG key */
	if (query->tsig_rr) {
		task->req.qsource.key = knot_rrset_copy(query->tsig_rr, &task->req.pool);
	}

	/* Start resolution */
	kr_resolve_begin(&task->req, &engine->resolver, answer);
	worker->stats.concurrent += 1;
	worker->stats.queries += 1;
	return task;
}

/* This is called when the task refcount is zero, free memory. */
static void qr_task_free(struct qr_task *task)
{
	/* Return mempool to ring or free it if it's full */
	struct worker_ctx *worker = task->worker;
	pool_release(worker, task->req.pool.ctx);
	/* @note The 'task' is invalidated from now on. */
	/* Decommit memory every once in a while */
	static int mp_delete_count = 0;
	if (++mp_delete_count == 100000) {
		lua_gc(worker->engine->L, LUA_GCCOLLECT, 0);
#if defined(__GLIBC__) && defined(_GNU_SOURCE)
		malloc_trim(0);
#endif
		mp_delete_count = 0;
	}
}

/* This is called when retry timer closes */
static void retransmit_close(uv_handle_t *handle)
{
	struct qr_task *task = handle->data;
	qr_task_unref(task);
}

/* This is called when task completes and timeout timer is closed. */
static void qr_task_complete(uv_handle_t *handle)
{
	struct qr_task *task = handle->data;
	struct worker_ctx *worker = task->worker;
	/* Kill pending I/O requests */
	ioreq_killall(task);
	assert(task->waiting.len == 0);
	assert(task->leading == false);
	/* Run the completion callback. */
	if (task->on_complete) {
		task->on_complete(worker, &task->req, task->baton);
	}
	/* Return handle to the event loop in case
	 * it was exclusively taken by this task. */
	if (task->source.handle && !uv_has_ref(task->source.handle)) {
		uv_ref(task->source.handle);
		io_start_read(task->source.handle);
	}
	/* Release task */
	qr_task_unref(task);
	/* Update stats */
	worker->stats.concurrent -= 1;
}

/* This is called when I/O times out */
static void on_timeout(uv_timer_t *req)
{
	struct qr_task *task = req->data;
	uv_handle_t *handle = (uv_handle_t *)req;
#ifdef DEBUG
	char qname_str[KNOT_DNAME_MAXLEN] = {'\0'}, type_str[16] = {'\0'};
	knot_dname_to_str(qname_str, knot_pkt_qname(task->pktbuf), sizeof(qname_str));
	knot_rrtype_to_string(knot_pkt_qtype(task->pktbuf), type_str, sizeof(type_str));
	DEBUG_MSG("ioreq timeout %s %s %p\n", qname_str, type_str, req);
#endif
	if (!uv_is_closing(handle)) {
		struct worker_ctx *worker = task->worker;
		worker->stats.timeout += 1;
		qr_task_step(task, NULL, NULL);
	}
}

/* This is called when we send subrequest / answer */
static int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status)
{
	if (!task->finished) {
		if (status == 0 && handle) {
			io_start_read(handle); /* Start reading answer */
		} else {
			DEBUG_MSG("ioreq send_done %p => %d, %s\n", handle, status, uv_strerror(status));
		}
	} else {
		/* Close retry timer (borrows task) */
		qr_task_ref(task);
		uv_close((uv_handle_t *)&task->retry, retransmit_close);
		/* Close timeout timer (finishes task) */
		uv_close((uv_handle_t *)&task->timeout, qr_task_complete);
	}
	return status;
}

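/* libuv completion callbacks for UDP send and TCP write: notify the task and recycle the request. */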
static void on_send(uv_udp_send_t *req, int status)
{
	struct worker_ctx *worker = get_worker();
	struct qr_task *task = req->data;
	if (qr_valid_handle(task, (uv_handle_t *)req->handle)) {
		qr_task_on_send(task, (uv_handle_t *)req->handle, status);
	}
	qr_task_unref(task);
	ioreq_release(worker, (struct ioreq *)req);
}

static void on_write(uv_write_t *req, int status)
{
	struct worker_ctx *worker = get_worker();
	struct qr_task *task = req->data;
	if (qr_valid_handle(task, (uv_handle_t *)req->handle)) {
		qr_task_on_send(task, (uv_handle_t *)req->handle, status);
	}
	qr_task_unref(task);
	ioreq_release(worker, (struct ioreq *)req);
}

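/** @internal Send a packet over the given handle (UDP datagram, or TCP stream with a 2-byte length prefix) and update transport statistics. */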
static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockaddr *addr, knot_pkt_t *pkt)
{
	if (!handle) {
		return qr_task_on_send(task, handle, kr_error(EIO));
	}
	struct ioreq *send_req = ioreq_take(task->worker);
	if (!send_req) {
		return qr_task_on_send(task, handle, kr_error(ENOMEM));
	}

	/* Send using given protocol */
	int ret = 0;
	if (handle->type == UV_UDP) {
		uv_buf_t buf = { (char *)pkt->wire, pkt->size };
		send_req->as.send.data = task;
		ret = uv_udp_send(&send_req->as.send, (uv_udp_t *)handle, &buf, 1, addr, &on_send);
	} else {
		uint16_t pkt_size = htons(pkt->size);
		uv_buf_t buf[2] = {
			{ (char *)&pkt_size, sizeof(pkt_size) },
			{ (char *)pkt->wire, pkt->size }
		};
		send_req->as.write.data = task;
		ret = uv_write(&send_req->as.write, (uv_stream_t *)handle, buf, 2, &on_write);
	}
	if (ret == 0) {
		qr_task_ref(task); /* Pending ioreq on current task */
	} else {
		DEBUG_MSG("ioreq send_start %p => %d, %s\n", send_req, ret, uv_strerror(ret));
		ioreq_release(task->worker, send_req);
	}

	/* Update statistics */
	if (handle != task->source.handle && addr) {
		if (handle->type == UV_UDP)
			task->worker->stats.udp += 1;
		else
			task->worker->stats.tcp += 1;
		if (addr->sa_family == AF_INET6)
			task->worker->stats.ipv6 += 1;
		else
			task->worker->stats.ipv4 += 1;
	}
	return ret;
}

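/* TCP connect callback: on success send the prepared query, otherwise step the task again. */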
static void on_connect(uv_connect_t *req, int status)
{
	struct worker_ctx *worker = get_worker();
	struct qr_task *task = req->data;
	uv_stream_t *handle = req->handle;
	if (qr_valid_handle(task, (uv_handle_t *)req->handle)) {
		if (status == 0) {
			qr_task_send(task, (uv_handle_t *)handle, NULL, task->pktbuf);
		} else {
			DEBUG_MSG("ioreq conn_done %p => %d, %s\n", req, status, uv_strerror(status));
			qr_task_step(task, task->addrlist, NULL);
		}
	}
	qr_task_unref(task);
	ioreq_release(worker, (struct ioreq *)req);
}

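/* Send the query datagram to the next address in the list (round robin). */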
static bool retransmit(struct qr_task *task)
{
	if (task && task->addrlist) {
		uv_handle_t *subreq = ioreq_spawn(task, SOCK_DGRAM);
		if (subreq) { /* Create connection for iterative query */
			struct sockaddr_in6 *choice = &((struct sockaddr_in6 *)task->addrlist)[task->addrlist_turn];
			if (qr_task_send(task, subreq, (struct sockaddr *)choice, task->pktbuf) == 0) {
				task->addrlist_turn = (task->addrlist_turn + 1) % task->addrlist_count; /* Round robin */
				return true;
			}
		}
	}
	return false;
}

static void on_retransmit(uv_timer_t *req)
{
	if (uv_is_closing((uv_handle_t *)req))
		return;
	if (!retransmit(req->data)) {
		uv_timer_stop(req); /* Not possible to spawn request, stop trying */
	}
}

/** @internal Get key from current outstanding subrequest. */
static int subreq_key(char *dst, struct qr_task *task)
{
	assert(task);
	knot_pkt_t *pkt = task->pktbuf;
	assert(knot_wire_get_qr(pkt->wire) == false);
	return kr_rrkey(dst, knot_pkt_qname(pkt), knot_pkt_qtype(pkt), knot_pkt_qclass(pkt));
}

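/* Stop timers and pending I/O, clear the task from the outstanding table and wake up tasks waiting on this subrequest. */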
static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *pkt)
{
	/* Close pending I/O requests */
	if (uv_is_active((uv_handle_t *)&task->retry))
		uv_timer_stop(&task->retry);
	if (uv_is_active((uv_handle_t *)&task->timeout))
		uv_timer_stop(&task->timeout);
	ioreq_killall(task);
	/* Clear from outstanding table. */
	if (!task->leading)
		return;
	char key[KR_RRKEY_LEN];
	int ret = subreq_key(key, task);
	if (ret > 0) {
		assert(map_get(&task->worker->outstanding, key) == task);
		map_del(&task->worker->outstanding, key);
	}
	/* Notify waiting tasks. */
	struct kr_query *leader_qry = array_tail(task->req.rplan.pending);
	for (size_t i = task->waiting.len; i --> 0;) {
		struct qr_task *follower = task->waiting.at[i];
		struct kr_query *qry = array_tail(follower->req.rplan.pending);
		/* Reuse MSGID and 0x20 secret */
		if (qry) {
			qry->id = leader_qry->id;
			qry->secret = leader_qry->secret;
			leader_qry->secret = 0; /* Next will be already decoded */
		}
		qr_task_step(follower, packet_source, pkt);
		qr_task_unref(follower);
	}
	task->waiting.len = 0;
	task->leading = false;
}

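/* Register this task as the leader for its (qname, qtype, qclass) subrequest. */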
static void subreq_lead(struct qr_task *task)
{
	assert(task);
	char key[KR_RRKEY_LEN];
	if (subreq_key(key, task) > 0) {
		assert(map_contains(&task->worker->outstanding, key) == false);
		map_set(&task->worker->outstanding, key, task);
		task->leading = true;
	}
}

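/* If an identical subrequest is already outstanding, wait on its leader instead of asking again. */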
static bool subreq_enqueue(struct qr_task *task)
{
	assert(task);
	char key[KR_RRKEY_LEN];
	if (subreq_key(key, task) > 0) {
		struct qr_task *leader = map_get(&task->worker->outstanding, key);
		if (leader) {
			/* Enqueue itself to leader for this subrequest. */
			int ret = array_reserve_mm(leader->waiting, leader->waiting.len + 1, kr_memreserve, &leader->req.pool);
			if (ret == 0) {
				array_push(leader->waiting, task);
				qr_task_ref(task);
				return true;
			}
		}
	}
	return false;
}

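/* Finish the request and send the answer back to the query source. */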
static int qr_task_finalize(struct qr_task *task, int state)
{
	assert(task && task->leading == false);
	kr_resolve_finish(&task->req, state);
	task->finished = true;
	/* Send back answer */
	(void) qr_task_send(task, task->source.handle, (struct sockaddr *)&task->source.addr, task->req.answer);
	return state == KNOT_STATE_DONE ? 0 : kr_error(EIO);
}

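/* Advance the resolution state machine: consume the incoming packet, produce the next query and transmit it over UDP or TCP. */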
static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *packet)
{
	/* No more steps after we're finished. */
	if (task->finished) {
		return kr_error(ESTALE);
	}
	/* Close pending I/O requests */
	subreq_finalize(task, packet_source, packet);
	/* Consume input and produce next query */
	int sock_type = -1;
	task->addrlist = NULL;
	task->addrlist_count = 0;
	task->addrlist_turn = 0;
	int state = kr_resolve_consume(&task->req, packet_source, packet);
	while (state == KNOT_STATE_PRODUCE) {
		state = kr_resolve_produce(&task->req, &task->addrlist, &sock_type, task->pktbuf);
		if (unlikely(++task->iter_count > KR_ITER_LIMIT)) {
			DEBUG_MSG("task iter_limit %p\n", task);
			return qr_task_finalize(task, KNOT_STATE_FAIL);
		}
	}

	/* We're done, no more iterations needed */
	if (state & (KNOT_STATE_DONE|KNOT_STATE_FAIL)) {
		return qr_task_finalize(task, state);
	} else if (!task->addrlist || sock_type < 0) {
		return qr_task_step(task, NULL, NULL);
	}

	/* Count available address choices */
	struct sockaddr_in6 *choice = (struct sockaddr_in6 *)task->addrlist;
	for (size_t i = 0; i < KR_NSREP_MAXADDR && choice->sin6_family != AF_UNSPEC; ++i) {
		task->addrlist_count += 1;
		choice += 1;
	}

	/* Start fast retransmit with UDP, otherwise connect. */
	if (sock_type == SOCK_DGRAM) {
		/* If such subrequest is outstanding, enqueue to it. */
		if (subreq_enqueue(task)) {
			return kr_ok(); /* Will be notified when outstanding subrequest finishes. */
		}
		/* Start transmitting */
		if (retransmit(task)) {
			uv_timer_start(&task->retry, on_retransmit, KR_CONN_RETRY, KR_CONN_RETRY);
		} else {
			return qr_task_step(task, NULL, NULL);
		}
		/* Announce and start subrequest.
		 * @note Only UDP can lead I/O as it doesn't touch 'task->pktbuf' for reassembly.
		 */
		subreq_lead(task);
	} else {
		struct ioreq *conn = ioreq_take(task->worker);
		if (!conn) {
			return qr_task_step(task, NULL, NULL);
		}
		uv_handle_t *client = ioreq_spawn(task, sock_type);
		if (!client) {
			ioreq_release(task->worker, conn);
			return qr_task_step(task, NULL, NULL);
		}
		conn->as.connect.data = task;
		if (uv_tcp_connect(&conn->as.connect, (uv_tcp_t *)client, task->addrlist, on_connect) != 0) {
			ioreq_release(task->worker, conn);
			return qr_task_step(task, NULL, NULL);
		}
		/* Connect request borrows task */
		qr_task_ref(task);
	}

	/* Start next step with timeout, fatal if can't start a timer. */
	int ret = uv_timer_start(&task->timeout, on_timeout, KR_CONN_RTT_MAX, 0);
	if (ret != 0) {
		subreq_finalize(task, packet_source, packet);
		return qr_task_finalize(task, KNOT_STATE_FAIL);
	}

	return ret;
}

static int parse_packet(knot_pkt_t *query)
{
	if (!query)
		return kr_error(EINVAL);

	/* Parse query packet. */
	int ret = knot_pkt_parse(query, 0);
	if (ret != KNOT_EOK) {
		return kr_error(EPROTO); /* Ignore malformed query. */
	}

	/* Check if at least header is parsed. */
	if (query->parsed < query->size) {
		return kr_error(EMSGSIZE);
	}

	return kr_ok();
}

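/* Handle a query or response received on a handle: create a task on master sockets, or resume the existing one. */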
int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr* addr)
{
	if (!worker || !handle) {
		return kr_error(EINVAL);
	}

	/* Parse packet */
	int ret = parse_packet(query);

	/* Start new task on master sockets, or resume existing */
	struct qr_task *task = handle->data;
	bool is_master_socket = (!task);
	if (is_master_socket) {
		/* Ignore badly formed queries or responses. */
		if (!query || ret != 0 || knot_wire_get_qr(query->wire)) {
			DEBUG_MSG("task bad_query %p => %d, %s\n", task, ret, kr_strerror(ret));
			worker->stats.dropped += 1;
			return kr_error(EINVAL); /* Ignore. */
		}
		task = qr_task_create(worker, handle, query, addr);
		if (!task) {
			return kr_error(ENOMEM);
		}
	}

	/* Consume input and produce next query */
	return qr_task_step(task, addr, query);
}

/* Return DNS/TCP message size. */
static int msg_size(const uint8_t *msg, size_t len)
{
	if (len < 2) {
		return kr_error(EMSGSIZE);
	}
	return wire_read_u16(msg);
}

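/* Process a chunk of a DNS/TCP stream: strip the 2-byte length prefix, buffer partial messages in the task and hand complete ones to worker_exec(). */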
int worker_process_tcp(struct worker_ctx *worker, uv_handle_t *handle, const uint8_t *msg, size_t len)
{
	if (!worker || !handle || !msg) {
		return kr_error(EINVAL);
	}

	int nbytes = msg_size(msg, len);
	struct qr_task *task = handle->data;
	const bool start_assembly = (task && task->bytes_remaining == 0);

	/* Message is a query (we have no context to buffer it) or complete. */
	if (!task || (start_assembly && nbytes == len - 2)) {
		if (nbytes <= 0) {
			return worker_exec(worker, (uv_handle_t *)handle, NULL, NULL);
		}
		knot_pkt_t *pkt_nocopy = knot_pkt_new((void *)(msg + 2), nbytes, &worker->pkt_pool);
		return worker_exec(worker, handle, pkt_nocopy, NULL);
	}
	/* Starting a new message assembly */
	knot_pkt_t *pkt_buf = task->pktbuf;
	if (start_assembly) {
		if (nbytes <= 0) {
			return worker_exec(worker, (uv_handle_t *)handle, NULL, NULL);
		}
		knot_pkt_clear(pkt_buf);
		pkt_buf->size = 0;
		/* Cut off message length */
		task->bytes_remaining = nbytes;
		len -= 2;
		msg += 2;
	}
	/* Message is too long, can't process it. */
	if (len > pkt_buf->max_size - pkt_buf->size) {
		task->bytes_remaining = 0;
		return worker_exec(worker, handle, NULL, NULL);
	}
	/* Buffer message and check if it's complete */
	memcpy(pkt_buf->wire + pkt_buf->size, msg, len);
	pkt_buf->size += len;
	if (len >= task->bytes_remaining) {
		task->bytes_remaining = 0;
		return worker_exec(worker, handle, pkt_buf, NULL);
	}
	/* Return number of bytes remaining to receive. */
	task->bytes_remaining -= len;
	return task->bytes_remaining;
}

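/* Start an internal resolution (no client socket); on_complete is invoked when the task finishes. */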
int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query, unsigned options, worker_cb_t on_complete, void *baton)
{
	if (!worker || !query) {
		return kr_error(EINVAL);
	}

	/* Create task */
	struct qr_task *task = qr_task_create(worker, NULL, query, NULL);
	if (!task) {
		return kr_error(ENOMEM);
	}
	task->baton = baton;
	task->on_complete = on_complete;
	task->req.options |= options;
	return qr_task_step(task, NULL, query);
}

int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen)
{
	array_init(worker->pools);
	array_init(worker->ioreqs);
	if (array_reserve(worker->pools, ring_maxlen) || array_reserve(worker->ioreqs, ring_maxlen))
		return kr_error(ENOMEM);
	memset(&worker->pkt_pool, 0, sizeof(worker->pkt_pool));
	worker->pkt_pool.ctx = mp_new (4 * sizeof(knot_pkt_t));
	worker->pkt_pool.alloc = (knot_mm_alloc_t) mp_alloc;
	worker->outstanding = map_make();
	return kr_ok();
}

#define reclaim_freelist(list, type, cb) \
	for (unsigned i = 0; i < list.len; ++i) { \
		type *elm = list.at[i]; \
		kr_asan_unpoison(elm, sizeof(type)); \
		cb(elm); \
	} \
	array_clear(list)

void worker_reclaim(struct worker_ctx *worker)
{
	reclaim_freelist(worker->pools, struct mempool, mp_delete);
	reclaim_freelist(worker->ioreqs, struct ioreq, free);
	mp_delete(worker->pkt_pool.ctx);
	worker->pkt_pool.ctx = NULL;
	map_clear(&worker->outstanding);
}

#undef DEBUG_MSG