/*  Copyright (C) 2014-2017 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

#include <uv.h>
#include <lua.h>
#include <libknot/packet/pkt.h>
#include <libknot/descriptor.h>
#include <contrib/ucw/lib.h>
#include <contrib/ucw/mempool.h>
#include <contrib/wire.h>
#if defined(__GLIBC__) && defined(_GNU_SOURCE)
#include <malloc.h>
#endif
#include <assert.h>
#include <sys/types.h>
#include <unistd.h>
#include <gnutls/gnutls.h>
#include "lib/utils.h"
#include "lib/layer.h"
#include "daemon/worker.h"
#include "daemon/bindings.h"
#include "daemon/engine.h"
#include "daemon/io.h"
#include "daemon/tls.h"
#include "daemon/zimport.h"
#include "daemon/session.h"


/* Magic defaults for the worker. */
#ifndef MP_FREELIST_SIZE
# ifdef __clang_analyzer__
#  define MP_FREELIST_SIZE 0
# else
#  define MP_FREELIST_SIZE 64 /**< Maximum length of the worker mempool freelist */
# endif
#endif
#ifndef QUERY_RATE_THRESHOLD
#define QUERY_RATE_THRESHOLD (2 * MP_FREELIST_SIZE) /**< Nr of parallel queries considered as high rate */
#endif
#ifndef MAX_PIPELINED
#define MAX_PIPELINED 100
#endif

#define VERBOSE_MSG(qry, ...) QRVERBOSE(qry, "wrkr", __VA_ARGS__)

/** Client request state. */
struct request_ctx
{
	struct kr_request req;
	struct {
		union inaddr addr;
		union inaddr dst_addr;
		/* uv_handle_t *handle; */

		/** NULL if the request didn't come over network. */
		struct session *session;
	} source;
	struct worker_ctx *worker;
	struct qr_task *task;
};

/** Query resolution task. */
struct qr_task
{
	struct request_ctx *ctx;
	knot_pkt_t *pktbuf;
	qr_tasklist_t waiting;
	struct session *pending[MAX_PENDING];
	uint16_t pending_count;
	uint16_t addrlist_count;
	uint16_t addrlist_turn;
	uint16_t timeouts;
	uint16_t iter_count;
	struct sockaddr *addrlist;
	uint32_t refs;
	bool finished : 1;
	bool leading  : 1;
	uint64_t creation_time;
};


/* Convenience macros */
#define qr_task_ref(task) \
	do { ++(task)->refs; } while(0)
#define qr_task_unref(task) \
	do { if (task && --(task)->refs == 0) { qr_task_free(task); } } while (0)

/** @internal Get the key for a TCP session.
 *  @note kr_straddr() returns a pointer to a static string.
 */
#define tcpsess_key(addr) kr_straddr(addr)

/* Forward decls */
static void qr_task_free(struct qr_task *task);
static int qr_task_step(struct qr_task *task,
			const struct sockaddr *packet_source,
			knot_pkt_t *packet);
static int qr_task_send(struct qr_task *task, struct session *session,
			struct sockaddr *addr, knot_pkt_t *pkt);
static int qr_task_finalize(struct qr_task *task, int state);
static void qr_task_complete(struct qr_task *task);
static struct session* worker_find_tcp_connected(struct worker_ctx *worker,
						 const struct sockaddr *addr);
static int worker_add_tcp_waiting(struct worker_ctx *worker,
				  const struct sockaddr *addr,
				  struct session *session);
static int worker_del_tcp_waiting(struct worker_ctx *worker,
				  const struct sockaddr *addr);
static struct session* worker_find_tcp_waiting(struct worker_ctx *worker,
					       const struct sockaddr *addr);
static void on_tcp_connect_timeout(uv_timer_t *timer);

/** @internal Get singleton worker. */
static inline struct worker_ctx *get_worker(void)
{
	return uv_default_loop()->data;
}

/*! @internal Create a UDP/TCP handle for an outgoing AF_INET* connection.
 *  socktype is SOCK_* */
static uv_handle_t *ioreq_spawn(struct worker_ctx *worker, int socktype, sa_family_t family)
{
	bool precond = (socktype == SOCK_DGRAM || socktype == SOCK_STREAM)
			&& (family == AF_INET  || family == AF_INET6);
	if (!precond) {
		/* assert(false); see #245 */
		kr_log_verbose("[work] ioreq_spawn: pre-condition failed\n");
		return NULL;
	}

	/* Create connection for iterative query */
	uv_handle_t *handle = malloc(socktype == SOCK_DGRAM
					? sizeof(uv_udp_t) : sizeof(uv_tcp_t));
	if (!handle) {
		return NULL;
	}
	int ret = io_create(worker->loop, handle, socktype, family);
	if (ret) {
		if (ret == UV_EMFILE) {
			worker->too_many_open = true;
			worker->rconcurrent_highwatermark = worker->stats.rconcurrent;
		}
		free(handle);
		return NULL;
	}

	/* Bind to outgoing address, according to IP v4/v6. */
	union inaddr *addr;
	if (family == AF_INET) {
		addr = (union inaddr *)&worker->out_addr4;
	} else {
		addr = (union inaddr *)&worker->out_addr6;
	}
	if (addr->ip.sa_family != AF_UNSPEC) {
		assert(addr->ip.sa_family == family);
		if (socktype == SOCK_DGRAM) {
			uv_udp_t *udp = (uv_udp_t *)handle;
			ret = uv_udp_bind(udp, &addr->ip, 0);
		} else if (socktype == SOCK_STREAM){
			uv_tcp_t *tcp = (uv_tcp_t *)handle;
			ret = uv_tcp_bind(tcp, &addr->ip, 0);
		}
	}

	if (ret != 0) {
		io_deinit(handle);
		free(handle);
		return NULL;
	}

	/* Set current handle as a subrequest type. */
	struct session *session = handle->data;
	session_flags(session)->outgoing = true;
	/* Connect or issue query datagram */
	return handle;
}

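/** Cancel all pending outgoing I/O requests of the task. */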
static void ioreq_kill_pending(struct qr_task *task)
{
	for (uint16_t i = 0; i < task->pending_count; ++i) {
		session_kill_ioreq(task->pending[i], task);
	}
	task->pending_count = 0;
}

/** @cond This memory layout is internal to mempool.c, use only for debugging. */
#if defined(__SANITIZE_ADDRESS__)
struct mempool_chunk {
  struct mempool_chunk *next;
  size_t size;
};
static void mp_poison(struct mempool *mp, bool poison)
{
	if (!poison) { /* @note mempool is part of the first chunk, unpoison it first */
		kr_asan_unpoison(mp, sizeof(*mp));
	}
	struct mempool_chunk *chunk = mp->state.last[0];
	void *chunk_off = (void *)chunk - chunk->size;
	if (poison) {
		kr_asan_poison(chunk_off, chunk->size);
	} else {
		kr_asan_unpoison(chunk_off, chunk->size);
	}
}
#else
#define mp_poison(mp, enable)
#endif
/** @endcond */

/** Get a mempool.  (Recycle if possible.)  */
static inline struct mempool *pool_borrow(struct worker_ctx *worker)
{
	struct mempool *mp = NULL;
	if (worker->pool_mp.len > 0) {
		mp = array_tail(worker->pool_mp);
		array_pop(worker->pool_mp);
		mp_poison(mp, 0);
	} else { /* No mempool on the freelist, create new one */
		mp = mp_new (4 * CPU_PAGE_SIZE);
	}
	return mp;
}

/** Return a mempool.  (Cache them up to some count.) */
static inline void pool_release(struct worker_ctx *worker, struct mempool *mp)
{
	if (worker->pool_mp.len < MP_FREELIST_SIZE) {
		mp_flush(mp);
		array_push(worker->pool_mp, mp);
		mp_poison(mp, 1);
	} else {
		mp_delete(mp);
	}
}

/** Create a key for an outgoing subrequest: qname, qclass, qtype.
 * @param dst Destination buffer for the key, MUST be SUBREQ_KEY_LEN or larger.
 * @return key length if successful or an error
 */
static const size_t SUBREQ_KEY_LEN = KR_RRKEY_LEN;
static int subreq_key(char *dst, knot_pkt_t *pkt)
{
	assert(pkt);
	return kr_rrkey(dst, knot_pkt_qclass(pkt), knot_pkt_qname(pkt),
			knot_pkt_qtype(pkt), knot_pkt_qtype(pkt));
}

/** Create and initialize a request_ctx (on a fresh mempool).
 *
 * handle and addr point to the source of the request, and they are NULL
 * in case the request didn't come from network.
 */
static struct request_ctx *request_create(struct worker_ctx *worker,
					  uv_handle_t *handle,
					  const struct sockaddr *addr,
					  uint32_t uid)
{
	knot_mm_t pool = {
		.ctx = pool_borrow(worker),
		.alloc = (knot_mm_alloc_t) mp_alloc
	};

	/* Create request context */
	struct request_ctx *ctx = mm_alloc(&pool, sizeof(*ctx));
	if (!ctx) {
		pool_release(worker, pool.ctx);
		return NULL;
	}

	memset(ctx, 0, sizeof(*ctx));

	/* TODO Relocate pool to struct request */
	ctx->worker = worker;
	struct session *s = handle ? handle->data : NULL;
	if (s) {
		assert(session_flags(s)->outgoing == false);
	}
	ctx->source.session = s;

	struct kr_request *req = &ctx->req;
	req->pool = pool;
	req->vars_ref = LUA_NOREF;
	req->uid = uid;
	req->daemon_context = worker;

	/* Remember query source addr */
	if (!addr || (addr->sa_family != AF_INET && addr->sa_family != AF_INET6)) {
		ctx->source.addr.ip.sa_family = AF_UNSPEC;
	} else {
		memcpy(&ctx->source.addr, addr, kr_sockaddr_len(addr));
		ctx->req.qsource.addr = &ctx->source.addr.ip;
	}

	worker->stats.rconcurrent += 1;

	if (!handle) {
		return ctx;
	}

	/* Remember the destination address. */
	int addr_len = sizeof(ctx->source.dst_addr);
	struct sockaddr *dst_addr = &ctx->source.dst_addr.ip;
	ctx->source.dst_addr.ip.sa_family = AF_UNSPEC;
	if (handle->type == UV_UDP) {
		if (uv_udp_getsockname((uv_udp_t *)handle, dst_addr, &addr_len) == 0) {
			req->qsource.dst_addr = dst_addr;
		}
		req->qsource.flags.tcp = false;
		req->qsource.flags.tls = false;
	} else if (handle->type == UV_TCP) {
		if (uv_tcp_getsockname((uv_tcp_t *)handle, dst_addr, &addr_len) == 0) {
			req->qsource.dst_addr = dst_addr;
		}
		req->qsource.flags.tcp = true;
		req->qsource.flags.tls = s && session_flags(s)->has_tls;
	}

	return ctx;
}

/** More initialization, related to the particular incoming query/packet. */
static int request_start(struct request_ctx *ctx, knot_pkt_t *query)
{
	assert(query && ctx);
	size_t answer_max = KNOT_WIRE_MIN_PKTSIZE;
	struct kr_request *req = &ctx->req;

	/* source.session can be empty if request was generated by kresd itself */
	struct session *s = ctx->source.session;
	if (!s || session_get_handle(s)->type == UV_TCP) {
		answer_max = KNOT_WIRE_MAX_PKTSIZE;
	} else if (knot_pkt_has_edns(query)) { /* EDNS */
		answer_max = MAX(knot_edns_get_payload(query->opt_rr),
				 KNOT_WIRE_MIN_PKTSIZE);
	}
	req->qsource.size = query->size;
	if (knot_pkt_has_tsig(query)) {
		req->qsource.size += query->tsig_wire.len;
	}

	knot_pkt_t *answer = knot_pkt_new(NULL, answer_max, &req->pool);
	if (!answer) { /* Failed to allocate answer */
		return kr_error(ENOMEM);
	}

	knot_pkt_t *pkt = knot_pkt_new(NULL, req->qsource.size, &req->pool);
	if (!pkt) {
		return kr_error(ENOMEM);
	}

	int ret = knot_pkt_copy(pkt, query);
	if (ret != KNOT_EOK && ret != KNOT_ETRAIL) {
		return kr_error(ENOMEM);
	}
	req->qsource.packet = pkt;

	/* Start resolution */
	struct worker_ctx *worker = ctx->worker;
	struct engine *engine = worker->engine;
	kr_resolve_begin(req, &engine->resolver, answer);
	worker->stats.queries += 1;
	/* Throttle outbound queries only when high pressure */
	if (worker->stats.concurrent < QUERY_RATE_THRESHOLD) {
		req->options.NO_THROTTLE = true;
	}
	return kr_ok();
}

static void request_free(struct request_ctx *ctx)
{
	struct worker_ctx *worker = ctx->worker;
	/* Dereference any Lua vars table if exists */
	if (ctx->req.vars_ref != LUA_NOREF) {
		lua_State *L = worker->engine->L;
		/* Get worker variables table */
		lua_rawgeti(L, LUA_REGISTRYINDEX, worker->vars_table_ref);
		/* Get next free element (position 0) and store it under current reference (forming a list) */
		lua_rawgeti(L, -1, 0);
		lua_rawseti(L, -2, ctx->req.vars_ref);
		/* Set current reference as the next free element */
		lua_pushinteger(L, ctx->req.vars_ref);
		lua_rawseti(L, -2, 0);
		lua_pop(L, 1);
		ctx->req.vars_ref = LUA_NOREF;
	}
	/* Return mempool to ring or free it if it's full */
	pool_release(worker, ctx->req.pool.ctx);
	/* @note The 'task' is invalidated from now on. */
	/* Decommit memory every once in a while */
	static int mp_delete_count = 0;
	if (++mp_delete_count == 100000) {
		lua_gc(worker->engine->L, LUA_GCCOLLECT, 0);
#if defined(__GLIBC__) && defined(_GNU_SOURCE)
		malloc_trim(0);
#endif
		mp_delete_count = 0;
	}
	worker->stats.rconcurrent -= 1;
}

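/** Create a resolution task for the request: allocate a packet buffer for
 * subrequests on the request mempool and take the primary reference. */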
static struct qr_task *qr_task_create(struct request_ctx *ctx)
{
	/* How much can client handle? */
	struct engine *engine = ctx->worker->engine;
	size_t pktbuf_max = KR_EDNS_PAYLOAD;
	if (engine->resolver.opt_rr) {
		pktbuf_max = MAX(knot_edns_get_payload(engine->resolver.opt_rr),
				 pktbuf_max);
	}

	/* Create resolution task */
	struct qr_task *task = mm_alloc(&ctx->req.pool, sizeof(*task));
	if (!task) {
		return NULL;
	}
	memset(task, 0, sizeof(*task)); /* avoid accidentally uninitialized fields */

	/* Create packet buffers for answer and subrequests */
	knot_pkt_t *pktbuf = knot_pkt_new(NULL, pktbuf_max, &ctx->req.pool);
	if (!pktbuf) {
		mm_free(&ctx->req.pool, task);
		return NULL;
	}
	pktbuf->size = 0;

	task->ctx = ctx;
	task->pktbuf = pktbuf;
	array_init(task->waiting);
	task->refs = 0;
	assert(ctx->task == NULL);
	ctx->task = task;
	/* Make the primary reference to task. */
	qr_task_ref(task);
	task->creation_time = kr_now();
	ctx->worker->stats.concurrent += 1;
	return task;
}

/* Called when the task refcount drops to zero; frees the memory. */
static void qr_task_free(struct qr_task *task)
{
	struct request_ctx *ctx = task->ctx;

	assert(ctx);

	struct worker_ctx *worker = ctx->worker;

	if (ctx->task == NULL) {
		request_free(ctx);
	}

	/* Update stats */
	worker->stats.concurrent -= 1;
}

/** Register a new qr_task within the session. */
static int qr_task_register(struct qr_task *task, struct session *session)
{
	assert(!session_flags(session)->outgoing && session_get_handle(session)->type == UV_TCP);

	session_tasklist_add(session, task);

	struct request_ctx *ctx = task->ctx;
	assert(ctx && (ctx->source.session == NULL || ctx->source.session == session));
	ctx->source.session = session;
	/* Soft-limit on parallel queries: there is no "slow down" RCODE
	 * that we could use to signal the client, but we can stop reading,
	 * and in effect shrink the TCP window size. To get more precise throttling,
	 * we would need to copy the remainder of the unread buffer and reassemble
	 * it when resuming reading. This is NYI.  */
	if (session_tasklist_get_len(session) >= task->ctx->worker->tcp_pipeline_max &&
	    !session_flags(session)->throttled && !session_flags(session)->closing) {
		session_stop_read(session);
		session_flags(session)->throttled = true;
	}

	return 0;
}

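/** Resolution finished: cancel pending I/O, detach the task from its source
 * session and drop the primary reference. */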
static void qr_task_complete(struct qr_task *task)
{
	struct request_ctx *ctx = task->ctx;

	/* Kill pending I/O requests */
	ioreq_kill_pending(task);
	assert(task->waiting.len == 0);
	assert(task->leading == false);

	struct session *s = ctx->source.session;
	if (s) {
		assert(!session_flags(s)->outgoing && session_waitinglist_is_empty(s));
		ctx->source.session = NULL;
		session_tasklist_del(s, task);
	}

	/* Release primary reference to task. */
	if (ctx->task == task) {
		ctx->task = NULL;
		qr_task_unref(task);
	}
}

/* This is called when we send subrequest / answer */
static int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status)
{

	if (task->finished) {
		assert(task->leading == false);
		qr_task_complete(task);
	}

	if (!handle || handle->type != UV_TCP) {
		return status;
	}

	struct session* s = handle->data;
	assert(s);
	if (status != 0) {
		session_tasklist_del(s, task);
	}

	if (session_flags(s)->outgoing || session_flags(s)->closing) {
		return status;
	}

	struct worker_ctx *worker = task->ctx->worker;
	if (session_flags(s)->throttled &&
	    session_tasklist_get_len(s) < worker->tcp_pipeline_max/2) {
	   /* Start reading again if the session is throttled and
	    * the number of outgoing requests is below watermark. */
		session_start_read(s);
		session_flags(s)->throttled = false;
	}

	return status;
}

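/* Completion callbacks for uv_udp_send() and uv_write(): report the result
 * to the task, drop the task reference and free the request. */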
static void on_send(uv_udp_send_t *req, int status)
{
	struct qr_task *task = req->data;
	uv_handle_t *h = (uv_handle_t *)req->handle;
	qr_task_on_send(task, h, status);
	qr_task_unref(task);
	free(req);
}

static void on_write(uv_write_t *req, int status)
{
	struct qr_task *task = req->data;
	uv_handle_t *h = (uv_handle_t *)req->handle;
	qr_task_on_send(task, h, status);
	qr_task_unref(task);
	free(req);
}

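/** Send a packet over the given session (UDP, TCP or TLS).
 * If addr or pkt is NULL, the session peer address or the task's packet buffer
 * is used instead. The task is referenced for the duration of the write. */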
static int qr_task_send(struct qr_task *task, struct session *session,
			struct sockaddr *addr, knot_pkt_t *pkt)
{
	if (!session) {
		return qr_task_on_send(task, NULL, kr_error(EIO));
	}

	int ret = 0;
	struct request_ctx *ctx = task->ctx;

	uv_handle_t *handle = session_get_handle(session);
	assert(handle && handle->data == session);
	const bool is_stream = handle->type == UV_TCP;
	if (!is_stream && handle->type != UV_UDP) abort();

	if (addr == NULL) {
		addr = session_get_peer(session);
	}

	if (pkt == NULL) {
		pkt = worker_task_get_pktbuf(task);
	}

	if (session_flags(session)->outgoing && handle->type == UV_TCP) {
		size_t try_limit = session_tasklist_get_len(session) + 1;
		uint16_t msg_id = knot_wire_get_id(pkt->wire);
		size_t try_count = 0;
		while (session_tasklist_find_msgid(session, msg_id) &&
		       try_count <= try_limit) {
			++msg_id;
			++try_count;
		}
		if (try_count > try_limit) {
			return kr_error(ENOENT);
		}
		worker_task_pkt_set_msgid(task, msg_id);
	}

	uv_handle_t *ioreq = malloc(is_stream ? sizeof(uv_write_t) : sizeof(uv_udp_send_t));
	if (!ioreq) {
		return qr_task_on_send(task, handle, kr_error(ENOMEM));
	}

	/* Pending ioreq on current task */
	qr_task_ref(task);

	struct worker_ctx *worker = ctx->worker;
	/* Send using given protocol */
	assert(!session_flags(session)->closing);
	if (session_flags(session)->has_tls) {
		uv_write_t *write_req = (uv_write_t *)ioreq;
		write_req->data = task;
		ret = tls_write(write_req, handle, pkt, &on_write);
	} else if (handle->type == UV_UDP) {
		uv_udp_send_t *send_req = (uv_udp_send_t *)ioreq;
		uv_buf_t buf = { (char *)pkt->wire, pkt->size };
		send_req->data = task;
		ret = uv_udp_send(send_req, (uv_udp_t *)handle, &buf, 1, addr, &on_send);
	} else if (handle->type == UV_TCP) {
		uv_write_t *write_req = (uv_write_t *)ioreq;
		uint16_t pkt_size = htons(pkt->size);
		uv_buf_t buf[2] = {
			{ (char *)&pkt_size, sizeof(pkt_size) },
			{ (char *)pkt->wire, pkt->size }
		};
		write_req->data = task;
		ret = uv_write(write_req, (uv_stream_t *)handle, buf, 2, &on_write);
	} else {
		assert(false);
	}

	if (ret == 0) {
		session_touch(session);
		if (session_flags(session)->outgoing) {
			session_tasklist_add(session, task);
		}
		if (worker->too_many_open &&
		    worker->stats.rconcurrent <
			worker->rconcurrent_highwatermark - 10) {
			worker->too_many_open = false;
		}
	} else {
		free(ioreq);
		qr_task_unref(task);
		if (ret == UV_EMFILE) {
			worker->too_many_open = true;
			worker->rconcurrent_highwatermark = worker->stats.rconcurrent;
			ret = kr_error(UV_EMFILE);
		}
	}

	/* Update statistics */
	if (session_flags(session)->outgoing && addr) {
		if (session_flags(session)->has_tls)
			worker->stats.tls += 1;
		else if (handle->type == UV_UDP)
			worker->stats.udp += 1;
		else
			worker->stats.tcp += 1;

		if (addr->sa_family == AF_INET6)
			worker->stats.ipv6 += 1;
		else if (addr->sa_family == AF_INET)
			worker->stats.ipv4 += 1;
	}
	return ret;
}

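/** Called when the TLS client handshake on an outgoing session finishes.
 * On failure the peer is penalized in the RTT cache; on success the TLS session
 * data is cached for resumption, the session is moved from the waiting to the
 * connected map and the queued tasks are sent. */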
static int session_tls_hs_cb(struct session *session, int status)
{
	assert(session_flags(session)->outgoing);
	uv_handle_t *handle = session_get_handle(session);
	uv_loop_t *loop = handle->loop;
	struct worker_ctx *worker = loop->data;
	struct sockaddr *peer = session_get_peer(session);
	int deletion_res = worker_del_tcp_waiting(worker, peer);
	int ret = kr_ok();

	if (status) {
		kr_nsrep_update_rtt(NULL, peer, KR_NS_DEAD,
				    worker->engine->resolver.cache_rtt,
				    KR_NS_UPDATE_NORESET);
		return ret;
	}

	/* handshake was completed successfully */
	struct tls_client_ctx_t *tls_client_ctx = session_tls_get_client_ctx(session);
	struct tls_client_paramlist_entry *tls_params = tls_client_ctx->params;
	gnutls_session_t tls_session = tls_client_ctx->c.tls_session;
	if (gnutls_session_is_resumed(tls_session) != 0) {
		kr_log_verbose("[tls_client] TLS session has resumed\n");
	} else {
		kr_log_verbose("[tls_client] TLS session has not resumed\n");
		/* session wasn't resumed, delete old session data ... */
		if (tls_params->session_data.data != NULL) {
			gnutls_free(tls_params->session_data.data);
			tls_params->session_data.data = NULL;
			tls_params->session_data.size = 0;
		}
		/* ... and get the new session data */
		gnutls_datum_t tls_session_data = { NULL, 0 };
		ret = gnutls_session_get_data2(tls_session, &tls_session_data);
		if (ret == 0) {
			tls_params->session_data = tls_session_data;
		}
	}

	ret = kr_ok();
	if (deletion_res == kr_ok()) {
		/* peer was in the waiting list, add to the connected list. */
		ret = worker_add_tcp_connected(worker, peer, session);
	} else {
		/* peer wasn't in the waiting list.
		 * In this case it must be successful rehandshake.
		 * Peer must be already in the connected list. */
		const char *key = tcpsess_key(peer);
		assert(key);
		assert(map_contains(&worker->tcp_connected, key) != 0);
	}
	if (ret == kr_ok()) {
		while (!session_waitinglist_is_empty(session)) {
			struct qr_task *t = session_waitinglist_get(session);
			ret = qr_task_send(t, session, NULL, NULL);
			if (ret != 0) {
				break;
			}
			session_waitinglist_pop(session, true);
		}
	} else {
		ret = kr_error(EINVAL);
	}

	if (ret != kr_ok()) {
		/* Something went wrong.
		 * Session isn't in the list of waiting sessions,
		 * or addition to the list of connected sessions failed,
		 * or write to upstream failed. */
		worker_del_tcp_connected(worker, peer);
		session_waitinglist_finalize(session, KR_STATE_FAIL);
		assert(session_tasklist_is_empty(session));
		session_close(session);
	} else {
		session_timer_stop(session);
		session_timer_start(session, tcp_timeout_trigger,
				    MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
	}
	return kr_ok();
}


static struct kr_query *task_get_last_pending_query(struct qr_task *task)
{
	if (!task || task->ctx->req.rplan.pending.len == 0) {
		return NULL;
	}

	return array_tail(task->ctx->req.rplan.pending);
}

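/** Send all tasks from the session's waiting list.
 * On the first failure the waiting and task lists are finalized with
 * KR_STATE_FAIL and the session is closed. */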
static int send_waiting(struct session *session)
{
	int ret = 0;
	while (!session_waitinglist_is_empty(session)) {
		struct qr_task *t = session_waitinglist_get(session);
		ret = qr_task_send(t, session, NULL, NULL);
		if (ret != 0) {
			session_waitinglist_finalize(session, KR_STATE_FAIL);
			session_tasklist_finalize(session, KR_STATE_FAIL);
			session_close(session);
			break;
		}
		session_waitinglist_pop(session, true);
	}
	return ret;
}

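/** uv_tcp_connect() callback for outgoing connections.
 * On failure the waiting tasks are retried or failed and the session is closed;
 * on success reading starts, the TLS handshake is initiated if needed,
 * and the queued tasks are sent. */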
static void on_connect(uv_connect_t *req, int status)
{
	struct worker_ctx *worker = get_worker();
	uv_stream_t *handle = req->handle;
	struct session *session = handle->data;
	struct sockaddr *peer = session_get_peer(session);
	free(req);

	assert(session_flags(session)->outgoing);

	if (status == UV_ECANCELED) {
		worker_del_tcp_waiting(worker, peer);
		assert(session_is_empty(session) && session_flags(session)->closing);
		return;
	}

	if (session_flags(session)->closing) {
		worker_del_tcp_waiting(worker, peer);
		assert(session_is_empty(session));
		return;
	}

	if (status != 0) {
		worker_del_tcp_waiting(worker, peer);
		assert(session_tasklist_is_empty(session));
		session_waitinglist_retry(session, false);
		session_close(session);
		return;
	}

	if (!session_flags(session)->has_tls) {
		/* If TLS is used, the session is still waiting for the handshake;
		 * otherwise remove it from the waiting list. */
		if (worker_del_tcp_waiting(worker, peer) != 0) {
			/* Session isn't in the list of waiting queries;
			 * something went wrong. */
			session_waitinglist_finalize(session, KR_STATE_FAIL);
			assert(session_tasklist_is_empty(session));
			session_close(session);
			return;
		}
	}

	struct qr_task *task = session_waitinglist_get(session);
	struct kr_query *qry = task_get_last_pending_query(task);
	WITH_VERBOSE (qry) {
		struct sockaddr *peer = session_get_peer(session);
		char peer_str[INET6_ADDRSTRLEN];
		inet_ntop(peer->sa_family, kr_inaddr(peer), peer_str, sizeof(peer_str));
		VERBOSE_MSG(qry, "=> connected to '%s'\n", peer_str);
835 836
	}

837
	session_flags(session)->connected = true;
838
	session_start_read(session);
839 840

	int ret = kr_ok();
	if (session_flags(session)->has_tls) {
		struct tls_client_ctx_t *tls_ctx = session_tls_get_client_ctx(session);
		ret = tls_client_connect_start(tls_ctx, session, session_tls_hs_cb);
		if (ret == kr_error(EAGAIN)) {
			session_timer_stop(session);
			session_timer_start(session, tcp_timeout_trigger,
					    MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
			return;
		}
	} else {
		worker_add_tcp_connected(worker, peer, session);
	}

	ret = send_waiting(session);
	if (ret != 0) {
		worker_del_tcp_connected(worker, peer);
		return;
	}

	session_timer_stop(session);
	session_timer_start(session, tcp_timeout_trigger,
			    MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
}

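/** TCP connection establishment timed out: penalize the peer's RTT estimate,
 * retry the waiting tasks elsewhere and close the session. */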
static void on_tcp_connect_timeout(uv_timer_t *timer)
{
	struct session *session = timer->data;

	uv_timer_stop(timer);
	struct worker_ctx *worker = get_worker();

	assert (session_tasklist_is_empty(session));

	struct sockaddr *peer = session_get_peer(session);
	worker_del_tcp_waiting(worker, peer);

	struct qr_task *task = session_waitinglist_get(session);
	struct kr_query *qry = task_get_last_pending_query(task);
	WITH_VERBOSE (qry) {
		char peer_str[INET6_ADDRSTRLEN];
		inet_ntop(peer->sa_family, kr_inaddr(peer), peer_str, sizeof(peer_str));
		VERBOSE_MSG(qry, "=> connection to '%s' failed\n", peer_str);
	}

	kr_nsrep_update_rtt(NULL, peer, KR_NS_DEAD,
			    worker->engine->resolver.cache_rtt,
			    KR_NS_UPDATE_NORESET);

	worker->stats.timeout += session_waitinglist_get_len(session);
	session_waitinglist_retry(session, true);
	assert (session_tasklist_is_empty(session));
	session_close(session);
}

/* This is called when I/O times out. */
static void on_udp_timeout(uv_timer_t *timer)
{
	struct session *session = timer->data;
	assert(session_get_handle(session)->data == session);
	assert(session_tasklist_get_len(session) == 1);
	assert(session_waitinglist_is_empty(session));

	uv_timer_stop(timer);

	/* Penalize all tried nameservers with a timeout. */
	struct qr_task *task = session_tasklist_get_first(session);
	struct worker_ctx *worker = task->ctx->worker;
	if (task->leading && task->pending_count > 0) {
		struct kr_query *qry = array_tail(task->ctx->req.rplan.pending);
		struct sockaddr_in6 *addrlist = (struct sockaddr_in6 *)task->addrlist;
		for (uint16_t i = 0; i < MIN(task->pending_count, task->addrlist_count); ++i) {
			struct sockaddr *choice = (struct sockaddr *)(&addrlist[i]);
			WITH_VERBOSE(qry) {
				char addr_str[INET6_ADDRSTRLEN];
				inet_ntop(choice->sa_family, kr_inaddr(choice), addr_str, sizeof(addr_str));
				VERBOSE_MSG(qry, "=> server: '%s' flagged as 'bad'\n", addr_str);
			}
			kr_nsrep_update_rtt(&qry->ns, choice, KR_NS_DEAD,
					    worker->engine->resolver.cache_rtt,
					    KR_NS_UPDATE_NORESET);
		}
	}
	task->timeouts += 1;
	worker->stats.timeout += 1;
	qr_task_step(task, NULL, NULL);
}

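/** Send the query datagram to the next address from the task's address list
 * (round-robin) and start reading the answer; returns the new handle or NULL. */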
static uv_handle_t *retransmit(struct qr_task *task)
{
	uv_handle_t *ret = NULL;
	if (task && task->addrlist && task->addrlist_count > 0) {
		struct sockaddr_in6 *choice = &((struct sockaddr_in6 *)task->addrlist)[task->addrlist_turn];
		if (!choice) {
			return ret;
		}
		if (task->pending_count >= MAX_PENDING) {
			return ret;
		}
		/* Checkout answer before sending it */
		struct request_ctx *ctx = task->ctx;
		if (kr_resolve_checkout(&ctx->req, NULL, (struct sockaddr *)choice, SOCK_DGRAM, task->pktbuf) != 0) {
			return ret;
		}
		ret = ioreq_spawn(ctx->worker, SOCK_DGRAM, choice->sin6_family);
		if (!ret) {
			return ret;
		}
		struct sockaddr *addr = (struct sockaddr *)choice;
		struct session *session = ret->data;
		struct sockaddr *peer = session_get_peer(session);
		assert (peer->sa_family == AF_UNSPEC && session_flags(session)->outgoing);
		memcpy(peer, addr, kr_sockaddr_len(addr));
		if (qr_task_send(task, session, (struct sockaddr *)choice,
				 task->pktbuf) != 0) {
			session_close(session);
			ret = NULL;
		} else {
			task->pending[task->pending_count] = session;
			task->pending_count += 1;
			task->addrlist_turn = (task->addrlist_turn + 1) %
					      task->addrlist_count; /* Round robin */
			session_start_read(session); /* Start reading answer */
		}
	}
	return ret;
}

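/** Retransmit timer: try the next server, or schedule the final UDP timeout
 * once no further datagram can be spawned. */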
static void on_retransmit(uv_timer_t *req)
{
	struct session *session = req->data;
	assert(session_tasklist_get_len(session) == 1);

	uv_timer_stop(req);
	struct qr_task *task = session_tasklist_get_first(session);
	if (retransmit(task) == NULL) {
		/* Not possible to spawn request, start timeout timer with remaining deadline. */
		uint64_t timeout = KR_CONN_RTT_MAX - task->pending_count * KR_CONN_RETRY;
		uv_timer_start(req, on_udp_timeout, timeout, 0);
	} else {
		uv_timer_start(req, on_retransmit, KR_CONN_RETRY, 0);
	}
}

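/** Finalize an outgoing subrequest: cancel pending I/O and, if the task was
 * the leading one, remove it from the outgoing table and notify waiting tasks. */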
static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *pkt)
{
	if (!task || task->finished) {
		return;
	}
	/* Close pending timer */
	ioreq_kill_pending(task);
	/* Clear from outgoing table. */
	if (!task->leading)
		return;
	char key[SUBREQ_KEY_LEN];
	const int klen = subreq_key(key, task->pktbuf);
	if (klen > 0) {
		void *val_deleted;
		int ret = trie_del(task->ctx->worker->subreq_out, key, klen, &val_deleted);
		assert(ret == KNOT_EOK && val_deleted == task); (void)ret;
	}
	/* Notify waiting tasks. */
	struct kr_query *leader_qry = array_tail(task->ctx->req.rplan.pending);