diff --git a/daemon/README.rst b/daemon/README.rst index 794e7f496c2197d754c7d297e17bd48158d32aef..5b9a812ade6dc892b81c535b24b444cdb7e34961 100644 --- a/daemon/README.rst +++ b/daemon/README.rst @@ -520,6 +520,18 @@ For when listening on ``localhost`` just doesn't cut it. > net.bufsize() 4096 +.. function:: net.tcp_pipeline([len]) + + Get/set per-client TCP pipeline limit (number of outstanding queries that a single client connection can make in parallel). Default is 50. + + Example output: + + .. code-block:: lua + + > net.tcp_pipeline() + 50 + > net.tcp_pipeline(100) + Trust anchors and DNSSEC ^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/daemon/bindings.c b/daemon/bindings.c index 3b07a0a392ea2831039ac5cadbe2bb628598bfa3..489d48bd379ebd3efd140a085d8d146df5ee2b2f 100644 --- a/daemon/bindings.c +++ b/daemon/bindings.c @@ -48,6 +48,13 @@ static int format_error(lua_State* L, const char *err) return 1; } +static inline struct worker_ctx *wrk_luaget(lua_State *L) { + lua_getglobal(L, "__worker"); + struct worker_ctx *worker = lua_touserdata(L, -1); + lua_pop(L, 1); + return worker; +} + /** List loaded modules */ static int mod_list(lua_State *L) { @@ -302,14 +309,36 @@ static int net_bufsize(lua_State *L) return 0; } +/** Set TCP pipelining size. */ +static int net_pipeline(lua_State *L) +{ + struct worker_ctx *worker = wrk_luaget(L); + if (!worker) { + return 0; + } + if (!lua_isnumber(L, 1)) { + lua_pushnumber(L, worker->tcp_pipeline_max); + return 1; + } + int len = lua_tointeger(L, 1); + if (len < 0 || len > 4096) { + format_error(L, "tcp_pipeline must be within <0, 4096>"); + lua_error(L); + } + worker->tcp_pipeline_max = len; + lua_pushnumber(L, len); + return 1; +} + int lib_net(lua_State *L) { static const luaL_Reg lib[] = { - { "list", net_list }, - { "listen", net_listen }, - { "close", net_close }, - { "interfaces", net_interfaces }, - { "bufsize", net_bufsize }, + { "list", net_list }, + { "listen", net_listen }, + { "close", net_close }, + { "interfaces", net_interfaces }, + { "bufsize", net_bufsize }, + { "tcp_pipeline", net_pipeline }, { NULL, NULL } }; register_lib(L, "net", lib); @@ -599,13 +628,6 @@ int lib_event(lua_State *L) return 1; } -static inline struct worker_ctx *wrk_luaget(lua_State *L) { - lua_getglobal(L, "__worker"); - struct worker_ctx *worker = lua_touserdata(L, -1); - lua_pop(L, 1); - return worker; -} - /* @internal Call the Lua callback stored in baton. */ static void resolve_callback(struct worker_ctx *worker, struct kr_request *req, void *baton) { diff --git a/daemon/engine.h b/daemon/engine.h index 1eddda6330c7e124ff625da03a982b45c7d463c6..fde0c2aa93693d1c148c8b511576e963dc546079 100644 --- a/daemon/engine.h +++ b/daemon/engine.h @@ -32,6 +32,9 @@ #ifndef QUERY_RATE_THRESHOLD #define QUERY_RATE_THRESHOLD (2 * MP_FREELIST_SIZE) /**< Nr of parallel queries considered as high rate */ #endif +#ifndef MAX_PIPELINED +#define MAX_PIPELINED 100 +#endif /* * @internal These are forward decls to allow building modules with engine but without Lua. diff --git a/daemon/io.c b/daemon/io.c index 373ac9fdac488166dffc30e20b07f2563d159847..439d2a947afdec76291a2e60d9ad50abecd74c56 100644 --- a/daemon/io.c +++ b/daemon/io.c @@ -18,6 +18,7 @@ #include <libknot/errcode.h> #include <contrib/ucw/lib.h> #include <contrib/ucw/mempool.h> +#include <assert.h> #include "daemon/io.h" #include "daemon/network.h" @@ -44,14 +45,56 @@ static void check_bufsize(uv_handle_t* handle) #undef negotiate_bufsize -static void *handle_alloc(uv_loop_t *loop, size_t size) +static void session_clear(struct session *s) { - return malloc(size); + assert(s->is_subreq || s->tasks.len == 0); + array_clear(s->tasks); + memset(s, 0, sizeof(*s)); } -static void handle_free(uv_handle_t *handle) +void session_free(struct session *s) { - free(handle); + session_clear(s); + free(s); +} + +struct session *session_new(void) +{ + return calloc(1, sizeof(struct session)); +} + +static struct session *session_borrow(struct worker_ctx *worker) +{ + struct session *s = NULL; + if (worker->pool_sessions.len > 0) { + s = array_tail(worker->pool_sessions); + array_pop(worker->pool_sessions); + kr_asan_unpoison(s, sizeof(*s)); + } else { + s = session_new(); + } + return s; +} + +static void session_release(struct worker_ctx *worker, struct session *s) +{ + if (worker->pool_sessions.len < MP_FREELIST_SIZE) { + session_clear(s); + array_push(worker->pool_sessions, s); + kr_asan_poison(s, sizeof(*s)); + } else { + session_free(s); + } +} + +static uv_stream_t *handle_alloc(uv_loop_t *loop) +{ + uv_stream_t *handle = calloc(1, sizeof(*handle)); + if (!handle) { + return NULL; + } + + return handle; } static void handle_getbuf(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) @@ -61,14 +104,20 @@ static void handle_getbuf(uv_handle_t* handle, size_t suggested_size, uv_buf_t* * guaranteed to be unchanged only for the duration of * udp_read() and tcp_read(). */ + struct session *session = handle->data; uv_loop_t *loop = handle->loop; struct worker_ctx *worker = loop->data; buf->base = (char *)worker->wire_buf; - /* Use recvmmsg() on master sockets if possible. */ - if (handle->data) + /* Limit TCP stream buffer size to 4K for granularity in batches of incoming queries. */ + if (handle->type == UV_TCP) { + buf->len = MIN(suggested_size, 4096); + /* Regular buffer size for subrequests. */ + } else if (session->is_subreq) { buf->len = suggested_size; - else + /* Use recvmmsg() on master sockets if possible. */ + } else { buf->len = sizeof(worker->wire_buf); + } } void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, @@ -78,7 +127,7 @@ void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, struct worker_ctx *worker = loop->data; if (nread <= 0) { if (nread < 0) { /* Error response, notify resolver */ - worker_exec(worker, (uv_handle_t *)handle, NULL, addr); + worker_submit(worker, (uv_handle_t *)handle, NULL, addr); } /* nread == 0 is for freeing buffers, we don't need to do this */ return; } @@ -86,7 +135,7 @@ void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, knot_pkt_t *query = knot_pkt_new(buf->base, nread, &worker->pkt_pool); if (query) { query->max_size = KNOT_WIRE_MAX_PKTSIZE; - worker_exec(worker, (uv_handle_t *)handle, query, addr); + worker_submit(worker, (uv_handle_t *)handle, query, addr); } mp_flush(worker->pkt_pool.ctx); } @@ -101,35 +150,53 @@ int udp_bind(uv_udp_t *handle, struct sockaddr *addr) if (ret != 0) { return ret; } - handle->data = NULL; check_bufsize((uv_handle_t *)handle); + /* Handle is already created, just create context. */ + handle->data = session_new(); + assert(handle->data); return io_start_read((uv_handle_t *)handle); } +static void tcp_timeout(uv_handle_t *timer) +{ + uv_handle_t *handle = timer->data; + uv_close(handle, io_free); +} + +static void tcp_timeout_trigger(uv_timer_t *timer) +{ + uv_handle_t *handle = timer->data; + struct session *session = handle->data; + if (session->tasks.len > 0) { + uv_timer_again(timer); + } else { + uv_close((uv_handle_t *)timer, tcp_timeout); + } +} + static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) { uv_loop_t *loop = handle->loop; + struct session *s = handle->data; struct worker_ctx *worker = loop->data; - - /* Check for originator connection close. */ - if (nread <= 0) { - if (handle->data) { - worker_exec(worker, (uv_handle_t *)handle, NULL, NULL); - } - if (!uv_is_closing((uv_handle_t *)handle)) { - uv_close((uv_handle_t *)handle, handle_free); - } - return; - } - + /* TCP pipelining is rather complicated and requires cooperation from the worker + * so the whole message reassembly and demuxing logic is inside worker */ int ret = worker_process_tcp(worker, (uv_handle_t *)handle, (const uint8_t *)buf->base, nread); - if (ret == 0) { - /* Push - pull, stop reading from this handle until - * the task is finished. Since the handle has no track of the - * pending tasks, it might be freed before the task finishes - * leading various errors. */ - uv_unref((uv_handle_t *)handle); - io_stop_read((uv_handle_t *)handle); + if (ret < 0) { + worker_end_tcp(worker, (uv_handle_t *)handle); + /* Exceeded per-connection quota for outstanding requests + * stop reading from stream and close after last message is processed. */ + if (!s->is_subreq && !uv_is_closing((uv_handle_t *)&s->timeout)) { + uv_timer_stop(&s->timeout); + if (s->tasks.len == 0) { + uv_close((uv_handle_t *)&s->timeout, tcp_timeout); + } else { /* If there are tasks running, defer until they finish. */ + uv_timer_start(&s->timeout, tcp_timeout_trigger, 1, KR_CONN_RTT_MAX/2); + } + } + /* Connection spawned more than one request, reset its deadline for next query. */ + } else if (ret > 0 && !s->is_subreq) { + uv_timer_again(&s->timeout); } mp_flush(worker->pkt_pool.ctx); } @@ -139,41 +206,79 @@ static void tcp_accept(uv_stream_t *master, int status) if (status != 0) { return; } - - uv_stream_t *client = handle_alloc(master->loop, sizeof(*client)); + uv_stream_t *client = handle_alloc(master->loop); if (!client) { return; } memset(client, 0, sizeof(*client)); io_create(master->loop, (uv_handle_t *)client, SOCK_STREAM); if (uv_accept(master, client) != 0) { - handle_free((uv_handle_t *)client); + io_free((uv_handle_t *)client); return; } + /* Set deadlines for TCP connection and start reading. + * It will re-check every half of a request time limit if the connection + * is idle and should be terminated, this is an educated guess. */ + struct session *session = client->data; + uv_timer_t *timer = &session->timeout; + uv_timer_init(master->loop, timer); + timer->data = client; + uv_timer_start(timer, tcp_timeout_trigger, KR_CONN_RTT_MAX/2, KR_CONN_RTT_MAX/2); io_start_read((uv_handle_t *)client); } -int tcp_bind(uv_tcp_t *handle, struct sockaddr *addr) +static int set_tcp_option(uv_tcp_t *handle, int option, int val) { - unsigned flags = UV_UDP_REUSEADDR; + uv_os_fd_t fd = 0; + if (uv_fileno((uv_handle_t *)handle, &fd) == 0) { + return setsockopt(fd, IPPROTO_TCP, option, &val, sizeof(val)); + } + return 0; /* N/A */ +} + +static int _tcp_bind(uv_tcp_t *handle, struct sockaddr *addr, uv_connection_cb connection) +{ + unsigned flags = 0; if (addr->sa_family == AF_INET6) { - flags |= UV_UDP_IPV6ONLY; + flags |= UV_TCP_IPV6ONLY; } + int ret = uv_tcp_bind(handle, addr, flags); if (ret != 0) { return ret; } - ret = uv_listen((uv_stream_t *)handle, 16, tcp_accept); + /* TCP_DEFER_ACCEPT delays accepting connections until there is readable data. */ +#ifdef TCP_DEFER_ACCEPT + if (set_tcp_option(handle, TCP_DEFER_ACCEPT, KR_CONN_RTT_MAX/1000) != 0) { + kr_log_info("[ io ] tcp_bind (defer_accept): %s\n", strerror(errno)); + } +#endif + + ret = uv_listen((uv_stream_t *)handle, 16, connection); if (ret != 0) { return ret; } + /* TCP_FASTOPEN enables 1 RTT connection resumptions. */ +#ifdef TCP_FASTOPEN +# ifdef __linux__ + (void) set_tcp_option(handle, TCP_FASTOPEN, 16); /* Accepts queue length hint */ +# else + (void) set_tcp_option(handle, TCP_FASTOPEN, 1); /* Accepts on/off */ +# endif +#endif + handle->data = NULL; return 0; } +int tcp_bind(uv_tcp_t *handle, struct sockaddr *addr) +{ + return _tcp_bind(handle, addr, tcp_accept); +} + void io_create(uv_loop_t *loop, uv_handle_t *handle, int type) { if (type == SOCK_DGRAM) { @@ -182,6 +287,34 @@ void io_create(uv_loop_t *loop, uv_handle_t *handle, int type) uv_tcp_init(loop, (uv_tcp_t *)handle); uv_tcp_nodelay((uv_tcp_t *)handle, 1); } + + struct worker_ctx *worker = loop->data; + handle->data = session_borrow(worker); + assert(handle->data); +} + +void io_deinit(uv_handle_t *handle) +{ + if (!handle) { + return; + } + uv_loop_t *loop = handle->loop; + if (loop && loop->data) { + struct worker_ctx *worker = loop->data; + session_release(worker, handle->data); + } else { + session_free(handle->data); + } + handle->data = NULL; +} + +void io_free(uv_handle_t *handle) +{ + if (!handle) { + return; + } + io_deinit(handle); + free(handle); } int io_start_read(uv_handle_t *handle) diff --git a/daemon/io.h b/daemon/io.h index b47805a6035a16b6721cef4c6975158d6a0b7fa6..5cdd2de761aa1acdab8480fd68202919614d038c 100644 --- a/daemon/io.h +++ b/daemon/io.h @@ -18,9 +18,30 @@ #include <uv.h> #include <libknot/packet/pkt.h> +#include "lib/generic/array.h" + +struct qr_task; + +/* Per-session (TCP or UDP) persistent structure, + * that exists between remote counterpart and a local socket. + */ +struct session { + bool is_subreq; + bool throttled; + uv_timer_t timeout; + struct qr_task *buffering; + array_t(struct qr_task *) tasks; +}; + +void session_free(struct session *s); +struct session *session_new(void); int udp_bind(uv_udp_t *handle, struct sockaddr *addr); int tcp_bind(uv_tcp_t *handle, struct sockaddr *addr); + void io_create(uv_loop_t *loop, uv_handle_t *handle, int type); +void io_deinit(uv_handle_t *handle); +void io_free(uv_handle_t *handle); + int io_start_read(uv_handle_t *handle); -int io_stop_read(uv_handle_t *handle); \ No newline at end of file +int io_stop_read(uv_handle_t *handle); diff --git a/daemon/network.c b/daemon/network.c index 334edcf55fe585593f4f6d94120963ef37abcc59..26bd72fd8235f988b83e7f1a06c5d68ad31b2273 100644 --- a/daemon/network.c +++ b/daemon/network.c @@ -21,7 +21,8 @@ #include "daemon/io.h" /* libuv 1.7.0+ is able to support SO_REUSEPORT for loadbalancing */ -#if (defined(ENABLE_REUSEPORT) || defined(UV_VERSION_HEX)) && (__linux__ && SO_REUSEPORT) +#if defined(UV_VERSION_HEX) +#if (__linux__ && SO_REUSEPORT) #define handle_init(type, loop, handle, family) do { \ uv_ ## type ## _init_ex((loop), (handle), (family)); \ uv_os_fd_t fd = 0; \ @@ -30,6 +31,12 @@ setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on)); \ } \ } while (0) +/* libuv 1.7.0+ is able to assign fd immediately */ +#else + #define handle_init(type, loop, handle, family) do { \ + uv_ ## type ## _init_ex((loop), (handle), (family)); \ + } while (0) +#endif #else #define handle_init(type, loop, handle, family) \ uv_ ## type ## _init((loop), (handle)) diff --git a/daemon/network.h b/daemon/network.h index 77db270dedac444b1c514bdefe95b39e8b2e3299..622927a6256085db068a5b59e8e6ac08c385133f 100644 --- a/daemon/network.h +++ b/daemon/network.h @@ -24,7 +24,7 @@ enum endpoint_flag { NET_DOWN = 0 << 0, NET_UDP = 1 << 0, - NET_TCP = 1 << 1 + NET_TCP = 1 << 1, }; struct endpoint { diff --git a/daemon/worker.c b/daemon/worker.c index 5bdd7c2a6eae84174b510458d3fb42d3f35dcfee..7ed015a96db24f43a52b0da1e2c2d20187e4c1da 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -31,21 +31,22 @@ #include "daemon/engine.h" #include "daemon/io.h" -/* @internal IO request entry. */ -struct ioreq +/* @internal Union of various libuv objects for freelist. */ +struct req { union { + /* Socket handles, these have session as their `handle->data` and own it. */ uv_udp_t udp; uv_tcp_t tcp; + /* I/O events, these have only a reference to the task they're operating on. */ uv_udp_send_t send; uv_write_t write; uv_connect_t connect; + /* Timer events */ + uv_timer_t timer; } as; }; -/** @internal Number of request within timeout window. */ -#define MAX_PENDING (KR_NSREP_MAXADDR + (KR_NSREP_MAXADDR / 2)) - /** @internal Debugging facility. */ #ifdef DEBUG #define DEBUG_MSG(fmt...) printf("[daem] " fmt) @@ -53,35 +54,6 @@ struct ioreq #define DEBUG_MSG(fmt...) #endif -/** @internal Query resolution task. */ -struct qr_task -{ - struct kr_request req; - struct worker_ctx *worker; - knot_pkt_t *pktbuf; - array_t(struct qr_task *) waiting; - uv_handle_t *pending[MAX_PENDING]; - uint16_t pending_count; - uint16_t addrlist_count; - uint16_t addrlist_turn; - struct sockaddr *addrlist; - uv_timer_t retry, timeout; - worker_cb_t on_complete; - void *baton; - struct { - union { - struct sockaddr_in ip4; - struct sockaddr_in6 ip6; - } addr; - uv_handle_t *handle; - } source; - uint16_t iter_count; - uint16_t refs; - uint16_t bytes_remaining; - bool finished; - bool leading; -}; - /* Convenience macros */ #define qr_task_ref(task) \ do { ++(task)->refs; } while(0) @@ -91,6 +63,7 @@ struct qr_task (!uv_is_closing((checked)) || (task)->source.handle == (checked)) /* Forward decls */ +static void qr_task_free(struct qr_task *task); static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *packet); /** @internal Get singleton worker. */ @@ -99,58 +72,72 @@ static inline struct worker_ctx *get_worker(void) return uv_default_loop()->data; } -static inline struct ioreq *ioreq_take(struct worker_ctx *worker) +static inline struct req *req_borrow(struct worker_ctx *worker) { - struct ioreq *req = NULL; - if (worker->ioreqs.len > 0) { - req = array_tail(worker->ioreqs); - array_pop(worker->ioreqs); + struct req *req = NULL; + if (worker->pool_ioreq.len > 0) { + req = array_tail(worker->pool_ioreq); + array_pop(worker->pool_ioreq); + kr_asan_unpoison(req, sizeof(*req)); } else { req = malloc(sizeof(*req)); } - kr_asan_unpoison(req, sizeof(*req)); return req; } -static inline void ioreq_release(struct worker_ctx *worker, struct ioreq *req) +static inline void req_release(struct worker_ctx *worker, struct req *req) { - kr_asan_poison(req, sizeof(*req)); - if (!req || worker->ioreqs.len < 4 * MP_FREELIST_SIZE) { - array_push(worker->ioreqs, req); + if (!req || worker->pool_ioreq.len < 4 * MP_FREELIST_SIZE) { + array_push(worker->pool_ioreq, req); + kr_asan_poison(req, sizeof(*req)); } else { free(req); } } +/*! @internal Create a UDP/TCP handle */ static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype) { if (task->pending_count >= MAX_PENDING) { return NULL; } /* Create connection for iterative query */ - uv_handle_t *req = (uv_handle_t *)ioreq_take(task->worker); - if (!req) { + uv_handle_t *handle = (uv_handle_t *)req_borrow(task->worker); + if (!handle) { + return NULL; + } + io_create(task->worker->loop, handle, socktype); + /* Set current handle as a subrequest type. */ + struct session *session = handle->data; + session->is_subreq = true; + int ret = array_push(session->tasks, task); + if (ret != 0) { + io_deinit(handle); + req_release(task->worker, (struct req *)handle); return NULL; } - io_create(task->worker->loop, req, socktype); - req->data = task; + qr_task_ref(task); /* Connect or issue query datagram */ - task->pending[task->pending_count] = req; + task->pending[task->pending_count] = handle; task->pending_count += 1; - return req; + return handle; } static void ioreq_on_close(uv_handle_t *handle) { struct worker_ctx *worker = get_worker(); - ioreq_release(worker, (struct ioreq *)handle); + /* Handle-type events own a session, must close it. */ + struct session *session = handle->data; + struct qr_task *task = session->tasks.at[0]; + io_deinit(handle); + qr_task_unref(task); + req_release(worker, (struct req *)handle); } static void ioreq_kill(uv_handle_t *req) { assert(req); if (!uv_is_closing(req)) { - io_stop_read(req); uv_close(req, ioreq_on_close); } } @@ -187,51 +174,44 @@ static void mp_poison(struct mempool *mp, bool poison) #endif /** @endcond */ -static inline struct mempool *pool_take(struct worker_ctx *worker) +static inline struct mempool *pool_borrow(struct worker_ctx *worker) { /* Recycle available mempool if possible */ struct mempool *mp = NULL; - if (worker->pools.len > 0) { - mp = array_tail(worker->pools); - array_pop(worker->pools); + if (worker->pool_mp.len > 0) { + mp = array_tail(worker->pool_mp); + array_pop(worker->pool_mp); + mp_poison(mp, 0); } else { /* No mempool on the freelist, create new one */ mp = mp_new (4 * CPU_PAGE_SIZE); } - mp_poison(mp, 0); return mp; } static inline void pool_release(struct worker_ctx *worker, struct mempool *mp) { /* Return mempool to ring or free it if it's full */ - if (worker->pools.len < MP_FREELIST_SIZE) { + if (worker->pool_mp.len < MP_FREELIST_SIZE) { mp_flush(mp); - array_push(worker->pools, mp); + array_push(worker->pool_mp, mp); mp_poison(mp, 1); } else { mp_delete(mp); } } -static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr *addr) +static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *handle, const struct sockaddr *addr) { /* How much can client handle? */ struct engine *engine = worker->engine; - size_t answer_max = KNOT_WIRE_MIN_PKTSIZE; size_t pktbuf_max = KR_EDNS_PAYLOAD; if (engine->resolver.opt_rr) { pktbuf_max = MAX(knot_edns_get_payload(engine->resolver.opt_rr), pktbuf_max); } - if (!addr && handle) { /* TCP */ - answer_max = KNOT_WIRE_MAX_PKTSIZE; - pktbuf_max = KNOT_WIRE_MAX_PKTSIZE; - } else if (knot_pkt_has_edns(query)) { /* EDNS */ - answer_max = MAX(knot_edns_get_payload(query->opt_rr), KNOT_WIRE_MIN_PKTSIZE); - } /* Recycle available mempool if possible */ knot_mm_t pool = { - .ctx = pool_take(worker), + .ctx = pool_borrow(worker), .alloc = (knot_mm_alloc_t) mp_alloc }; @@ -244,27 +224,26 @@ static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *ha /* Create packet buffers for answer and subrequests */ task->req.pool = pool; knot_pkt_t *pktbuf = knot_pkt_new(NULL, pktbuf_max, &task->req.pool); - knot_pkt_t *answer = knot_pkt_new(NULL, answer_max, &task->req.pool); - if (!pktbuf || !answer) { + if (!pktbuf) { mp_delete(pool.ctx); return NULL; } - task->req.answer = answer; + pktbuf->size = 0; + task->req.answer = NULL; task->pktbuf = pktbuf; array_init(task->waiting); task->addrlist = NULL; task->pending_count = 0; task->bytes_remaining = 0; task->iter_count = 0; + task->timeouts = 0; task->refs = 1; task->finished = false; task->leading = false; task->worker = worker; + task->session = NULL; task->source.handle = handle; - uv_timer_init(worker->loop, &task->retry); - uv_timer_init(worker->loop, &task->timeout); - task->retry.data = task; - task->timeout.data = task; + task->timeout = NULL; task->on_complete = NULL; task->req.qsource.key = NULL; task->req.qsource.addr = NULL; @@ -278,27 +257,36 @@ static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *ha } else { task->source.addr.ip4.sin_family = AF_UNSPEC; } - /* Remember query source TSIG key */ - if (query->tsig_rr) { - task->req.qsource.key = knot_rrset_copy(query->tsig_rr, &task->req.pool); - } - - /* Start resolution */ - kr_resolve_begin(&task->req, &engine->resolver, answer); worker->stats.concurrent += 1; - worker->stats.queries += 1; - /* Throttle outbound queries only when high pressure */ - if (worker->stats.concurrent < QUERY_RATE_THRESHOLD) { - task->req.options |= QUERY_NO_THROTTLE; - } return task; } /* This is called when the task refcount is zero, free memory. */ static void qr_task_free(struct qr_task *task) { - /* Return mempool to ring or free it if it's full */ + struct session *session = task->session; + if (session) { + /* Walk the session task list and remove itself. */ + for (size_t i = 0; i < session->tasks.len; ++i) { + if (session->tasks.at[i] == task) { + array_del(session->tasks, i); + break; + } + } + /* Start reading again if the session is throttled and + * the number of outstanding requests is below watermark. */ + uv_handle_t *handle = task->source.handle; + if (handle && session->tasks.len < task->worker->tcp_pipeline_max/2) { + if (!uv_is_closing(handle) && session->throttled) { + io_start_read(handle); + session->throttled = false; + } + } + } + /* Update stats */ struct worker_ctx *worker = task->worker; + worker->stats.concurrent -= 1; + /* Return mempool to ring or free it if it's full */ pool_release(worker, task->req.pool.ctx); /* @note The 'task' is invalidated from now on. */ /* Decommit memory every once in a while */ @@ -312,17 +300,65 @@ static void qr_task_free(struct qr_task *task) } } -/* This is called when retry timer closes */ -static void retransmit_close(uv_handle_t *handle) +static int qr_task_start(struct qr_task *task, knot_pkt_t *query) { - struct qr_task *task = handle->data; - qr_task_unref(task); + assert(task && query); + size_t answer_max = KNOT_WIRE_MIN_PKTSIZE; + if (!task->source.handle || task->source.handle->type == UV_TCP) { + answer_max = KNOT_WIRE_MAX_PKTSIZE; + } else if (knot_pkt_has_edns(query)) { /* EDNS */ + answer_max = MAX(knot_edns_get_payload(query->opt_rr), KNOT_WIRE_MIN_PKTSIZE); + } + + knot_pkt_t *answer = knot_pkt_new(NULL, answer_max, &task->req.pool); + if (!answer) { + return kr_error(ENOMEM); + } + task->req.answer = answer; + + /* Remember query source TSIG key */ + if (query->tsig_rr) { + task->req.qsource.key = knot_rrset_copy(query->tsig_rr, &task->req.pool); + } + + /* Start resolution */ + struct worker_ctx *worker = task->worker; + struct engine *engine = worker->engine; + kr_resolve_begin(&task->req, &engine->resolver, answer); + worker->stats.queries += 1; + /* Throttle outbound queries only when high pressure */ + if (worker->stats.concurrent < QUERY_RATE_THRESHOLD) { + task->req.options |= QUERY_NO_THROTTLE; + } + return 0; } -/* This is called when task completes and timeout timer is closed. */ -static void qr_task_complete(uv_handle_t *handle) +/*@ Register qr_task within session. */ +static int qr_task_register(struct qr_task *task, struct session *session) +{ + int ret = array_reserve(session->tasks, session->tasks.len + 1); + if (ret != 0) { + return kr_error(ENOMEM); + } + array_push(session->tasks, task); + task->session = session; + /* Soft-limit on parallel queries, there is no "slow down" RCODE + * that we could use to signalize to client, but we can stop reading, + * an in effect shrink TCP window size. To get more precise throttling, + * we would need to copy remainder of the unread buffer and reassemble + * when resuming reading. This is NYI. */ + if (session->tasks.len >= task->worker->tcp_pipeline_max) { + uv_handle_t *handle = task->source.handle; + if (handle && !session->throttled && !uv_is_closing(handle)) { + io_stop_read(handle); + session->throttled = true; + } + } + return 0; +} + +static void qr_task_complete(struct qr_task *task) { - struct qr_task *task = handle->data; struct worker_ctx *worker = task->worker; /* Kill pending I/O requests */ ioreq_killall(task); @@ -332,51 +368,8 @@ static void qr_task_complete(uv_handle_t *handle) if (task->on_complete) { task->on_complete(worker, &task->req, task->baton); } - /* Return handle to the event loop in case - * it was exclusively taken by this task. */ - if (task->source.handle && !uv_has_ref(task->source.handle)) { - uv_ref(task->source.handle); - io_start_read(task->source.handle); - } - /* Release task */ + /* Release primary reference to task. */ qr_task_unref(task); - /* Update stats */ - worker->stats.concurrent -= 1; -} - -/* This is called when I/O timeouts */ -static void on_timeout(uv_timer_t *req) -{ - struct qr_task *task = req->data; - uv_handle_t *handle = (uv_handle_t *)req; -#ifdef DEBUG - char qname_str[KNOT_DNAME_MAXLEN] = {'\0'}, type_str[16] = {'\0'}; - knot_dname_to_str(qname_str, knot_pkt_qname(task->pktbuf), sizeof(qname_str)); - knot_rrtype_to_string(knot_pkt_qtype(task->pktbuf), type_str, sizeof(type_str)); - DEBUG_MSG("ioreq timeout %s %s %p\n", qname_str, type_str, req); -#endif - /* Ignore if this timeout is being terminated. */ - if (uv_is_closing(handle)) { - return; - } - /* Penalize all tried nameservers with a timeout. */ - struct worker_ctx *worker = task->worker; - if (task->leading && task->pending_count > 0) { - struct kr_query *qry = array_tail(task->req.rplan.pending); - struct sockaddr_in6 *addrlist = (struct sockaddr_in6 *)task->addrlist; - for (uint16_t i = 0; i < MIN(task->pending_count, task->addrlist_count); ++i) { - struct sockaddr *choice = (struct sockaddr *)(&addrlist[i]); - WITH_DEBUG { - char addr_str[INET6_ADDRSTRLEN]; - inet_ntop(choice->sa_family, kr_inaddr(choice), addr_str, sizeof(addr_str)); - QRDEBUG(qry, "wrkr", "=> server: '%s' flagged as 'bad'\n", addr_str); - } - kr_nsrep_update_rtt(&qry->ns, choice, KR_NS_TIMEOUT, worker->engine->resolver.cache_rtt); - } - } - /* Interrupt current pending request. */ - worker->stats.timeout += 1; - qr_task_step(task, NULL, NULL); } /* This is called when we send subrequest / answer */ @@ -385,15 +378,10 @@ static int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status if (!task->finished) { if (status == 0 && handle) { io_start_read(handle); /* Start reading new query */ - } else { - DEBUG_MSG("ioreq send_done %p => %d, %s\n", handle, status, uv_strerror(status)); } } else { - /* Close retry timer (borrows task) */ - qr_task_ref(task); - uv_close((uv_handle_t *)&task->retry, retransmit_close); - /* Close timeout timer (finishes task) */ - uv_close((uv_handle_t *)&task->timeout, qr_task_complete); + assert(task->timeout == NULL); + qr_task_complete(task); } return status; } @@ -406,7 +394,7 @@ static void on_send(uv_udp_send_t *req, int status) qr_task_on_send(task, (uv_handle_t *)req->handle, status); } qr_task_unref(task); - ioreq_release(worker, (struct ioreq *)req); + req_release(worker, (struct req *)req); } static void on_write(uv_write_t *req, int status) @@ -417,7 +405,7 @@ static void on_write(uv_write_t *req, int status) qr_task_on_send(task, (uv_handle_t *)req->handle, status); } qr_task_unref(task); - ioreq_release(worker, (struct ioreq *)req); + req_release(worker, (struct req *)req); } static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockaddr *addr, knot_pkt_t *pkt) @@ -425,7 +413,7 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockad if (!handle) { return qr_task_on_send(task, handle, kr_error(EIO)); } - struct ioreq *send_req = ioreq_take(task->worker); + struct req *send_req = req_borrow(task->worker); if (!send_req) { return qr_task_on_send(task, handle, kr_error(ENOMEM)); } @@ -448,8 +436,7 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockad if (ret == 0) { qr_task_ref(task); /* Pending ioreq on current task */ } else { - DEBUG_MSG("ioreq send_start %p => %d, %s\n", send_req, ret, uv_strerror(ret)); - ioreq_release(task->worker, send_req); + req_release(task->worker, send_req); } /* Update statistics */ @@ -475,17 +462,53 @@ static void on_connect(uv_connect_t *req, int status) if (status == 0) { qr_task_send(task, (uv_handle_t *)handle, NULL, task->pktbuf); } else { - DEBUG_MSG("ioreq conn_done %p => %d, %s\n", req, status, uv_strerror(status)); qr_task_step(task, task->addrlist, NULL); } } qr_task_unref(task); - ioreq_release(worker, (struct ioreq *)req); + req_release(worker, (struct req *)req); +} + +static void on_timer_close(uv_handle_t *handle) +{ + struct qr_task *task = handle->data; + req_release(task->worker, (struct req *)handle); + qr_task_unref(task); +} + +/* This is called when I/O timeouts */ +static void on_timeout(uv_timer_t *req) +{ + struct qr_task *task = req->data; + + /* Penalize all tried nameservers with a timeout. */ + struct worker_ctx *worker = task->worker; + if (task->leading && task->pending_count > 0) { + struct kr_query *qry = array_tail(task->req.rplan.pending); + struct sockaddr_in6 *addrlist = (struct sockaddr_in6 *)task->addrlist; + for (uint16_t i = 0; i < MIN(task->pending_count, task->addrlist_count); ++i) { + struct sockaddr *choice = (struct sockaddr *)(&addrlist[i]); + WITH_DEBUG { + char addr_str[INET6_ADDRSTRLEN]; + inet_ntop(choice->sa_family, kr_inaddr(choice), addr_str, sizeof(addr_str)); + QRDEBUG(qry, "wrkr", "=> server: '%s' flagged as 'bad'\n", addr_str); + } + kr_nsrep_update_rtt(&qry->ns, choice, KR_NS_TIMEOUT, worker->engine->resolver.cache_rtt); + } + } + /* Release timer handle */ + task->timeout = NULL; + req_release(worker, (struct req *)req); + /* Interrupt current pending request. */ + task->timeouts += 1; + worker->stats.timeout += 1; + qr_task_step(task, NULL, NULL); + qr_task_unref(task); /* Return borrowed task */ } static bool retransmit(struct qr_task *task) { - if (task && task->addrlist) { + if (task && task->addrlist && task->addrlist_count > 0) { uv_handle_t *subreq = ioreq_spawn(task, SOCK_DGRAM); if (subreq) { /* Create connection for iterative query */ struct sockaddr_in6 *choice = &((struct sockaddr_in6 *)task->addrlist)[task->addrlist_turn]; @@ -500,13 +523,38 @@ static bool retransmit(struct qr_task *task) static void on_retransmit(uv_timer_t *req) { - if (uv_is_closing((uv_handle_t *)req)) - return; + uv_timer_stop(req); + struct qr_task *task = req->data; if (!retransmit(req->data)) { - uv_timer_stop(req); /* Not possible to spawn request, stop trying */ + /* Not possible to spawn request, start timeout timer with remaining deadline. */ + uint64_t timeout = KR_CONN_RTT_MAX - task->pending_count * KR_CONN_RETRY; + uv_timer_start(req, on_timeout, timeout, 0); + } else { + uv_timer_start(req, on_retransmit, KR_CONN_RETRY, 0); } } +static int timer_start(struct qr_task *task, uv_timer_cb cb, uint64_t timeout, uint64_t repeat) +{ + assert(task->timeout == NULL); + struct worker_ctx *worker = task->worker; + uv_timer_t *timer = (uv_timer_t *)req_borrow(worker); + if (!timer) { + return kr_error(ENOMEM); + } + uv_timer_init(worker->loop, timer); + int ret = uv_timer_start(timer, cb, timeout, repeat); + if (ret != 0) { + uv_timer_stop(timer); + req_release(worker, (struct req *)timer); + return kr_error(ENOMEM); + } + timer->data = task; + qr_task_ref(task); + task->timeout = timer; + return 0; +} + /** @internal Get key from current outstanding subrequest. */ static int subreq_key(char *dst, struct qr_task *task) { @@ -518,11 +566,14 @@ static int subreq_key(char *dst, struct qr_task *task) static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *pkt) { - /* Close pending I/O requests */ - if (uv_is_active((uv_handle_t *)&task->retry)) - uv_timer_stop(&task->retry); - if (uv_is_active((uv_handle_t *)&task->timeout)) - uv_timer_stop(&task->timeout); + /* Close pending timer */ + if (task->timeout) { + /* Timer was running so it holds reference to task, make sure the timer event + * never fires and release the reference on timer close instead. */ + uv_timer_stop(task->timeout); + uv_close((uv_handle_t *)task->timeout, on_timer_close); + task->timeout = NULL; + } ioreq_killall(task); /* Clear from outstanding table. */ if (!task->leading) @@ -594,7 +645,7 @@ static int qr_task_finalize(struct qr_task *task, int state) static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *packet) { /* No more steps after we're finished. */ - if (task->finished) { + if (!task || task->finished) { return kr_error(ESTALE); } /* Close pending I/O requests */ @@ -607,8 +658,7 @@ static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_sour int state = kr_resolve_consume(&task->req, packet_source, packet); while (state == KNOT_STATE_PRODUCE) { state = kr_resolve_produce(&task->req, &task->addrlist, &sock_type, task->pktbuf); - if (unlikely(++task->iter_count > KR_ITER_LIMIT)) { - DEBUG_MSG("task iter_limit %p\n", task); + if (unlikely(++task->iter_count > KR_ITER_LIMIT || task->timeouts >= KR_TIMEOUT_LIMIT)) { return qr_task_finalize(task, KNOT_STATE_FAIL); } } @@ -628,6 +678,7 @@ static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_sour } /* Start fast retransmit with UDP, otherwise connect. */ + int ret = 0; if (sock_type == SOCK_DGRAM) { /* If such subrequest is outstanding, enqueue to it. */ if (subreq_enqueue(task)) { @@ -635,7 +686,7 @@ static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_sour } /* Start transmitting */ if (retransmit(task)) { - uv_timer_start(&task->retry, on_retransmit, KR_CONN_RETRY, KR_CONN_RETRY); + ret = timer_start(task, on_retransmit, KR_CONN_RETRY, 0); } else { return qr_task_step(task, NULL, NULL); } @@ -644,38 +695,37 @@ static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_sour */ subreq_lead(task); } else { - struct ioreq *conn = ioreq_take(task->worker); + uv_connect_t *conn = (uv_connect_t *)req_borrow(task->worker); if (!conn) { return qr_task_step(task, NULL, NULL); } uv_handle_t *client = ioreq_spawn(task, sock_type); if (!client) { - ioreq_release(task->worker, conn); + req_release(task->worker, (struct req *)conn); return qr_task_step(task, NULL, NULL); } - conn->as.connect.data = task; - if (uv_tcp_connect(&conn->as.connect, (uv_tcp_t *)client, task->addrlist, on_connect) != 0) { - ioreq_release(task->worker, conn); + conn->data = task; + if (uv_tcp_connect(conn, (uv_tcp_t *)client, task->addrlist, on_connect) != 0) { + req_release(task->worker, (struct req *)conn); return qr_task_step(task, NULL, NULL); } - /* Connect request borrows task */ - qr_task_ref(task); + qr_task_ref(task); /* Connect request borrows task */ + ret = timer_start(task, on_timeout, KR_CONN_RTT_MAX, 0); } /* Start next step with timeout, fatal if can't start a timer. */ - int ret = uv_timer_start(&task->timeout, on_timeout, KR_CONN_RTT_MAX, 0); if (ret != 0) { subreq_finalize(task, packet_source, packet); return qr_task_finalize(task, KNOT_STATE_FAIL); } - - return ret; + return 0; } static int parse_packet(knot_pkt_t *query) { - if (!query) + if (!query){ return kr_error(EINVAL); + } /* Parse query packet. */ int ret = knot_pkt_parse(query, 0); @@ -691,90 +741,185 @@ static int parse_packet(knot_pkt_t *query) return kr_ok(); } -int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr* addr) +int worker_submit(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *msg, const struct sockaddr* addr) { if (!worker || !handle) { return kr_error(EINVAL); } + struct session *session = handle->data; + assert(session); + /* Parse packet */ - int ret = parse_packet(query); + int ret = parse_packet(msg); - /* Start new task on master sockets, or resume existing */ - struct qr_task *task = handle->data; - bool is_master_socket = (!task); - if (is_master_socket) { + /* Start new task on listening sockets, or resume if this is subrequest */ + struct qr_task *task = NULL; + if (!session->is_subreq) { /* Ignore badly formed queries or responses. */ - if (!query || ret != 0 || knot_wire_get_qr(query->wire)) { - DEBUG_MSG("task bad_query %p => %d, %s\n", task, ret, kr_strerror(ret)); - worker->stats.dropped += 1; + if (!msg || ret != 0 || knot_wire_get_qr(msg->wire)) { + if (msg) worker->stats.dropped += 1; return kr_error(EINVAL); /* Ignore. */ } - task = qr_task_create(worker, handle, query, addr); + task = qr_task_create(worker, handle, addr); if (!task) { return kr_error(ENOMEM); } + ret = qr_task_start(task, msg); + if (ret != 0) { + qr_task_free(task); + return kr_error(ENOMEM); + } + } else { + task = session->tasks.len > 0 ? array_tail(session->tasks) : NULL; } - /* Consume input and produce next query */ - return qr_task_step(task, addr, query); + /* Consume input and produce next message */ + return qr_task_step(task, addr, msg); } /* Return DNS/TCP message size. */ -static int msg_size(const uint8_t *msg, size_t len) +static int msg_size(const uint8_t *msg) { - if (len < 2) { - return kr_error(EMSGSIZE); - } return wire_read_u16(msg); } -int worker_process_tcp(struct worker_ctx *worker, uv_handle_t *handle, const uint8_t *msg, size_t len) +/* If buffering, close last task as it isn't live yet. */ +static void discard_buffered(struct session *session) +{ + if (session->buffering) { + qr_task_free(session->buffering); + session->buffering = NULL; + } +} + +int worker_end_tcp(struct worker_ctx *worker, uv_handle_t *handle) { - if (!worker || !handle || !msg) { + if (!worker || !handle) { return kr_error(EINVAL); } + /* If this is subrequest, notify parent task with empty input + * because in this case session doesn't own tasks, it has just + * borrowed the task from parent session. */ + struct session *session = handle->data; + if (session->is_subreq) { + worker_submit(worker, (uv_handle_t *)handle, NULL, NULL); + } else { + discard_buffered(session); + } + return 0; +} - int nbytes = msg_size(msg, len); - struct qr_task *task = handle->data; - const bool start_assembly = (task && task->bytes_remaining == 0); +int worker_process_tcp(struct worker_ctx *worker, uv_handle_t *handle, const uint8_t *msg, ssize_t len) +{ + if (!worker || !handle) { + return kr_error(EINVAL); + } + /* Connection error or forced disconnect */ + struct session *session = handle->data; + if (len <= 0 || !msg) { + /* If we have pending tasks, we must dissociate them from the + * connection so they don't try to access closed and freed handle */ + for (size_t i = 0; i < session->tasks.len; ++i) { + struct qr_task *task = session->tasks.at[i]; + task->session = NULL; + task->source.handle = NULL; + } + session->tasks.len = 0; + return kr_error(ECONNRESET); + } + + int submitted = 0; + ssize_t nbytes = 0; + struct qr_task *task = session->buffering; - /* Message is a query (we have no context to buffer it) or complete. */ - if (!task || (start_assembly && nbytes == len - 2)) { - if (nbytes <= 0) { - return worker_exec(worker, (uv_handle_t *)handle, NULL, NULL); + /* If this is a new query, create a new task that we can use + * to buffer incoming message until it's complete. */ + if (!session->is_subreq) { + if (!task) { + task = qr_task_create(worker, handle, NULL); + if (!task) { + return kr_error(ENOMEM); + } + session->buffering = task; } - knot_pkt_t *pkt_nocopy = knot_pkt_new((void *)(msg + 2), nbytes, &worker->pkt_pool); - return worker_exec(worker, handle, pkt_nocopy, NULL); + } else { + assert(session->tasks.len > 0); + task = array_tail(session->tasks); } - /* Starting a new message assembly */ + /* At this point session must have either created new task or it's already assigned. */ + assert(task); + assert(len > 0); + /* Start reading DNS/TCP message length */ knot_pkt_t *pkt_buf = task->pktbuf; - if (start_assembly) { - if (nbytes <= 0) { - return worker_exec(worker, (uv_handle_t *)handle, NULL, NULL); - } + if (task->bytes_remaining == 0 && pkt_buf->size == 0) { knot_pkt_clear(pkt_buf); + /* Read only one byte as TCP fragment may end at a 1B boundary + * which would lead to OOB read or improper reassembly length. */ + pkt_buf->size = 1; + pkt_buf->wire[0] = msg[0]; + len -= 1; + msg += 1; + if (len == 0) { + return 0; + } + } + /* Finish reading DNS/TCP message length. */ + if (task->bytes_remaining == 0 && pkt_buf->size == 1) { + pkt_buf->wire[1] = msg[0]; + nbytes = msg_size(pkt_buf->wire); + len -= 1; + msg += 1; + /* Cut off fragment length and start reading DNS message. */ pkt_buf->size = 0; - /* Cut off message length */ task->bytes_remaining = nbytes; - len -= 2; - msg += 2; } /* Message is too long, can't process it. */ - if (len > pkt_buf->max_size - pkt_buf->size) { + ssize_t to_read = MIN(len, task->bytes_remaining); + if (to_read > (ssize_t)(pkt_buf->max_size - pkt_buf->size)) { + pkt_buf->size = 0; task->bytes_remaining = 0; - return worker_exec(worker, handle, NULL, NULL); + return kr_error(EMSGSIZE); } /* Buffer message and check if it's complete */ - memcpy(pkt_buf->wire + pkt_buf->size, msg, len); - pkt_buf->size += len; - if (len >= task->bytes_remaining) { + memcpy(pkt_buf->wire + pkt_buf->size, msg, to_read); + pkt_buf->size += to_read; + if (to_read >= task->bytes_remaining) { task->bytes_remaining = 0; - return worker_exec(worker, handle, pkt_buf, NULL); + /* Parse the packet and start resolving complete query */ + int ret = parse_packet(pkt_buf); + if (ret == 0 && !session->is_subreq) { + ret = qr_task_start(task, pkt_buf); + if (ret != 0) { + return ret; + } + ret = qr_task_register(task, session); + if (ret != 0) { + return ret; + } + /* Task is now registered in session, clear temporary. */ + session->buffering = NULL; + submitted += 1; + } + /* Start only new queries, not subrequests that are already pending */ + if (ret == 0) { + ret = qr_task_step(task, NULL, pkt_buf); + } + /* Process next message part in the stream if no error so far */ + if (ret != 0) { + return ret; + } + if (len - to_read > 0 && !session->is_subreq) { + ret = worker_process_tcp(worker, handle, msg + to_read, len - to_read); + if (ret < 0) { + return ret; + } + submitted += ret; + } + } else { + task->bytes_remaining -= to_read; } - /* Return number of bytes remaining to receive. */ - task->bytes_remaining -= len; - return task->bytes_remaining; + return submitted; } int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query, unsigned options, worker_cb_t on_complete, void *baton) @@ -784,26 +929,36 @@ int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query, unsigned option } /* Create task */ - struct qr_task *task = qr_task_create(worker, NULL, query, NULL); + struct qr_task *task = qr_task_create(worker, NULL, NULL); if (!task) { return kr_error(ENOMEM); } task->baton = baton; task->on_complete = on_complete; task->req.options |= options; + /* Start task */ + int ret = qr_task_start(task, query); + if (ret != 0) { + qr_task_unref(task); + return ret; + } return qr_task_step(task, NULL, query); } int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen) { - array_init(worker->pools); - array_init(worker->ioreqs); - if (array_reserve(worker->pools, ring_maxlen) || array_reserve(worker->ioreqs, ring_maxlen)) + array_init(worker->pool_mp); + array_init(worker->pool_ioreq); + array_init(worker->pool_sessions); + if (array_reserve(worker->pool_mp, ring_maxlen) || + array_reserve(worker->pool_ioreq, ring_maxlen) || + array_reserve(worker->pool_sessions, ring_maxlen)) return kr_error(ENOMEM); memset(&worker->pkt_pool, 0, sizeof(worker->pkt_pool)); worker->pkt_pool.ctx = mp_new (4 * sizeof(knot_pkt_t)); worker->pkt_pool.alloc = (knot_mm_alloc_t) mp_alloc; worker->outstanding = map_make(); + worker->tcp_pipeline_max = MAX_PIPELINED; return kr_ok(); } @@ -817,8 +972,9 @@ int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen) void worker_reclaim(struct worker_ctx *worker) { - reclaim_freelist(worker->pools, struct mempool, mp_delete); - reclaim_freelist(worker->ioreqs, struct ioreq, free); + reclaim_freelist(worker->pool_mp, struct mempool, mp_delete); + reclaim_freelist(worker->pool_ioreq, struct req, free); + reclaim_freelist(worker->pool_sessions, struct session, session_free); mp_delete(worker->pkt_pool.ctx); worker->pkt_pool.ctx = NULL; map_clear(&worker->outstanding); diff --git a/daemon/worker.h b/daemon/worker.h index c598c7ba6cb8b8bb222877a41e6de92e4b8cb375..aeda6a68b9c5a760f03a5421783eedf35fbe18b9 100644 --- a/daemon/worker.h +++ b/daemon/worker.h @@ -20,9 +20,11 @@ #include "lib/generic/array.h" #include "lib/generic/map.h" +/** @internal Number of request within timeout window. */ +#define MAX_PENDING (KR_NSREP_MAXADDR + (KR_NSREP_MAXADDR / 2)) + /** @cond internal Freelist of available mempools. */ typedef array_t(void *) mp_freelist_t; -/* @endcond */ /** * Query resolution worker. @@ -32,6 +34,7 @@ struct worker_ctx { uv_loop_t *loop; int id; int count; + unsigned tcp_pipeline_max; #if __linux__ uint8_t wire_buf[RECVMMSG_BATCH * KNOT_WIRE_MAX_PKTSIZE]; #else @@ -48,27 +51,67 @@ struct worker_ctx { size_t timeout; } stats; map_t outstanding; - mp_freelist_t pools; - mp_freelist_t ioreqs; + mp_freelist_t pool_mp; + mp_freelist_t pool_ioreq; + mp_freelist_t pool_sessions; knot_mm_t pkt_pool; }; /* Worker callback */ typedef void (*worker_cb_t)(struct worker_ctx *worker, struct kr_request *req, void *baton); +/** @internal Query resolution task. */ +struct qr_task +{ + struct kr_request req; + struct worker_ctx *worker; + struct session *session; + knot_pkt_t *pktbuf; + array_t(struct qr_task *) waiting; + uv_handle_t *pending[MAX_PENDING]; + uint16_t pending_count; + uint16_t addrlist_count; + uint16_t addrlist_turn; + uint16_t timeouts; + uint16_t iter_count; + uint16_t bytes_remaining; + struct sockaddr *addrlist; + uv_timer_t *timeout; + worker_cb_t on_complete; + void *baton; + struct { + union { + struct sockaddr_in ip4; + struct sockaddr_in6 ip6; + } addr; + uv_handle_t *handle; + } source; + uint32_t refs; + bool finished : 1; + bool leading : 1; +}; +/* @endcond */ + /** * Process incoming packet (query or answer to subrequest). * @return 0 or an error code */ -int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr* addr); +int worker_submit(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr* addr); /** - * Process incoming DNS/TCP message fragment. + * Process incoming DNS/TCP message fragment(s). * If the fragment contains only a partial message, it is buffered. * If the fragment contains a complete query or completes current fragment, execute it. - * @return 0, number of bytes remaining to assemble, or an error code + * @return 0 or an error code */ -int worker_process_tcp(struct worker_ctx *worker, uv_handle_t *handle, const uint8_t *msg, size_t len); +int worker_process_tcp(struct worker_ctx *worker, uv_handle_t *handle, const uint8_t *msg, ssize_t len); + +/** + * End current DNS/TCP session, this disassociates pending tasks from this session + * which may be freely closed afterwards. + */ +int worker_end_tcp(struct worker_ctx *worker, uv_handle_t *handle); + /** * Schedule query for resolution. diff --git a/lib/defines.h b/lib/defines.h index aeda472b959d4c7d378eb6f83db18ddf8fd6661c..8d9796c80ca2f3645121c9935b672592d93ecaa0 100644 --- a/lib/defines.h +++ b/lib/defines.h @@ -54,6 +54,7 @@ static inline int __attribute__((__cold__)) kr_error(int x) { #define KR_CONN_RETRY 300 /* Retry interval for network activity */ #define KR_ITER_LIMIT 50 /* Built-in iterator limit */ #define KR_CNAME_CHAIN_LIMIT 40 /* Built-in maximum CNAME chain length */ +#define KR_TIMEOUT_LIMIT 4 /* Maximum number of retries after timeout. */ /* * Defines. diff --git a/lib/zonecut.c b/lib/zonecut.c index 8ef2d1c39ba0523f5395f425c1ee3d712573323d..b3de6e7eddbe96520cb4168623c677fd8769195b 100644 --- a/lib/zonecut.c +++ b/lib/zonecut.c @@ -114,6 +114,7 @@ void kr_zonecut_deinit(struct kr_zonecut *cut) map_clear(&cut->nsset); knot_rrset_free(&cut->key, cut->pool); knot_rrset_free(&cut->trust_anchor, cut->pool); + cut->name = NULL; } void kr_zonecut_set(struct kr_zonecut *cut, const knot_dname_t *name) diff --git a/tests/tests.mk b/tests/tests.mk index dbb0efeef794adebd75d044b602f8e01ae3a3e56..c971de628f3d92d2442d65b5d4afbdf4969df828 100644 --- a/tests/tests.mk +++ b/tests/tests.mk @@ -10,8 +10,9 @@ deckard_DIR := tests/deckard TESTS := sets/resolver TEMPLATE := template/kresd.j2 $(deckard_DIR)/Makefile: - @git submodule update --init + @git submodule update --init --recursive check-integration: $(deckard_DIR)/Makefile + @[ ! -d $(deckard_DIR)/contrib/libswrap/obj ] && mkdir $(deckard_DIR)/contrib/libswrap/obj @$(MAKE) -s -C $(deckard_DIR) TESTS=$(TESTS) DAEMON=$(abspath daemon/kresd) TEMPLATE=$(TEMPLATE) DYLD_LIBRARY_PATH=$(DYLD_LIBRARY_PATH) deckard: check-integration