/*  Copyright (C) 2014 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include <uv.h>
#include <lua.h>
#include <libknot/packet/pkt.h>
#include <libknot/descriptor.h>

#include <contrib/ucw/lib.h>
#include <contrib/ucw/mempool.h>
#include <contrib/wire.h>

#if defined(__GLIBC__) && defined(_GNU_SOURCE)
#include <malloc.h>
#endif

#include <assert.h>
#include "lib/utils.h"
#include "lib/layer.h"
#include "daemon/worker.h"
#include "daemon/engine.h"
#include "daemon/io.h"

/* @internal IO request entry. */
struct ioreq
{
	union {
		uv_udp_t      udp;
		uv_tcp_t      tcp;
		uv_udp_send_t send;
		uv_write_t    write;
		uv_connect_t  connect;
	} as;
};

/** @internal Number of requests within the timeout window. */
#define MAX_PENDING (KR_NSREP_MAXADDR + (KR_NSREP_MAXADDR / 2))

/** @internal Debugging facility. */
#ifdef DEBUG
#define DEBUG_MSG(fmt...) printf("[daem] " fmt)
#else
#define DEBUG_MSG(fmt...)
#endif

/** @internal Query resolution task. */
struct qr_task
{
	struct kr_request req;
	struct worker_ctx *worker;
	knot_pkt_t *pktbuf;
	array_t(struct qr_task *) waiting;
	uv_handle_t *pending[MAX_PENDING];
	uint16_t pending_count;
	uint16_t addrlist_count;
	uint16_t addrlist_turn;
	struct sockaddr *addrlist;
	uv_timer_t retry, timeout;
	worker_cb_t on_complete;
	void *baton;
	struct {
		union {
			struct sockaddr_in ip4;
			struct sockaddr_in6 ip6;
		} addr;
		uv_handle_t *handle;
	} source;
	uint16_t iter_count;
	uint16_t refs;
	uint16_t bytes_remaining;
	bool finished;
	bool leading;
};
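/* @note A task and both of its packet buffers live in a per-request mempool
 * taken from the worker freelist; qr_task_free() returns the whole pool at
 * once. 'refs' counts pending I/O requests and timers that still borrow the
 * task, so the macros below only free it after the last borrower is closed. */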

/* Convenience macros */
#define qr_task_ref(task) \
	do { ++(task)->refs; } while(0)
#define qr_task_unref(task) \
	do { if (--(task)->refs == 0) { qr_task_free(task); } } while (0)
#define qr_valid_handle(task, checked) \
	(!uv_is_closing((checked)) || (task)->source.handle == (checked))

/* Forward decls */
static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *packet);

/** @internal Get singleton worker. */
static inline struct worker_ctx *get_worker(void)
{
	return uv_default_loop()->data;
}

static inline struct ioreq *ioreq_take(struct worker_ctx *worker)
{
	struct ioreq *req = NULL;
	if (worker->ioreqs.len > 0) {
		req = array_tail(worker->ioreqs);
		array_pop(worker->ioreqs);
	} else {
		req = malloc(sizeof(*req));
	}
	kr_asan_unpoison(req, sizeof(*req));
	return req;
}

static inline void ioreq_release(struct worker_ctx *worker, struct ioreq *req)
{
	kr_asan_poison(req, sizeof(*req));
	if (!req || worker->ioreqs.len < 4 * MP_FREELIST_SIZE) {
		array_push(worker->ioreqs, req);
	} else {
		free(req);
	}
}

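/* @note A spawned handle is owned by its task: it is recorded in
 * task->pending[] so ioreq_killall() can close it on completion or timeout,
 * and its data pointer refers back to the task. */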
static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype)
{
	if (task->pending_count >= MAX_PENDING) {
		return NULL;
	}
	/* Create connection for iterative query */
	uv_handle_t *req = (uv_handle_t *)ioreq_take(task->worker);
	if (!req) {
		return NULL;
	}
	io_create(task->worker->loop, req, socktype);
	req->data = task;
	/* Connect or issue query datagram */
	task->pending[task->pending_count] = req;
	task->pending_count += 1;
	return req;
}

static void ioreq_on_close(uv_handle_t *handle)
{
	struct worker_ctx *worker = get_worker();
	ioreq_release(worker, (struct ioreq *)handle);
}

static void ioreq_kill(uv_handle_t *req)
{
	assert(req);
	if (!uv_is_closing(req)) {
		io_stop_read(req);
		uv_close(req, ioreq_on_close);
	}
}

static void ioreq_killall(struct qr_task *task)
{
	for (size_t i = 0; i < task->pending_count; ++i) {
		ioreq_kill(task->pending[i]);
	}
	task->pending_count = 0;
}

/** @cond This memory layout is internal to mempool.c, use only for debugging. */
#if defined(__SANITIZE_ADDRESS__)
struct mempool_chunk {
  struct mempool_chunk *next;
  size_t size;
};
static void mp_poison(struct mempool *mp, bool poison)
{
	if (!poison) { /* @note mempool is part of the first chunk, unpoison it first */
		kr_asan_unpoison(mp, sizeof(*mp));
	}
	struct mempool_chunk *chunk = mp->state.last[0];
	void *chunk_off = (void *)chunk - chunk->size;
	if (poison) {
		kr_asan_poison(chunk_off, chunk->size);
	} else {
		kr_asan_unpoison(chunk_off, chunk->size);
	}
}
#else
#define mp_poison(mp, enable)
#endif
/** @endcond */

static inline struct mempool *pool_take(struct worker_ctx *worker)
{
	/* Recycle available mempool if possible */
	struct mempool *mp = NULL;
	if (worker->pools.len > 0) {
		mp = array_tail(worker->pools);
		array_pop(worker->pools);
	} else { /* No mempool on the freelist, create new one */
		mp = mp_new (4 * CPU_PAGE_SIZE);
	}
	mp_poison(mp, 0);
	return mp;
}

static inline void pool_release(struct worker_ctx *worker, struct mempool *mp)
{
	/* Return mempool to ring or free it if it's full */
	if (worker->pools.len < MP_FREELIST_SIZE) {
		mp_flush(mp);
		array_push(worker->pools, mp);
		mp_poison(mp, 1);
	} else {
		mp_delete(mp);
	}
}

static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr *addr)
{
	/* How much can client handle? */
	struct engine *engine = worker->engine;
	size_t answer_max = KNOT_WIRE_MIN_PKTSIZE;
	size_t pktbuf_max = KR_EDNS_PAYLOAD;
	if (engine->resolver.opt_rr) {
		pktbuf_max = MAX(knot_edns_get_payload(engine->resolver.opt_rr), pktbuf_max);
	}
	if (!addr && handle) { /* TCP */
		answer_max = KNOT_WIRE_MAX_PKTSIZE;
		pktbuf_max = KNOT_WIRE_MAX_PKTSIZE;
	} else if (knot_pkt_has_edns(query)) { /* EDNS */
		answer_max = MAX(knot_edns_get_payload(query->opt_rr), KNOT_WIRE_MIN_PKTSIZE);
	}

	/* Recycle available mempool if possible */
	knot_mm_t pool = {
		.ctx = pool_take(worker),
		.alloc = (knot_mm_alloc_t) mp_alloc
	};

	/* Create resolution task */
	struct qr_task *task = mm_alloc(&pool, sizeof(*task));
	if (!task) {
		mp_delete(pool.ctx);
		return NULL;
	}
	/* Create packet buffers for answer and subrequests */
	task->req.pool = pool;
	knot_pkt_t *pktbuf = knot_pkt_new(NULL, pktbuf_max, &task->req.pool);
	knot_pkt_t *answer = knot_pkt_new(NULL, answer_max, &task->req.pool);
	if (!pktbuf || !answer) {
		mp_delete(pool.ctx);
		return NULL;
	}
	task->req.answer = answer;
	task->pktbuf = pktbuf;
	array_init(task->waiting);
	task->addrlist = NULL;
	task->pending_count = 0;
	task->bytes_remaining = 0;
	task->iter_count = 0;
	task->refs = 1;
	task->finished = false;
	task->leading = false;
	task->worker = worker;
	task->source.handle = handle;
	uv_timer_init(worker->loop, &task->retry);
	uv_timer_init(worker->loop, &task->timeout);
	task->retry.data = task;
	task->timeout.data = task;
	task->on_complete = NULL;
	task->req.qsource.key = NULL;
	task->req.qsource.addr = NULL;
	/* Remember query source addr */
	if (addr) {
		size_t addr_len = sizeof(struct sockaddr_in);
		if (addr->sa_family == AF_INET6)
			addr_len = sizeof(struct sockaddr_in6);
		memcpy(&task->source.addr, addr, addr_len);
		task->req.qsource.addr = (const struct sockaddr *)&task->source.addr;
	} else {
		task->source.addr.ip4.sin_family = AF_UNSPEC;
	}
	/* Remember query source TSIG key */
	if (query->tsig_rr) {
		task->req.qsource.key = knot_rrset_copy(query->tsig_rr, &task->req.pool);
	}

	/* Start resolution */
	kr_resolve_begin(&task->req, &engine->resolver, answer);
	worker->stats.concurrent += 1;
	worker->stats.queries += 1;
	return task;
}

/* This is called when the task refcount is zero, free memory. */
static void qr_task_free(struct qr_task *task)
{
	/* Return mempool to ring or free it if it's full */
	struct worker_ctx *worker = task->worker;
	pool_release(worker, task->req.pool.ctx);
	/* @note The 'task' is invalidated from now on. */
	/* Decommit memory every once in a while */
	static int mp_delete_count = 0;
	if (++mp_delete_count == 100000) {
		lua_gc(worker->engine->L, LUA_GCCOLLECT, 0);
#if defined(__GLIBC__) && defined(_GNU_SOURCE)
		malloc_trim(0);
#endif
		mp_delete_count = 0;
	}
}

/* This is called when retry timer closes */
static void retransmit_close(uv_handle_t *handle)
{
	struct qr_task *task = handle->data;
	qr_task_unref(task);
}

/* This is called when task completes and timeout timer is closed. */
static void qr_task_complete(uv_handle_t *handle)
{
	struct qr_task *task = handle->data;
	struct worker_ctx *worker = task->worker;
	/* Kill pending I/O requests */
	ioreq_killall(task);
	assert(task->waiting.len == 0);
	assert(task->leading == false);
	/* Run the completion callback. */
	if (task->on_complete) {
		task->on_complete(worker, &task->req, task->baton);
	}
	/* Return handle to the event loop in case
	 * it was exclusively taken by this task. */
	if (task->source.handle && !uv_has_ref(task->source.handle)) {
		uv_ref(task->source.handle);
		io_start_read(task->source.handle);
	}
	/* Release task */
	qr_task_unref(task);
	/* Update stats */
	worker->stats.concurrent -= 1;
}

/* This is called when an I/O request times out */
static void on_timeout(uv_timer_t *req)
{
	struct qr_task *task = req->data;
	uv_handle_t *handle = (uv_handle_t *)req;
#ifdef DEBUG
	char qname_str[KNOT_DNAME_MAXLEN] = {'\0'}, type_str[16] = {'\0'};
	knot_dname_to_str(qname_str, knot_pkt_qname(task->pktbuf), sizeof(qname_str));
	knot_rrtype_to_string(knot_pkt_qtype(task->pktbuf), type_str, sizeof(type_str));
	DEBUG_MSG("ioreq timeout %s %s %p\n", qname_str, type_str, req);
#endif
	/* Ignore if this timeout is being terminated. */
	if (uv_is_closing(handle)) {
		return;
	}
	/* Penalize all tried nameservers with a timeout. */
	struct worker_ctx *worker = task->worker;
	if (task->leading && task->pending_count > 0) {
		struct kr_query *qry = array_tail(task->req.rplan.pending);
		struct sockaddr_in6 *addrlist = (struct sockaddr_in6 *)task->addrlist;
		for (uint16_t i = 0; i < MIN(task->pending_count, task->addrlist_count); ++i) {
			struct sockaddr *choice = (struct sockaddr *)(&addrlist[i]);
			WITH_DEBUG {
				char addr_str[INET6_ADDRSTRLEN];
				inet_ntop(choice->sa_family, kr_inaddr(choice), addr_str, sizeof(addr_str));
				QRDEBUG(qry, "wrkr", "=> server: '%s' flagged as 'bad'\n", addr_str);
			}
			kr_nsrep_update_rtt(&qry->ns, choice, KR_NS_TIMEOUT, worker->engine->resolver.cache_rtt);
		}
	}
	/* Interrupt current pending request. */
	worker->stats.timeout += 1;
	qr_task_step(task, NULL, NULL);
}

/* This is called when we send subrequest / answer */
static int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status)
{
	if (!task->finished) {
		if (status == 0 && handle) {
			io_start_read(handle); /* Start reading new query */
		} else {
			DEBUG_MSG("ioreq send_done %p => %d, %s\n", handle, status, uv_strerror(status));
		}
	} else {
		/* Close retry timer (borrows task) */
		qr_task_ref(task);
		uv_close((uv_handle_t *)&task->retry, retransmit_close);
		/* Close timeout timer (finishes task) */
		uv_close((uv_handle_t *)&task->timeout, qr_task_complete);
	}
	return status;
}

static void on_send(uv_udp_send_t *req, int status)
{
	struct worker_ctx *worker = get_worker();
	struct qr_task *task = req->data;
	if (qr_valid_handle(task, (uv_handle_t *)req->handle)) {
		qr_task_on_send(task, (uv_handle_t *)req->handle, status);
	}
	qr_task_unref(task);
	ioreq_release(worker, (struct ioreq *)req);
}

static void on_write(uv_write_t *req, int status)
{
	struct worker_ctx *worker = get_worker();
	struct qr_task *task = req->data;
	if (qr_valid_handle(task, (uv_handle_t *)req->handle)) {
		qr_task_on_send(task, (uv_handle_t *)req->handle, status);
	}
	qr_task_unref(task);
	ioreq_release(worker, (struct ioreq *)req);
}

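/* Send 'pkt' over the given handle: a single datagram for UDP, or a two-part
 * write for TCP (16-bit length prefix in network byte order, then the wire
 * data). A successfully started request borrows a reference to the task. */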
static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockaddr *addr, knot_pkt_t *pkt)
{
	if (!handle) {
		return qr_task_on_send(task, handle, kr_error(EIO));
	}
	struct ioreq *send_req = ioreq_take(task->worker);
	if (!send_req) {
		return qr_task_on_send(task, handle, kr_error(ENOMEM));
	}

	/* Send using given protocol */
	int ret = 0;
	if (handle->type == UV_UDP) {
		uv_buf_t buf = { (char *)pkt->wire, pkt->size };
		send_req->as.send.data = task;
		ret = uv_udp_send(&send_req->as.send, (uv_udp_t *)handle, &buf, 1, addr, &on_send);
	} else {
		uint16_t pkt_size = htons(pkt->size);
		uv_buf_t buf[2] = {
			{ (char *)&pkt_size, sizeof(pkt_size) },
			{ (char *)pkt->wire, pkt->size }
		};
		send_req->as.write.data = task;
		ret = uv_write(&send_req->as.write, (uv_stream_t *)handle, buf, 2, &on_write);
	}
	if (ret == 0) {
		qr_task_ref(task); /* Pending ioreq on current task */
	} else {
		DEBUG_MSG("ioreq send_start %p => %d, %s\n", send_req, ret, uv_strerror(ret));
		ioreq_release(task->worker, send_req);
	}

	/* Update statistics */
	if (handle != task->source.handle && addr) {
		if (handle->type == UV_UDP)
			task->worker->stats.udp += 1;
		else
			task->worker->stats.tcp += 1;
		if (addr->sa_family == AF_INET6)
			task->worker->stats.ipv6 += 1;
		else
			task->worker->stats.ipv4 += 1;
	}
	return ret;
}

static void on_connect(uv_connect_t *req, int status)
{
	struct worker_ctx *worker = get_worker();
	struct qr_task *task = req->data;
	uv_stream_t *handle = req->handle;
	if (qr_valid_handle(task, (uv_handle_t *)req->handle)) {
		if (status == 0) {
			qr_task_send(task, (uv_handle_t *)handle, NULL, task->pktbuf);
		} else {
			DEBUG_MSG("ioreq conn_done %p => %d, %s\n", req, status, uv_strerror(status));
			qr_task_step(task, task->addrlist, NULL);
		}
	}
	qr_task_unref(task);
	ioreq_release(worker, (struct ioreq *)req);
}

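/* Fast retransmit for UDP subrequests: each attempt sends the query to the
 * next address in task->addrlist (round robin); the retry timer repeats this
 * every KR_CONN_RETRY until an answer arrives or KR_CONN_RTT_MAX expires. */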
static bool retransmit(struct qr_task *task)
{
	if (task && task->addrlist) {
		uv_handle_t *subreq = ioreq_spawn(task, SOCK_DGRAM);
		if (subreq) { /* Create connection for iterative query */
			struct sockaddr_in6 *choice = &((struct sockaddr_in6 *)task->addrlist)[task->addrlist_turn];
			if (qr_task_send(task, subreq, (struct sockaddr *)choice, task->pktbuf) == 0) {
				task->addrlist_turn = (task->addrlist_turn + 1) % task->addrlist_count; /* Round robin */
				return true;
			}
		}
	}
	return false;
}

static void on_retransmit(uv_timer_t *req)
{
	if (uv_is_closing((uv_handle_t *)req))
		return;
	if (!retransmit(req->data)) {
		uv_timer_stop(req); /* Not possible to spawn request, stop trying */
	}
}

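/* Outstanding subrequest deduplication: the first task to send a given
 * (qname, qtype, qclass) upstream becomes the leader in worker->outstanding;
 * later tasks with the same question enqueue on the leader and are stepped
 * with the shared answer in subreq_finalize(). */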
/** @internal Get key from current outstanding subrequest. */
static int subreq_key(char *dst, struct qr_task *task)
{
	assert(task);
	knot_pkt_t *pkt = task->pktbuf;
	assert(knot_wire_get_qr(pkt->wire) == false);
	return kr_rrkey(dst, knot_pkt_qname(pkt), knot_pkt_qtype(pkt), knot_pkt_qclass(pkt));
}

static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *pkt)
{
	/* Close pending I/O requests */
	if (uv_is_active((uv_handle_t *)&task->retry))
		uv_timer_stop(&task->retry);
	if (uv_is_active((uv_handle_t *)&task->timeout))
		uv_timer_stop(&task->timeout);
	ioreq_killall(task);
	/* Clear from outstanding table. */
	if (!task->leading)
		return;
	char key[KR_RRKEY_LEN];
	int ret = subreq_key(key, task);
	if (ret > 0) {
		assert(map_get(&task->worker->outstanding, key) == task);
		map_del(&task->worker->outstanding, key);
	}
	/* Notify waiting tasks. */
	struct kr_query *leader_qry = array_tail(task->req.rplan.pending);
	for (size_t i = task->waiting.len; i --> 0;) {
		struct qr_task *follower = task->waiting.at[i];
		struct kr_query *qry = array_tail(follower->req.rplan.pending);
		/* Reuse MSGID and 0x20 secret */
		if (qry) {
			qry->id = leader_qry->id;
			qry->secret = leader_qry->secret;
			leader_qry->secret = 0; /* Next will be already decoded */
		}
		qr_task_step(follower, packet_source, pkt);
		qr_task_unref(follower);
	}
	task->waiting.len = 0;
	task->leading = false;
}

static void subreq_lead(struct qr_task *task)
{
	assert(task);
	char key[KR_RRKEY_LEN];
	if (subreq_key(key, task) > 0) {
		assert(map_contains(&task->worker->outstanding, key) == false);
		map_set(&task->worker->outstanding, key, task);
		task->leading = true;
	}
}

static bool subreq_enqueue(struct qr_task *task)
{
	assert(task);
	char key[KR_RRKEY_LEN];
	if (subreq_key(key, task) > 0) {
		struct qr_task *leader = map_get(&task->worker->outstanding, key);
		if (leader) {
			/* Enqueue itself to leader for this subrequest. */
			int ret = array_reserve_mm(leader->waiting, leader->waiting.len + 1, kr_memreserve, &leader->req.pool);
			if (ret == 0) {
				array_push(leader->waiting, task);
				qr_task_ref(task);
				return true;
			}
		}
	}
	return false;
}

static int qr_task_finalize(struct qr_task *task, int state)
{
	assert(task && task->leading == false);
	kr_resolve_finish(&task->req, state);
	task->finished = true;
	/* Send back answer */
	(void) qr_task_send(task, task->source.handle, (struct sockaddr *)&task->source.addr, task->req.answer);
	return state == KNOT_STATE_DONE ? 0 : kr_error(EIO);
}

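/* Advance the resolution state machine by one step: consume the incoming
 * packet, produce the next outbound query, then either join an outstanding
 * UDP subrequest, start UDP retransmits, or open a TCP connection, all
 * guarded by the KR_CONN_RTT_MAX timeout. */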
static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *packet)
{
	/* No more steps after we're finished. */
	if (task->finished) {
		return kr_error(ESTALE);
	}
	/* Close pending I/O requests */
	subreq_finalize(task, packet_source, packet);
	/* Consume input and produce next query */
	int sock_type = -1;
	task->addrlist = NULL;
	task->addrlist_count = 0;
	task->addrlist_turn = 0;
	int state = kr_resolve_consume(&task->req, packet_source, packet);
	while (state == KNOT_STATE_PRODUCE) {
		state = kr_resolve_produce(&task->req, &task->addrlist, &sock_type, task->pktbuf);
		if (unlikely(++task->iter_count > KR_ITER_LIMIT)) {
			DEBUG_MSG("task iter_limit %p\n", task);
			return qr_task_finalize(task, KNOT_STATE_FAIL);
		}
	}

	/* We're done, no more iterations needed */
	if (state & (KNOT_STATE_DONE|KNOT_STATE_FAIL)) {
		return qr_task_finalize(task, state);
	} else if (!task->addrlist || sock_type < 0) {
		return qr_task_step(task, NULL, NULL);
	}

	/* Count available address choices */
	struct sockaddr_in6 *choice = (struct sockaddr_in6 *)task->addrlist;
	for (size_t i = 0; i < KR_NSREP_MAXADDR && choice->sin6_family != AF_UNSPEC; ++i) {
		task->addrlist_count += 1;
		choice += 1;
	}

	/* Start fast retransmit with UDP, otherwise connect. */
	if (sock_type == SOCK_DGRAM) {
		/* If such subrequest is outstanding, enqueue to it. */
		if (subreq_enqueue(task)) {
			return kr_ok(); /* Will be notified when outstanding subrequest finishes. */
		}
		/* Start transmitting */
		if (retransmit(task)) {
			uv_timer_start(&task->retry, on_retransmit, KR_CONN_RETRY, KR_CONN_RETRY);
		} else {
			return qr_task_step(task, NULL, NULL);
		}
		/* Announce and start subrequest.
		 * @note Only UDP can lead I/O as it doesn't touch 'task->pktbuf' for reassembly.
		 */
		subreq_lead(task);
	} else {
		struct ioreq *conn = ioreq_take(task->worker);
		if (!conn) {
			return qr_task_step(task, NULL, NULL);
		}
		uv_handle_t *client = ioreq_spawn(task, sock_type);
		if (!client) {
			ioreq_release(task->worker, conn);
			return qr_task_step(task, NULL, NULL);
		}
		conn->as.connect.data = task;
		if (uv_tcp_connect(&conn->as.connect, (uv_tcp_t *)client, task->addrlist, on_connect) != 0) {
			ioreq_release(task->worker, conn);
			return qr_task_step(task, NULL, NULL);
		}
		/* Connect request borrows task */
		qr_task_ref(task);
	}

	/* Start next step with timeout, fatal if can't start a timer. */
	int ret = uv_timer_start(&task->timeout, on_timeout, KR_CONN_RTT_MAX, 0);
	if (ret != 0) {
		subreq_finalize(task, packet_source, packet);
		return qr_task_finalize(task, KNOT_STATE_FAIL);
	}

	return ret;
}

static int parse_packet(knot_pkt_t *query)
{
	if (!query)
		return kr_error(EINVAL);

	/* Parse query packet. */
	int ret = knot_pkt_parse(query, 0);
	if (ret != KNOT_EOK) {
		return kr_error(EPROTO); /* Ignore malformed query. */
	}

	/* Check if at least header is parsed. */
	if (query->parsed < query->size) {
		return kr_error(EMSGSIZE);
	}

	return kr_ok();
}

int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr* addr)
{
	if (!worker || !handle) {
		return kr_error(EINVAL);
	}

	/* Parse packet */
	int ret = parse_packet(query);

	/* Start new task on master sockets, or resume existing */
	struct qr_task *task = handle->data;
	bool is_master_socket = (!task);
	if (is_master_socket) {
		/* Ignore badly formed queries or responses. */
		if (!query || ret != 0 || knot_wire_get_qr(query->wire)) {
			DEBUG_MSG("task bad_query %p => %d, %s\n", task, ret, kr_strerror(ret));
			worker->stats.dropped += 1;
			return kr_error(EINVAL); /* Ignore. */
		}
		task = qr_task_create(worker, handle, query, addr);
		if (!task) {
			return kr_error(ENOMEM);
		}
	}

	/* Consume input and produce next query */
	return qr_task_step(task, addr, query);
}

/* Return DNS/TCP message size. */
static int msg_size(const uint8_t *msg, size_t len)
{
	if (len < 2) {
		return kr_error(EMSGSIZE);
	}
	return wire_read_u16(msg);
}

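/* Reassemble DNS-over-TCP stream data. Each message carries a 16-bit length
 * prefix; a complete first message is parsed in place without copying, while
 * partial messages are accumulated in task->pktbuf and the return value is
 * the number of bytes still missing from the current message. */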
int worker_process_tcp(struct worker_ctx *worker, uv_handle_t *handle, const uint8_t *msg, size_t len)
{
	if (!worker || !handle || !msg) {
		return kr_error(EINVAL);
	}

	int nbytes = msg_size(msg, len);
	struct qr_task *task = handle->data;
	const bool start_assembly = (task && task->bytes_remaining == 0);

	/* Message is a query (we have no context to buffer it) or complete. */
	if (!task || (start_assembly && nbytes == len - 2)) {
		if (nbytes <= 0) {
			return worker_exec(worker, (uv_handle_t *)handle, NULL, NULL);	
		}
		knot_pkt_t *pkt_nocopy = knot_pkt_new((void *)(msg + 2), nbytes, &worker->pkt_pool);
		return worker_exec(worker, handle, pkt_nocopy, NULL);
	}
	/* Starting a new message assembly */
	knot_pkt_t *pkt_buf = task->pktbuf;
	if (start_assembly) {
		if (nbytes <= 0) {
			return worker_exec(worker, (uv_handle_t *)handle, NULL, NULL);	
		}	
		knot_pkt_clear(pkt_buf);
		pkt_buf->size = 0;
		/* Cut off message length */
		task->bytes_remaining = nbytes;
		len -= 2;
		msg += 2;
	}
	/* Message is too long, can't process it. */
	if (len > pkt_buf->max_size - pkt_buf->size) {
		task->bytes_remaining = 0;
		return worker_exec(worker, handle, NULL, NULL);
	}
	/* Buffer message and check if it's complete */
	memcpy(pkt_buf->wire + pkt_buf->size, msg, len);
	pkt_buf->size += len;
	if (len >= task->bytes_remaining) {
		task->bytes_remaining = 0;
		return worker_exec(worker, handle, pkt_buf, NULL);
	}
	/* Return number of bytes remaining to receive. */
	task->bytes_remaining -= len;
	return task->bytes_remaining;
}

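/* Entry point for queries originated inside the daemon (no client socket).
 * A minimal usage sketch; the callback name is illustrative and its signature
 * is assumed to match the call in qr_task_complete():
 *
 *   static void on_done(struct worker_ctx *worker, struct kr_request *req, void *baton)
 *   {
 *       // req->answer holds the final answer
 *   }
 *   ...
 *   worker_resolve(worker, query, 0, on_done, NULL);
 */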
int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query, unsigned options, worker_cb_t on_complete, void *baton)
{
	if (!worker || !query) {
		return kr_error(EINVAL);
	}

	/* Create task */
	struct qr_task *task = qr_task_create(worker, NULL, query, NULL);
	if (!task) {
		return kr_error(ENOMEM);
	}
	task->baton = baton;
	task->on_complete = on_complete;
	task->req.options |= options;
	return qr_task_step(task, NULL, query);
}

int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen)
{
	array_init(worker->pools);
	array_init(worker->ioreqs);
	if (array_reserve(worker->pools, ring_maxlen) || array_reserve(worker->ioreqs, ring_maxlen))
		return kr_error(ENOMEM);
	memset(&worker->pkt_pool, 0, sizeof(worker->pkt_pool));
	worker->pkt_pool.ctx = mp_new (4 * sizeof(knot_pkt_t));
	worker->pkt_pool.alloc = (knot_mm_alloc_t) mp_alloc;
	worker->outstanding = map_make();
	return kr_ok();
}

#define reclaim_freelist(list, type, cb) \
	for (unsigned i = 0; i < list.len; ++i) { \
		type *elm = list.at[i]; \
		kr_asan_unpoison(elm, sizeof(type)); \
		cb(elm); \
	} \
	array_clear(list)

void worker_reclaim(struct worker_ctx *worker)
{
	reclaim_freelist(worker->pools, struct mempool, mp_delete);
	reclaim_freelist(worker->ioreqs, struct ioreq, free);
	mp_delete(worker->pkt_pool.ctx);
	worker->pkt_pool.ctx = NULL;
	map_clear(&worker->outstanding);
}

#undef DEBUG_MSG