From ab4816f201594f47640d825f77cec21d50c82eb6 Mon Sep 17 00:00:00 2001 From: Grigorii Demidov <grigorii.demidov@nic.cz> Date: Thu, 13 Sep 2018 17:28:23 +0200 Subject: [PATCH] daemon: logic around struct session was relocated to separate module; input data buffering scheme was changed (libuv); attempt was made to simplify processing of the stream --- daemon/daemon.mk | 1 + daemon/io.c | 246 +++++----- daemon/io.h | 27 -- daemon/session.c | 623 ++++++++++++++++++++++++ daemon/session.h | 142 ++++++ daemon/tls.c | 48 +- daemon/tls.h | 2 +- daemon/worker.c | 1211 +++++++++++++++------------------------------- daemon/worker.h | 60 ++- lib/defines.h | 4 + 10 files changed, 1329 insertions(+), 1035 deletions(-) create mode 100644 daemon/session.c create mode 100644 daemon/session.h diff --git a/daemon/daemon.mk b/daemon/daemon.mk index c3c15075e..eef6e8e5b 100644 --- a/daemon/daemon.mk +++ b/daemon/daemon.mk @@ -9,6 +9,7 @@ kresd_SOURCES := \ daemon/tls_ephemeral_credentials.c \ daemon/tls_session_ticket-srv.c \ daemon/zimport.c \ + daemon/session.c \ daemon/main.c kresd_DIST := daemon/lua/kres.lua daemon/lua/kres-gen.lua \ diff --git a/daemon/io.c b/daemon/io.c index e5b8a139a..09eaed48a 100644 --- a/daemon/io.c +++ b/daemon/io.c @@ -24,6 +24,7 @@ #include "daemon/network.h" #include "daemon/worker.h" #include "daemon/tls.h" +#include "daemon/session.h" #define negotiate_bufsize(func, handle, bufsize_want) do { \ int bufsize = 0; func(handle, &bufsize); \ @@ -48,86 +49,35 @@ static void check_bufsize(uv_handle_t* handle) #undef negotiate_bufsize -static void session_clear(struct session *s) +static uv_stream_t *handle_borrow(uv_loop_t *loop) { - assert(s->tasks.len == 0 && s->waiting.len == 0); - array_clear(s->tasks); - array_clear(s->waiting); - tls_free(s->tls_ctx); - tls_client_ctx_free(s->tls_client_ctx); - memset(s, 0, sizeof(*s)); -} - -void session_free(struct session *s) -{ - if (s) { - assert(s->tasks.len == 0 && s->waiting.len == 0); - session_clear(s); - 
free(s); + struct worker_ctx *worker = loop->data; + void *req = worker_iohandle_borrow(worker); + if (!req) { + return NULL; } -} - -struct session *session_new(void) -{ - return calloc(1, sizeof(struct session)); -} -static struct session *session_borrow(struct worker_ctx *worker) -{ - struct session *s = NULL; - if (worker->pool_sessions.len > 0) { - s = array_tail(worker->pool_sessions); - array_pop(worker->pool_sessions); - kr_asan_unpoison(s, sizeof(*s)); - } else { - s = session_new(); - } - return s; -} - -static void session_release(struct worker_ctx *worker, uv_handle_t *handle) -{ - if (!worker || !handle) { - return; - } - struct session *s = handle->data; - if (!s) { - return; - } - assert(s->waiting.len == 0 && s->tasks.len == 0); - assert(s->buffering == NULL); - if (!s->outgoing && handle->type == UV_TCP) { - worker_end_tcp(worker, handle); /* to free the buffering task */ - } - if (worker->pool_sessions.len < MP_FREELIST_SIZE) { - session_clear(s); - array_push(worker->pool_sessions, s); - kr_asan_poison(s, sizeof(*s)); - } else { - session_free(s); - } + return (uv_stream_t *)req; } static void handle_getbuf(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { - /* Worker has single buffer which is reused for all incoming - * datagrams / stream reads, the content of the buffer is + /* UDP sessions use worker buffer for wire data, + * TCP sessions use session buffer for wire data + * (see session_set_handle()). + * TLS sessions use buffer from TLS context. + * The content of the worker buffer is * guaranteed to be unchanged only for the duration of * udp_read() and tcp_read(). */ struct session *session = handle->data; - uv_loop_t *loop = handle->loop; - struct worker_ctx *worker = loop->data; - buf->base = (char *)worker->wire_buf; - /* Limit TCP stream buffer size to 4K for granularity in batches of incoming queries. */ - if (handle->type == UV_TCP) { - buf->len = MIN(suggested_size, 4096); - /* Regular buffer size for subrequests. 
*/ - } else if (session->outgoing) { - buf->len = suggested_size; - /* Use recvmmsg() on master sockets if possible. */ + if (!session_has_tls(session)) { + buf->base = (char *) session_wirebuf_get_free_start(session); + buf->len = session_wirebuf_get_free_size(session); } else { - buf->len = sizeof(worker->wire_buf); + struct tls_common_ctx *ctx = session_tls_get_common_ctx(session); + buf->base = (char *) ctx->recv_buf; + buf->len = sizeof(ctx->recv_buf); } } @@ -137,29 +87,30 @@ void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, uv_loop_t *loop = handle->loop; struct worker_ctx *worker = loop->data; struct session *s = handle->data; - if (s->closing) { + if (session_is_closing(s)) { return; } if (nread <= 0) { if (nread < 0) { /* Error response, notify resolver */ - worker_submit(worker, (uv_handle_t *)handle, NULL, addr); + worker_submit(s, NULL); } /* nread == 0 is for freeing buffers, we don't need to do this */ return; } if (addr->sa_family == AF_UNSPEC) { return; } - if (s->outgoing) { - assert(s->peer.ip.sa_family != AF_UNSPEC); - if (kr_sockaddr_cmp(&s->peer.ip, addr) != 0) { + struct sockaddr *peer = session_get_peer(s); + if (session_is_outgoing(s)) { + assert(peer->sa_family != AF_UNSPEC); + if (kr_sockaddr_cmp(peer, addr) != 0) { return; } + } else { + memcpy(peer, addr, kr_sockaddr_len(addr)); } - knot_pkt_t *query = knot_pkt_new(buf->base, nread, &worker->pkt_pool); - if (query) { - query->max_size = KNOT_WIRE_MAX_PKTSIZE; - worker_submit(worker, (uv_handle_t *)handle, query, addr); - } + ssize_t consumed = session_wirebuf_consume(s, (const uint8_t *)buf->base, nread); + assert(consumed == nread); + session_wirebuf_process(s); mp_flush(worker->pkt_pool.ctx); } @@ -167,11 +118,10 @@ static int udp_bind_finalize(uv_handle_t *handle) { check_bufsize(handle); /* Handle is already created, just create context. 
*/ - struct session *session = session_new(); - assert(session); - session->outgoing = false; - session->handle = handle; - handle->data = session; + struct session *s = session_new(); + assert(s); + session_set_outgoing(s, false); + session_set_handle(s, handle); return io_start_read(handle); } @@ -203,14 +153,14 @@ int udp_bindfd(uv_udp_t *handle, int fd) static void tcp_timeout_trigger(uv_timer_t *timer) { - struct session *session = timer->data; + struct session *s = timer->data; - assert(session->outgoing == false); - if (session->tasks.len > 0) { + assert(session_is_outgoing(s) == false); + if (!session_tasklist_is_empty(s)) { uv_timer_again(timer); - } else if (!session->closing) { + } else if (!session_is_closing(s)) { uv_timer_stop(timer); - worker_session_close(session); + session_close(s); } } @@ -218,50 +168,78 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) { uv_loop_t *loop = handle->loop; struct session *s = handle->data; - if (s->closing) { + + assert(s && session_get_handle(s) == (uv_handle_t *)handle && + handle->type == UV_TCP); + + if (session_is_closing(s)) { return; } + /* nread might be 0, which does not indicate an error or EOF. * This is equivalent to EAGAIN or EWOULDBLOCK under read(2). 
*/ if (nread == 0) { return; } - if (nread == UV_EOF) { - nread = 0; - } + struct worker_ctx *worker = loop->data; - /* TCP pipelining is rather complicated and requires cooperation from the worker - * so the whole message reassembly and demuxing logic is inside worker */ - int ret = 0; - if (s->has_tls) { - ret = tls_process(worker, handle, (const uint8_t *)buf->base, nread); - } else { - ret = worker_process_tcp(worker, handle, (const uint8_t *)buf->base, nread); + + if (nread < 0 || !buf->base) { + if (kr_verbose_status) { + struct sockaddr *peer = session_get_peer(s); + char peer_str[INET6_ADDRSTRLEN]; + inet_ntop(peer->sa_family, kr_inaddr(peer), + peer_str, sizeof(peer_str)); + kr_log_verbose("[io] => connection to '%s' closed by peer (%s)\n", peer_str, + uv_strerror(nread)); + } + worker_end_tcp(s); + return; } + + ssize_t consumed = 0; + const uint8_t *data = (const uint8_t *)buf->base; + ssize_t data_len = nread; + if (session_has_tls(s)) { + /* buf->base points to start of the tls receive buffer. + Decode data free space in session wire buffer. */ + consumed = tls_process_input_data(s, (const uint8_t *)buf->base, nread); + data = session_wirebuf_get_free_start(s); + data_len = consumed; + } + + /* data points to start of the free space in session wire buffer. + Simple increase internal counter. */ + consumed = session_wirebuf_consume(s, data, data_len); + assert(consumed == data_len); + + int ret = session_wirebuf_process(s); if (ret < 0) { - worker_end_tcp(worker, (uv_handle_t *)handle); + worker_end_tcp(s); /* Exceeded per-connection quota for outstanding requests * stop reading from stream and close after last message is processed. 
*/ - if (!s->outgoing && !uv_is_closing((uv_handle_t *)&s->timeout)) { - uv_timer_stop(&s->timeout); - if (s->tasks.len == 0) { - worker_session_close(s); + uv_timer_t *t = session_get_timer(s); + if (!session_is_outgoing(s) && !uv_is_closing((uv_handle_t *)t)) { + uv_timer_stop(t); + if (session_tasklist_is_empty(s)) { + session_close(s); } else { /* If there are tasks running, defer until they finish. */ - uv_timer_start(&s->timeout, tcp_timeout_trigger, + uv_timer_start(t, tcp_timeout_trigger, MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY); } } /* Connection spawned at least one request, reset its deadline for next query. * https://tools.ietf.org/html/rfc7766#section-6.2.3 */ - } else if (ret > 0 && !s->outgoing && !s->closing) { - uv_timer_again(&s->timeout); + } else if (ret > 0 && !session_is_outgoing(s) && !session_is_closing(s)) { + uv_timer_t *t = session_get_timer(s); + uv_timer_again(t); } mp_flush(worker->pkt_pool.ctx); } static void _tcp_accept(uv_stream_t *master, int status, bool tls) { - if (status != 0) { + if (status != 0) { return; } @@ -298,37 +276,40 @@ static void _tcp_accept(uv_stream_t *master, int status, bool tls) /* Set deadlines for TCP connection and start reading. * It will re-check every half of a request time limit if the connection * is idle and should be terminated, this is an educated guess. */ - - struct sockaddr *addr = &(session->peer.ip); - int addr_len = sizeof(union inaddr); - int ret = uv_tcp_getpeername((uv_tcp_t *)client, addr, &addr_len); - if (ret || addr->sa_family == AF_UNSPEC) { - /* close session, close underlying uv handles and - * deallocate (or return to memory pool) memory. 
*/ - worker_session_close(session); + struct session *s = client->data; + assert(session_is_outgoing(s) == false); + + struct sockaddr *peer = session_get_peer(s); + int peer_len = sizeof(union inaddr); + int ret = uv_tcp_getpeername((uv_tcp_t *)client, peer, &peer_len); + if (ret || peer->sa_family == AF_UNSPEC) { + session_close(s); return; } + struct worker_ctx *worker = (struct worker_ctx *)master->loop->data; const struct engine *engine = worker->engine; const struct network *net = &engine->net; uint64_t idle_in_timeout = net->tcp.in_idle_timeout; uint64_t timeout = KR_CONN_RTT_MAX / 2; - session->has_tls = tls; + session_set_has_tls(s, tls); if (tls) { timeout += TLS_MAX_HANDSHAKE_TIME; - if (!session->tls_ctx) { - session->tls_ctx = tls_new(master->loop->data); - if (!session->tls_ctx) { - worker_session_close(session); + struct tls_ctx_t *ctx = session_tls_get_server_ctx(s); + if (!ctx) { + ctx = tls_new(worker); + if (!ctx) { + session_close(s); return; } - session->tls_ctx->c.session = session; - session->tls_ctx->c.handshake_state = TLS_HS_IN_PROGRESS; + ctx->c.session = s; + ctx->c.handshake_state = TLS_HS_IN_PROGRESS; + session_tls_set_server_ctx(s, ctx); } } - uv_timer_t *timer = &session->timeout; - uv_timer_start(timer, tcp_timeout_trigger, timeout, idle_in_timeout); + uv_timer_t *t = session_get_timer(s); + uv_timer_start(t, tcp_timeout_trigger, timeout, idle_in_timeout); io_start_read((uv_handle_t *)client); } @@ -444,13 +425,13 @@ int io_create(uv_loop_t *loop, uv_handle_t *handle, int type, unsigned family) return ret; } struct worker_ctx *worker = loop->data; - struct session *session = session_borrow(worker); - assert(session); - session->handle = handle; - handle->data = session; - session->timeout.data = session; - uv_timer_init(worker->loop, &session->timeout); - return ret; + struct session *s = worker_session_borrow(worker); + assert(s); + session_set_handle(s, handle); + uv_timer_t *t = session_get_timer(s); + t->data = s; + 
uv_timer_init(worker->loop, t); + return ret; /* io_create() is non-void: keep the return this hunk would otherwise drop */ } void io_deinit(uv_handle_t *handle) @@ -461,7 +442,7 @@ void io_deinit(uv_handle_t *handle) uv_loop_t *loop = handle->loop; if (loop && loop->data) { struct worker_ctx *worker = loop->data; - session_release(worker, handle); + worker_session_release(worker, handle); } else { session_free(handle->data); } diff --git a/daemon/io.h b/daemon/io.h index 428cc62a3..1b5e5791d 100644 --- a/daemon/io.h +++ b/daemon/io.h @@ -25,33 +25,6 @@ struct tls_ctx_t; struct tls_client_ctx_t; -/* Per-session (TCP or UDP) persistent structure, - * that exists between remote counterpart and a local socket. - */ -struct session { - bool outgoing; /**< True: to upstream; false: from a client. */ - bool throttled; - bool has_tls; - bool connected; - bool closing; - union inaddr peer; - uv_handle_t *handle; - uv_timer_t timeout; - struct qr_task *buffering; /**< Worker buffers the incomplete TCP query here. */ - struct tls_ctx_t *tls_ctx; - struct tls_client_ctx_t *tls_client_ctx; - - uint8_t msg_hdr[4]; /**< Buffer for DNS message header. */ - ssize_t msg_hdr_idx; /**< The number of bytes in msg_hdr filled so far. */ - - qr_tasklist_t tasks; - qr_tasklist_t waiting; - ssize_t bytes_to_skip; -}; - -void session_free(struct session *s); -struct session *session_new(void); - int udp_bind(uv_udp_t *handle, struct sockaddr *addr); int udp_bindfd(uv_udp_t *handle, int fd); int tcp_bind(uv_tcp_t *handle, struct sockaddr *addr); diff --git a/daemon/session.c b/daemon/session.c new file mode 100644 index 000000000..7133baf72 --- /dev/null +++ b/daemon/session.c @@ -0,0 +1,623 @@ +#include <assert.h> + +#include <libknot/packet/pkt.h> + +#include "lib/defines.h" +#include "daemon/session.h" +#include "daemon/engine.h" +#include "daemon/tls.h" +#include "daemon/worker.h" +#include "daemon/io.h" + +/** List of tasks. */ +typedef array_t(struct qr_task *) session_tasklist_t; + +struct session_flags { + bool outgoing : 1; /**< True: to upstream; false: from a client. 
*/ + bool throttled : 1; /**< True: data reading from peer is temporarily stopped. */ + bool has_tls : 1; /**< True: given session uses TLS. */ + bool connected : 1; /**< True: TCP connection is established. */ + bool closing : 1; /**< True: session close sequence is in progress. */ + bool wirebuf_error : 1; /**< True: last operation with wirebuf ended up with an error. */ +}; + + +/* Per-session (TCP or UDP) persistent structure, + * that exists between remote counterpart and a local socket. + */ +struct session { + struct session_flags sflags; /**< miscellaneous flags. */ + union inaddr peer; /**< address of peer; is not set for client's UDP sessions. */ + uv_handle_t *handle; /**< libuv handle for IO operations. */ + uv_timer_t timeout; /**< libuv handle for timer. */ + + struct tls_ctx_t *tls_ctx; /**< server side tls-related data. */ + struct tls_client_ctx_t *tls_client_ctx; /**< client side tls-related data. */ + + session_tasklist_t tasks; /**< list of tasks associated with given session. */ + session_tasklist_t waiting; /**< list of tasks waiting for IO (subset of tasks). */ + + uint8_t *wire_buf; /**< Buffer for DNS message. */ + ssize_t wire_buf_size; /**< Buffer size. */ + ssize_t wire_buf_idx; /**< The number of bytes in wire_buf filled so far. 
*/ +}; + +static void on_session_close(uv_handle_t *handle) +{ + uv_loop_t *loop = handle->loop; + struct worker_ctx *worker = loop->data; + struct session *session = handle->data; + assert(session->handle == handle); + io_deinit(handle); + worker_iohandle_release(worker, handle); +} + +static void on_session_timer_close(uv_handle_t *timer) +{ + struct session *session = timer->data; + uv_handle_t *handle = session->handle; + assert(handle && handle->data == session); + assert (session->sflags.outgoing || handle->type == UV_TCP); + if (!uv_is_closing(handle)) { + uv_close(handle, on_session_close); + } +} + +void session_free(struct session *s) +{ + if (s) { + assert(s->tasks.len == 0 && s->waiting.len == 0); + session_clear(s); + free(s); + } +} + +void session_clear(struct session *s) +{ + assert(s->tasks.len == 0 && s->waiting.len == 0); + if (s->handle && s->handle->type == UV_TCP) { + free(s->wire_buf); + } + array_clear(s->tasks); + array_clear(s->waiting); + tls_free(s->tls_ctx); + tls_client_ctx_free(s->tls_client_ctx); + memset(s, 0, sizeof(*s)); +} + +struct session *session_new(void) +{ + return calloc(1, sizeof(struct session)); +} + +void session_close(struct session *session) +{ + assert(session->tasks.len == 0 && session->waiting.len == 0); + + if (session->sflags.closing) { + return; + } + + uv_handle_t *handle = session->handle; + io_stop_read(handle); + session->sflags.closing = true; + if (session->sflags.outgoing && + session->peer.ip.sa_family != AF_UNSPEC) { + struct worker_ctx *worker = handle->loop->data; + struct sockaddr *peer = &session->peer.ip; + worker_del_tcp_connected(worker, peer); + session->sflags.connected = false; + } + + if (!uv_is_closing((uv_handle_t *)&session->timeout)) { + uv_timer_stop(&session->timeout); + if (session->tls_client_ctx) { + tls_close(&session->tls_client_ctx->c); + } + if (session->tls_ctx) { + tls_close(&session->tls_ctx->c); + } + + session->timeout.data = session; + uv_close((uv_handle_t 
*)&session->timeout, on_session_timer_close); + } +} + +int session_start_read(struct session *session) +{ + return io_start_read(session->handle); +} + +int session_waitinglist_add(struct session *session, struct qr_task *task) +{ + for (int i = 0; i < session->waiting.len; ++i) { + if (session->waiting.at[i] == task) { + return i; + } + } + int ret = array_push(session->waiting, task); + if (ret >= 0) { + worker_task_ref(task); + } + return ret; +} + +int session_waitinglist_del(struct session *session, struct qr_task *task) +{ + int ret = kr_error(ENOENT); + for (int i = 0; i < session->waiting.len; ++i) { + if (session->waiting.at[i] == task) { + array_del(session->waiting, i); + worker_task_unref(task); + ret = kr_ok(); + break; + } + } + return ret; +} + +int session_waitinglist_del_index(struct session *session, int index) +{ + int ret = kr_error(ENOENT); + if (index < session->waiting.len) { + struct qr_task *task = session->waiting.at[index]; + array_del(session->waiting, index); + worker_task_unref(task); + ret = kr_ok(); + } + return ret; +} + +int session_tasklist_add(struct session *session, struct qr_task *task) +{ + for (int i = 0; i < session->tasks.len; ++i) { + if (session->tasks.at[i] == task) { + return i; + } + } + int ret = array_push(session->tasks, task); + if (ret >= 0) { + worker_task_ref(task); + } + return ret; +} + +int session_tasklist_del(struct session *session, struct qr_task *task) +{ + int ret = kr_error(ENOENT); + for (int i = 0; i < session->tasks.len; ++i) { + if (session->tasks.at[i] == task) { + array_del(session->tasks, i); + worker_task_unref(task); + ret = kr_ok(); + break; + } + } + return ret; +} + +int session_tasklist_del_index(struct session *session, int index) +{ + int ret = kr_error(ENOENT); + if (index < session->tasks.len) { + struct qr_task *task = session->tasks.at[index]; + array_del(session->tasks, index); + worker_task_unref(task); + ret = kr_ok(); + } + return ret; +} + +struct qr_task* 
session_tasklist_find(const struct session *session, uint16_t msg_id) +{ + struct qr_task *ret = NULL; + const session_tasklist_t *tasklist = &session->tasks; + for (size_t i = 0; i < tasklist->len; ++i) { + struct qr_task *task = tasklist->at[i]; + knot_pkt_t *pktbuf = worker_task_get_pktbuf(task); + uint16_t task_msg_id = knot_wire_get_id(pktbuf->wire); + if (task_msg_id == msg_id) { + ret = task; + break; + } + } + return ret; +} + +bool session_is_outgoing(const struct session *session) +{ + return session->sflags.outgoing; +} + +void session_set_outgoing(struct session *session, bool outgoing) +{ + session->sflags.outgoing = outgoing; +} + +bool session_is_closing(const struct session *session) +{ + return session->sflags.closing; +} + +void session_set_closing(struct session *session, bool closing) +{ + session->sflags.closing = closing; +} + +bool session_is_connected(const struct session *session) +{ + return session->sflags.connected; +} + +void session_set_connected(struct session *session, bool connected) +{ + session->sflags.connected = connected; +} + +bool session_is_throttled(const struct session *session) +{ + return session->sflags.throttled; +} + +void session_set_throttled(struct session *session, bool throttled) +{ + session->sflags.throttled = throttled; +} + +struct sockaddr *session_get_peer(struct session *session) +{ + return &session->peer.ip; +} + +struct tls_ctx_t *session_tls_get_server_ctx(const struct session *session) +{ + return session->tls_ctx; +} + +void session_tls_set_server_ctx(struct session *session, struct tls_ctx_t *ctx) +{ + session->tls_ctx = ctx; +} + +struct tls_client_ctx_t *session_tls_get_client_ctx(const struct session *session) +{ + return session->tls_client_ctx; +} + +void session_tls_set_client_ctx(struct session *session, struct tls_client_ctx_t *ctx) +{ + session->tls_client_ctx = ctx; +} + +struct tls_common_ctx *session_tls_get_common_ctx(const struct session *session) +{ + struct tls_common_ctx *tls_ctx = 
session->sflags.outgoing ? &session->tls_client_ctx->c : + &session->tls_ctx->c; + return tls_ctx; +} + +uv_handle_t *session_get_handle(struct session *session) +{ + return session->handle; +} + +int session_set_handle(struct session *session, uv_handle_t *h) +{ + if (!h) { + return kr_error(EINVAL); + } + + if (h->type == UV_TCP) { + uint8_t *wire_buf = malloc(KNOT_WIRE_MAX_PKTSIZE); + if (!wire_buf) { + return kr_error(ENOMEM); + } + session->wire_buf = wire_buf; + session->wire_buf_size = KNOT_WIRE_MAX_PKTSIZE; + } else if (h->type == UV_UDP) { + assert(h->loop->data); + struct worker_ctx *worker = h->loop->data; + session->wire_buf = worker->wire_buf; + session->wire_buf_size = sizeof(worker->wire_buf); + } + + session->handle = h; + h->data = session; + return kr_ok(); +} + +uv_timer_t *session_get_timer(struct session *session) +{ + return &session->timeout; +} + +size_t session_tasklist_get_len(const struct session *session) +{ + return session->tasks.len; +} + +size_t session_waitinglist_get_len(const struct session *session) +{ + return session->waiting.len; +} + +bool session_tasklist_is_empty(const struct session *session) +{ + return session_tasklist_get_len(session) == 0; +} + +bool session_waitinglist_is_empty(const struct session *session) +{ + return session_waitinglist_get_len(session) == 0; +} + +bool session_is_empty(const struct session *session) +{ + return session_tasklist_is_empty(session) && + session_waitinglist_is_empty(session); +} + +bool session_has_tls(const struct session *session) +{ + return session->sflags.has_tls; +} + +void session_set_has_tls(struct session *session, bool has_tls) +{ + session->sflags.has_tls = has_tls; +} + +struct qr_task *session_waitinglist_get_first(const struct session *session) +{ + struct qr_task *t = NULL; + if (session->waiting.len > 0) { + t = session->waiting.at[0]; + } + return t; +} + +struct qr_task *session_tasklist_get_first(const struct session *session) +{ + struct qr_task *t = NULL; + if 
(session->tasks.len > 0) { + t = session->tasks.at[0]; + } + return t; +} + +void session_waitinglist_retry(struct session *session, bool increase_timeout_cnt) +{ + while (session->waiting.len > 0) { + struct qr_task *task = session->waiting.at[0]; + session_tasklist_del(session, task); + array_del(session->waiting, 0); + assert(worker_task_numrefs(task) > 1); + if (increase_timeout_cnt) { + worker_task_timeout_inc(task); + } + worker_task_unref(task); + worker_task_step(task, NULL, NULL); + } +} + +void session_waitinglist_finalize(struct session *session, int status) +{ + while (session->waiting.len > 0) { + struct qr_task *t = session->waiting.at[0]; + array_del(session->waiting, 0); + session_tasklist_del(session, t); + if (session->sflags.outgoing) { + worker_task_finalize(t, status); + } else { + struct request_ctx *ctx = worker_task_get_request(t); + assert(worker_request_get_source_session(ctx) == session); + worker_request_set_source_session(ctx, NULL); + } + worker_task_unref(t); + } +} + +void session_tasklist_finalize(struct session *session, int status) +{ + while (session->tasks.len > 0) { + struct qr_task *t = session->tasks.at[0]; + array_del(session->tasks, 0); + if (session->sflags.outgoing) { + worker_task_finalize(t, status); + } else { + struct request_ctx *ctx = worker_task_get_request(t); + assert(worker_request_get_source_session(ctx) == session); + worker_request_set_source_session(ctx, NULL); + } + worker_task_unref(t); + } +} + +void session_tasks_finalize(struct session *session, int status) +{ + session_waitinglist_finalize(session, status); + session_tasklist_finalize(session, status); +} + +int session_timer_start(struct session *session, uv_timer_cb cb, + uint64_t timeout, uint64_t repeat) +{ + uv_timer_t *timer = &session->timeout; + assert(timer->data == session); + int ret = uv_timer_start(timer, cb, timeout, repeat); + if (ret != 0) { + uv_timer_stop(timer); + return kr_error(ENOMEM); + } + return 0; +} + +int 
session_timer_restart(struct session *session) +{ + return uv_timer_again(&session->timeout); +} + +int session_timer_stop(struct session *session) +{ + return uv_timer_stop(&session->timeout); +} + +ssize_t session_wirebuf_consume(struct session *session, const uint8_t *data, ssize_t len) +{ + if (data != &session->wire_buf[session->wire_buf_idx]) { + /* shouldn't happen */ + return kr_error(EINVAL); + } + + if (session->wire_buf_idx + len > session->wire_buf_size) { + /* shouldn't happen */ + return kr_error(EINVAL); + } + + session->wire_buf_idx += len; + return len; +} + +knot_pkt_t *session_produce_packet(struct session *session, knot_mm_t *mm) +{ + if (session->wire_buf_idx == 0) { + session->sflags.wirebuf_error = false; + return NULL; + } + + const uv_handle_t *handle = session->handle; + uint8_t *msg_start = session->wire_buf; + uint16_t msg_size = session->wire_buf_idx; + + session->sflags.wirebuf_error = true; + if (!handle) { + return NULL; + } else if (handle->type == UV_TCP) { + if (session->wire_buf_idx < 2) { + session->sflags.wirebuf_error = false; + return NULL; + } + msg_size = knot_wire_read_u16(session->wire_buf); + if (msg_size + 2 > session->wire_buf_idx) { + session->sflags.wirebuf_error = false; + return NULL; + } + msg_start += 2; + } + + knot_pkt_t *pkt = knot_pkt_new(msg_start, msg_size, mm); + if (pkt) { + session->sflags.wirebuf_error = false; + } + return pkt; +} + +int session_discard_packet(struct session *session, const knot_pkt_t *pkt) +{ + uv_handle_t *handle = session->handle; + uint8_t *wirebuf_data_start = session->wire_buf; + size_t wirebuf_msg_data_size = session->wire_buf_idx; + uint8_t *wirebuf_msg_start = session->wire_buf; + size_t wirebuf_msg_size = session->wire_buf_idx; + uint8_t *pkt_msg_start = pkt->wire; + size_t pkt_msg_size = pkt->size; + + session->sflags.wirebuf_error = true; + if (!handle) { + return kr_error(EINVAL); + } else if (handle->type == UV_TCP) { + if (session->wire_buf_idx < 2) { + return 
kr_error(EINVAL); + } + wirebuf_msg_size = knot_wire_read_u16(wirebuf_data_start); + wirebuf_msg_start += 2; + wirebuf_msg_data_size = wirebuf_msg_size + 2; + } + + if (wirebuf_msg_start != pkt_msg_start || wirebuf_msg_size != pkt_msg_size) { + return kr_error(EINVAL); + } + + if (wirebuf_msg_data_size > session->wire_buf_idx) { + return kr_error(EINVAL); + } + + uint16_t wirebuf_data_amount = session->wire_buf_idx - wirebuf_msg_data_size; + if (wirebuf_data_amount) { + if (wirebuf_msg_data_size < wirebuf_data_amount) { + memmove(wirebuf_data_start, &wirebuf_data_start[wirebuf_msg_data_size], + wirebuf_data_amount); + } else { + memcpy(wirebuf_data_start, &wirebuf_data_start[wirebuf_msg_data_size], + wirebuf_data_amount); + } + } + + session->wire_buf_idx = wirebuf_data_amount; + session->sflags.wirebuf_error = false; + + return kr_ok(); +} + +bool session_wirebuf_error(struct session *session) +{ + return session->sflags.wirebuf_error; +} + +uint8_t *session_wirebuf_get_start(struct session *session) +{ + return session->wire_buf; +} + +size_t session_wirebuf_get_len(struct session *session) +{ + return session->wire_buf_idx; +} + +size_t session_wirebuf_get_size(struct session *session) +{ + return session->wire_buf_size; /* capacity recorded by session_set_handle(); sizeof on the pointer only gave the pointer width */ +} + +uint8_t *session_wirebuf_get_free_start(struct session *session) +{ + return &session->wire_buf[session->wire_buf_idx]; +} + +size_t session_wirebuf_get_free_size(struct session *session) +{ + return session->wire_buf_size - session->wire_buf_idx; +} + +void session_poison(struct session *session) +{ + kr_asan_poison(session, sizeof(*session)); +} + +void session_unpoison(struct session *session) +{ + kr_asan_unpoison(session, sizeof(*session)); +} + +int session_wirebuf_process(struct session *session) +{ + int ret = 0; + if (session->wire_buf_idx == 0) { + return ret; + } + struct worker_ctx *worker = session_get_handle(session)->loop->data; + knot_pkt_t *query = NULL; + while (((query = session_produce_packet(session, 
&worker->pkt_pool)) != NULL) && (ret < 100)) { + worker_submit(session, query); + if (session_discard_packet(session, query) < 0) { + break; + } + ret += 1; + } + assert(ret < 100); + if (session_wirebuf_error(session)) { + ret = -1; + } + return ret; +} + diff --git a/daemon/session.h b/daemon/session.h new file mode 100644 index 000000000..93f8addaa --- /dev/null +++ b/daemon/session.h @@ -0,0 +1,142 @@ +/* Copyright (C) 2018 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +#pragma once + +#include <stdbool.h> +#include <uv.h> +#include "lib/generic/array.h" + +struct qr_task; +struct worker_ctx; +struct session; + +/* Allocate new session. */ +struct session *session_new(void); +/* Clear and free given session. */ +void session_free(struct session *s); +/* Clear session. */ +void session_clear(struct session *s); +/** Close session. */ +void session_close(struct session *session); +/** Start reading from underlying libuv IO handle. */ +int session_start_read(struct session *session); + +/** List of tasks been waiting for IO. */ +/** Check if list is empty. */ +bool session_waitinglist_is_empty(const struct session *session); +/** Get the first element. */ +struct qr_task *session_waitinglist_get_first(const struct session *session); +/** Get the list length. 
*/ +size_t session_waitinglist_get_len(const struct session *session); +/** Add task to the list. */ +int session_waitinglist_add(struct session *session, struct qr_task *task); +/** Remove task from the list. */ +int session_waitinglist_del(struct session *session, struct qr_task *task); +/** Remove task from the list by index. */ +int session_waitinglist_del_index(struct session *session, int index); +/** Retry resolution for each task in the list. */ +void session_waitinglist_retry(struct session *session, bool increase_timeout_cnt); +/** Finalize all tasks in the list. */ +void session_waitinglist_finalize(struct session *session, int status); + +/** List of tasks associated with session. */ +/** Check if list is empty. */ +bool session_tasklist_is_empty(const struct session *session); +/** Get the first element. */ +struct qr_task *session_tasklist_get_first(const struct session *session); +/** Get the list length. */ +size_t session_tasklist_get_len(const struct session *session); +/** Add task to the list. */ +int session_tasklist_add(struct session *session, struct qr_task *task); +/** Remove task from the list. */ +int session_tasklist_del(struct session *session, struct qr_task *task); +/** Remove task from the list by index. */ +int session_tasklist_del_index(struct session *session, int index); +/** Find task with given msg_id */ +struct qr_task* session_tasklist_find(const struct session *session, uint16_t msg_id); +/** Finalize all tasks in the list. */ +void session_tasklist_finalize(struct session *session, int status); + +/** Both of task lists (associated & waiting). */ +/** Check if empty. */ +bool session_is_empty(const struct session *session); +/** Finalize all tasks. 
*/ +void session_tasks_finalize(struct session *session, int status); + +/** Operations with flags */ +bool session_is_outgoing(const struct session *session); +void session_set_outgoing(struct session *session, bool outgoing); +bool session_is_closing(const struct session *session); +void session_set_closing(struct session *session, bool closing); +bool session_is_connected(const struct session *session); +void session_set_connected(struct session *session, bool connected); +bool session_is_throttled(const struct session *session); +void session_set_throttled(struct session *session, bool throttled); +bool session_has_tls(const struct session *session); +void session_set_has_tls(struct session *session, bool has_tls); +bool session_wirebuf_error(struct session *session); + +/** Get peer address. */ +struct sockaddr *session_get_peer(struct session *session); +/** Get pointer to server-side tls-related data. */ +struct tls_ctx_t *session_tls_get_server_ctx(const struct session *session); +/** Set pointer to server-side tls-related data. */ +void session_tls_set_server_ctx(struct session *session, struct tls_ctx_t *ctx); +/** Get pointer to client-side tls-related data. */ +struct tls_client_ctx_t *session_tls_get_client_ctx(const struct session *session); +/** Set pointer to client-side tls-related data. */ +void session_tls_set_client_ctx(struct session *session, struct tls_client_ctx_t *ctx); +/** Get pointer to that part of tls-related data which has common structure for + * server and client. */ +struct tls_common_ctx *session_tls_get_common_ctx(const struct session *session); + +/** Get pointer to underlying libuv handle for IO operations. */ +uv_handle_t *session_get_handle(struct session *session); +/** Set pointer to libuv handle for IO operations. */ +int session_set_handle(struct session *session, uv_handle_t *handle); + +/** Get pointer to session timer handle. */ +uv_timer_t *session_get_timer(struct session *session); +/** Start session timer. 
*/ +int session_timer_start(struct session *session, uv_timer_cb cb, + uint64_t timeout, uint64_t repeat); +/** Restart session timer without changing its parameters. */ +int session_timer_restart(struct session *session); +/** Stop session timer. */ +int session_timer_stop(struct session *session); + +/** Get start of session buffer for wire data. */ +uint8_t *session_wirebuf_get_start(struct session *session); +/** Get size of session wirebuffer. */ +size_t session_wirebuf_get_size(struct session *session); +/** Get length of data in the session wirebuffer. */ +size_t session_wirebuf_get_len(struct session *session); +/** Get start of free space in session wirebuffer. */ +uint8_t *session_wirebuf_get_free_start(struct session *session); +/** Get amount of free space in session wirebuffer. */ +size_t session_wirebuf_get_free_size(struct session *session); +int session_wirebuf_process(struct session *session); +ssize_t session_wirebuf_consume(struct session *session, + const uint8_t *data, ssize_t len); + +/** Poison session structure with ASAN. */ +void session_poison(struct session *session); +/** Unpoison session structure with ASAN. */ +void session_unpoison(struct session *session); + +knot_pkt_t *session_produce_packet(struct session *session, knot_mm_t *mm); +int session_discard_packet(struct session *session, const knot_pkt_t *pkt); diff --git a/daemon/tls.c b/daemon/tls.c index 0fdc6f9e1..65393a4e4 100644 --- a/daemon/tls.c +++ b/daemon/tls.c @@ -34,6 +34,7 @@ #include "daemon/io.h" #include "daemon/tls.h" #include "daemon/worker.h" +#include "daemon/session.h" #define EPHEMERAL_CERT_EXPIRATION_SECONDS_RENEW_BEFORE 60*60*24*7 #define GNUTLS_PIN_MIN_VERSION 0x030400 @@ -354,9 +355,10 @@ void tls_close(struct tls_common_ctx *ctx) assert(ctx->session); if (ctx->handshake_state == TLS_HS_DONE) { + const struct sockaddr *peer = session_get_peer(ctx->session); kr_log_verbose("[%s] closing tls connection to `%s`\n", ctx->client_side ? 
"tls_client" : "tls", - kr_straddr(&ctx->session->peer.ip)); + kr_straddr(peer)); ctx->handshake_state = TLS_HS_CLOSING; gnutls_bye(ctx->tls_session, GNUTLS_SHUT_RDWR); } @@ -384,12 +386,11 @@ int tls_write(uv_write_t *req, uv_handle_t *handle, knot_pkt_t *pkt, uv_write_cb return kr_error(EINVAL); } - struct session *session = handle->data; - struct tls_common_ctx *tls_ctx = session->outgoing ? &session->tls_client_ctx->c : - &session->tls_ctx->c; + struct session *s = handle->data; + struct tls_common_ctx *tls_ctx = session_tls_get_common_ctx(s); assert (tls_ctx); - assert (session->outgoing == tls_ctx->client_side); + assert (session_is_outgoing(s) == tls_ctx->client_side); const uint16_t pkt_size = htons(pkt->size); const char *logstring = tls_ctx->client_side ? client_logstring : server_logstring; @@ -426,17 +427,16 @@ int tls_write(uv_write_t *req, uv_handle_t *handle, knot_pkt_t *pkt, uv_write_cb return kr_ok(); } -int tls_process(struct worker_ctx *worker, uv_stream_t *handle, const uint8_t *buf, ssize_t nread) +ssize_t tls_process_input_data(struct session *s, const uint8_t *buf, ssize_t nread) { - struct session *session = handle->data; - struct tls_common_ctx *tls_p = session->outgoing ? &session->tls_client_ctx->c : - &session->tls_ctx->c; + struct tls_common_ctx *tls_p = session_tls_get_common_ctx(s); if (!tls_p) { return kr_error(ENOSYS); } - assert(tls_p->session == session); - + assert(tls_p->session == s); + assert(tls_p->recv_buf == buf && nread <= sizeof(tls_p->recv_buf)); + const char *logstring = tls_p->client_side ? 
client_logstring : server_logstring; tls_p->buf = buf; @@ -455,9 +455,13 @@ int tls_process(struct worker_ctx *worker, uv_stream_t *handle, const uint8_t *b } /* See https://gnutls.org/manual/html_node/Data-transfer-and-termination.html#Data-transfer-and-termination */ - int submitted = 0; + ssize_t submitted = 0; + bool is_retrying = false; + uint64_t retrying_start = 0; + uint8_t *wire_buf = session_wirebuf_get_free_start(s); + size_t wire_buf_size = session_wirebuf_get_free_size(s); while (true) { - ssize_t count = gnutls_record_recv(tls_p->tls_session, tls_p->recv_buf, sizeof(tls_p->recv_buf)); + ssize_t count = gnutls_record_recv(tls_p->tls_session, wire_buf, wire_buf_size); if (count == GNUTLS_E_AGAIN) { break; /* No data available */ } else if (count == GNUTLS_E_INTERRUPTED) { @@ -479,17 +483,15 @@ int tls_process(struct worker_ctx *worker, uv_stream_t *handle, const uint8_t *b kr_log_verbose("[%s] gnutls_record_recv failed: %s (%zd)\n", logstring, gnutls_strerror_name(count), count); return kr_error(EIO); - } - DEBUG_MSG("[%s] submitting %zd data to worker\n", logstring, count); - int ret = worker_process_tcp(worker, handle, tls_p->recv_buf, count); - if (ret < 0) { - return ret; - } - if (count <= 0) { + } else if (count == 0) { break; } - submitted += ret; + DEBUG_MSG("[%s] received %zd data\n", logstring, count); + wire_buf += count; + wire_buf_size -= count; + submitted += count; } + assert(tls_p->consumed == tls_p->nread); return submitted; } @@ -1127,13 +1129,13 @@ int tls_client_connect_start(struct tls_client_ctx_t *client_ctx, return kr_error(EINVAL); } - assert(session->outgoing && session->handle->type == UV_TCP); + assert(session_is_outgoing(session) && session_get_handle(session)->type == UV_TCP); struct tls_common_ctx *ctx = &client_ctx->c; gnutls_session_set_ptr(ctx->tls_session, client_ctx); gnutls_handshake_set_timeout(ctx->tls_session, ctx->worker->engine->net.tcp.tls_handshake_timeout); - session->tls_client_ctx = client_ctx; + 
session_tls_set_client_ctx(session, client_ctx); ctx->handshake_cb = handshake_cb; ctx->handshake_state = TLS_HS_IN_PROGRESS; ctx->session = session; diff --git a/daemon/tls.h b/daemon/tls.h index d208f4cb8..1bfa6ef6d 100644 --- a/daemon/tls.h +++ b/daemon/tls.h @@ -134,7 +134,7 @@ int tls_write(uv_write_t *req, uv_handle_t* handle, knot_pkt_t * pkt, uv_write_c -/*! Unwrap incoming data from a TLS stream and pass them to TCP session. - * @return the number of newly-completed requests (>=0) or an error code +/*! Unwrap incoming data from a TLS stream into the session wire buffer. + * @return the number of decrypted bytes stored (>=0) or an error code */ -int tls_process(struct worker_ctx *worker, uv_stream_t *handle, const uint8_t *buf, ssize_t nread); +ssize_t tls_process_input_data(struct session *s, const uint8_t *buf, ssize_t nread); /*! Set TLS certificate and key from files. */ int tls_certificate_set(struct network *net, const char *tls_cert, const char *tls_key); diff --git a/daemon/worker.c b/daemon/worker.c index 5a6e2e583..d62633ab3 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -36,6 +36,7 @@ #include "daemon/io.h" #include "daemon/tls.h" #include "daemon/zimport.h" +#include "daemon/session.h" #define VERBOSE_MSG(qry, fmt...) 
QRVERBOSE(qry, "wrkr", fmt) @@ -97,11 +98,6 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockaddr *addr, knot_pkt_t *pkt); static int qr_task_finalize(struct qr_task *task, int state); static void qr_task_complete(struct qr_task *task); -static int worker_add_tcp_connected(struct worker_ctx *worker, - const struct sockaddr *addr, - struct session *session); -static int worker_del_tcp_connected(struct worker_ctx *worker, - const struct sockaddr *addr); static struct session* worker_find_tcp_connected(struct worker_ctx *worker, const struct sockaddr *addr); static int worker_add_tcp_waiting(struct worker_ctx *worker, @@ -111,14 +107,7 @@ static int worker_del_tcp_waiting(struct worker_ctx *worker, const struct sockaddr *addr); static struct session* worker_find_tcp_waiting(struct worker_ctx *worker, const struct sockaddr *addr); -static int session_add_waiting(struct session *session, struct qr_task *task); -static int session_del_waiting(struct session *session, struct qr_task *task); -static int session_add_tasks(struct session *session, struct qr_task *task); -static int session_del_tasks(struct session *session, struct qr_task *task); -static void session_close(struct session *session); static void on_session_idle_timeout(uv_timer_t *timer); -static int timer_start(struct session *session, uv_timer_cb cb, - uint64_t timeout, uint64_t repeat); static void on_tcp_connect_timeout(uv_timer_t *timer); static void on_tcp_watchdog_timeout(uv_timer_t *timer); @@ -248,8 +237,8 @@ static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype, sa_family_t /* Set current handle as a subrequest type. 
*/ struct session *session = handle->data; if (ret == 0) { - session->outgoing = true; - ret = session_add_tasks(session, task); + session_set_outgoing(session, true); + ret = session_tasklist_add(session, task); } if (ret < 0) { io_deinit(handle); @@ -262,75 +251,53 @@ static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype, sa_family_t return handle; } -static void on_session_close(uv_handle_t *handle) -{ - uv_loop_t *loop = handle->loop; - struct worker_ctx *worker = loop->data; - struct session *session = handle->data; - assert(session->handle == handle); - session->handle = NULL; - io_deinit(handle); - iohandle_release(worker, handle); -} - -static void on_session_timer_close(uv_handle_t *timer) -{ - struct session *session = timer->data; - uv_handle_t *handle = session->handle; - assert(handle && handle->data == session); - assert (session->outgoing || handle->type == UV_TCP); - if (!uv_is_closing(handle)) { - uv_close(handle, on_session_close); - } -} - static void ioreq_kill_udp(uv_handle_t *req, struct qr_task *task) { assert(req); - struct session *session = req->data; - assert(session->outgoing); - if (session->closing) { + struct session *s = req->data; + assert(session_is_outgoing(s)); + if (session_is_closing(s)) { return; } - uv_timer_stop(&session->timeout); - session_del_tasks(session, task); - assert(session->tasks.len == 0); - session_close(session); + uv_timer_t *t = session_get_timer(s); + uv_timer_stop(t); + session_tasklist_del(s, task); + assert(session_tasklist_is_empty(s)); + session_close(s); } static void ioreq_kill_tcp(uv_handle_t *req, struct qr_task *task) { assert(req); - struct session *session = req->data; - assert(session->outgoing); - if (session->closing) { + struct session *s = req->data; + assert(session_is_outgoing(s)); + if (session_is_closing(s)) { return; } - session_del_waiting(session, task); - session_del_tasks(session, task); + session_waitinglist_del(s, task); + session_tasklist_del(s, task); int res = 0; - 
if (session->outgoing && session->peer.ip.sa_family != AF_UNSPEC && - session->tasks.len == 0 && session->waiting.len == 0 && !session->closing) { - assert(session->peer.ip.sa_family == AF_INET || - session->peer.ip.sa_family == AF_INET6); + const struct sockaddr *peer = session_get_peer(s); + if (peer->sa_family != AF_UNSPEC && session_is_empty(s) && !session_is_closing(s)) { + assert(peer->sa_family == AF_INET || peer->sa_family == AF_INET6); res = 1; - if (session->connected) { + if (session_is_connected(s)) { /* This is outbound TCP connection which can be reused. * Close it after timeout */ - uv_timer_t *timer = &session->timeout; - timer->data = session; - uv_timer_stop(timer); - res = uv_timer_start(timer, on_session_idle_timeout, + uv_timer_t *t = session_get_timer(s); + t->data = s; + uv_timer_stop(t); + res = uv_timer_start(t, on_session_idle_timeout, KR_CONN_RTT_MAX, 0); } } if (res != 0) { /* if any errors, close the session immediately */ - session_close(session); + session_close(s); } } @@ -348,100 +315,6 @@ static void ioreq_kill_pending(struct qr_task *task) task->pending_count = 0; } -static void session_close(struct session *session) -{ - assert(session->tasks.len == 0 && session->waiting.len == 0); - - if (session->closing) { - return; - } - - if (!session->outgoing && session->buffering != NULL) { - qr_task_complete(session->buffering); - } - session->buffering = NULL; - - uv_handle_t *handle = session->handle; - io_stop_read(handle); - session->closing = true; - if (session->outgoing && - session->peer.ip.sa_family != AF_UNSPEC) { - struct worker_ctx *worker = get_worker(); - struct sockaddr *peer = &session->peer.ip; - worker_del_tcp_connected(worker, peer); - session->connected = false; - } - - if (!uv_is_closing((uv_handle_t *)&session->timeout)) { - uv_timer_stop(&session->timeout); - if (session->tls_client_ctx) { - tls_close(&session->tls_client_ctx->c); - } - if (session->tls_ctx) { - tls_close(&session->tls_ctx->c); - } - - 
session->timeout.data = session; - uv_close((uv_handle_t *)&session->timeout, on_session_timer_close); - } -} - -static int session_add_waiting(struct session *session, struct qr_task *task) -{ - for (int i = 0; i < session->waiting.len; ++i) { - if (session->waiting.at[i] == task) { - return i; - } - } - int ret = array_push(session->waiting, task); - if (ret >= 0) { - qr_task_ref(task); - } - return ret; -} - -static int session_del_waiting(struct session *session, struct qr_task *task) -{ - int ret = kr_error(ENOENT); - for (int i = 0; i < session->waiting.len; ++i) { - if (session->waiting.at[i] == task) { - array_del(session->waiting, i); - qr_task_unref(task); - ret = kr_ok(); - break; - } - } - return ret; -} - -static int session_add_tasks(struct session *session, struct qr_task *task) -{ - for (int i = 0; i < session->tasks.len; ++i) { - if (session->tasks.at[i] == task) { - return i; - } - } - int ret = array_push(session->tasks, task); - if (ret >= 0) { - qr_task_ref(task); - } - return ret; -} - -static int session_del_tasks(struct session *session, struct qr_task *task) -{ - int ret = kr_error(ENOENT); - for (int i = 0; i < session->tasks.len; ++i) { - if (session->tasks.at[i] == task) { - array_del(session->tasks, i); - qr_task_unref(task); - ret = kr_ok(); - break; - } - } - return ret; -} - /** @cond This memory layout is internal to mempool.c, use only for debugging. */ #if defined(__SANITIZE_ADDRESS__) struct mempool_chunk { @@ -530,11 +403,11 @@ static struct request_ctx *request_create(struct worker_ctx *worker, /* TODO Relocate pool to struct request */ ctx->worker = worker; array_init(ctx->tasks); - struct session *session = handle ? handle->data : NULL; - if (session) { - assert(session->outgoing == false); + struct session *s = handle ? 
handle->data : NULL; + if (s) { + assert(session_is_outgoing(s) == false); } - ctx->source.session = session; + ctx->source.session = s; struct kr_request *req = &ctx->req; req->pool = pool; @@ -584,8 +457,8 @@ static int request_start(struct request_ctx *ctx, knot_pkt_t *query) struct kr_request *req = &ctx->req; /* source.session can be empty if request was generated by kresd itself */ - if (!ctx->source.session || - ctx->source.session->handle->type == UV_TCP) { + struct session *s = ctx->source.session; + if (!s || session_get_handle(s)->type == UV_TCP) { answer_max = KNOT_WIRE_MAX_PKTSIZE; } else if (knot_pkt_has_edns(query)) { /* EDNS */ answer_max = MAX(knot_edns_get_payload(query->opt_rr), @@ -679,7 +552,6 @@ static int request_del_tasks(struct request_ctx *ctx, struct qr_task *task) return ret; } - static struct qr_task *qr_task_create(struct request_ctx *ctx) { /* How much can client handle? */ @@ -695,7 +567,7 @@ static struct qr_task *qr_task_create(struct request_ctx *ctx) if (!task) { return NULL; } - memset(task, 0, sizeof(*task)); /* avoid accidentally unitialized fields */ + memset(task, 0, sizeof(*task)); /* avoid accidentally uninitialized fields */ /* Create packet buffers for answer and subrequests */ knot_pkt_t *pktbuf = knot_pkt_new(NULL, pktbuf_max, &ctx->req.pool); @@ -727,19 +599,18 @@ static void qr_task_free(struct qr_task *task) assert(ctx); /* Process outbound session. */ - struct session *source_session = ctx->source.session; + struct session *s = ctx->source.session; struct worker_ctx *worker = ctx->worker; /* Process source session. 
*/ - if (source_session && - source_session->tasks.len < worker->tcp_pipeline_max/2 && - !source_session->closing && source_session->throttled) { - uv_handle_t *handle = source_session->handle; + if (s && session_tasklist_get_len(s) < worker->tcp_pipeline_max/2 && + !session_is_closing(s) && session_is_throttled(s)) { + uv_handle_t *handle = session_get_handle(s); /* Start reading again if the session is throttled and * the number of outgoing requests is below watermark. */ if (handle) { io_start_read(handle); - source_session->throttled = false; + session_set_throttled(s, false); } } @@ -755,15 +626,14 @@ static void qr_task_free(struct qr_task *task) /*@ Register new qr_task within session. */ static int qr_task_register(struct qr_task *task, struct session *session) { - assert(session->outgoing == false && session->handle->type == UV_TCP); + assert(session_is_outgoing(session) == false && + session_get_handle(session)->type == UV_TCP); - int ret = array_reserve(session->tasks, session->tasks.len + 1); - if (ret != 0) { + int ret = session_tasklist_add(session, task); + if (ret < 0) { return kr_error(ENOMEM); } - session_add_tasks(session, task); - struct request_ctx *ctx = task->ctx; assert(ctx && (ctx->source.session == NULL || ctx->source.session == session)); ctx->source.session = session; @@ -772,11 +642,11 @@ static int qr_task_register(struct qr_task *task, struct session *session) * an in effect shrink TCP window size. To get more precise throttling, * we would need to copy remainder of the unread buffer and reassemble * when resuming reading. This is NYI. 
*/ - if (session->tasks.len >= task->ctx->worker->tcp_pipeline_max) { - uv_handle_t *handle = session->handle; - if (handle && !session->throttled && !session->closing) { + if (session_tasklist_get_len(session) >= task->ctx->worker->tcp_pipeline_max) { + uv_handle_t *handle = session_get_handle(session); + if (handle && !session_is_throttled(session) && !session_is_closing(session)) { io_stop_read(handle); - session->throttled = true; + session_set_throttled(session, true); } } @@ -792,11 +662,10 @@ static void qr_task_complete(struct qr_task *task) assert(task->waiting.len == 0); assert(task->leading == false); - struct session *source_session = ctx->source.session; - if (source_session) { - assert(source_session->outgoing == false && - source_session->waiting.len == 0); - session_del_tasks(source_session, task); + struct session *s = ctx->source.session; + if (s) { + assert(!session_is_outgoing(s) && session_waitinglist_is_empty(s)); + session_tasklist_del(s, task); } /* Release primary reference to task. 
*/ @@ -812,23 +681,25 @@ static int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status if (!handle || handle->type != UV_TCP) { return status; } - struct session* session = handle->data; - assert(session); - if (!session->outgoing || - session->waiting.len == 0) { + struct session* s = handle->data; + assert(s); + if (!session_is_outgoing(s) || session_waitinglist_is_empty(s)) { return status; } } if (handle) { - struct session* session = handle->data; - if (!session->outgoing && task->ctx->source.session) { - assert (task->ctx->source.session->handle == handle); + struct session* s = handle->data; + bool outgoing = session_is_outgoing(s); + if (!outgoing) { + struct session* source_s = task->ctx->source.session; + if (source_s) { + assert (session_get_handle(source_s) == handle); + } } - if (handle->type == UV_TCP && session->outgoing && - session->waiting.len > 0) { - session_del_waiting(session, task); - if (session->closing) { + if (handle->type == UV_TCP && outgoing && !session_waitinglist_is_empty(s)) { + session_waitinglist_del(s, task); + if (session_is_closing(s)) { return status; } /* Finalize the task, if any errors. 
@@ -837,46 +708,27 @@ static int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status * (for instance: tls; send->tls_push->too many non-critical errors-> * on_send with nonzero status->re-add to waiting->send->etc).*/ if (status != 0) { - if (session->outgoing) { + if (outgoing) { qr_task_finalize(task, KR_STATE_FAIL); } else { - assert(task->ctx->source.session == session); + assert(task->ctx->source.session == s); task->ctx->source.session = NULL; } - session_del_tasks(session, task); + session_tasklist_del(s, task); } - if (session->waiting.len > 0) { - struct qr_task *t = session->waiting.at[0]; - int ret = qr_task_send(t, handle, &session->peer.ip, t->pktbuf); + struct qr_task *waiting_task = session_waitinglist_get_first(s); + if (waiting_task) { + struct sockaddr *peer = session_get_peer(s); + knot_pkt_t *pkt = waiting_task->pktbuf; + int ret = qr_task_send(waiting_task, handle, peer, pkt); if (ret != kr_ok()) { - while (session->waiting.len > 0) { - struct qr_task *t = session->waiting.at[0]; - if (session->outgoing) { - qr_task_finalize(t, KR_STATE_FAIL); - } else { - assert(t->ctx->source.session == session); - t->ctx->source.session = NULL; - } - array_del(session->waiting, 0); - session_del_tasks(session, t); - qr_task_unref(t); - } - while (session->tasks.len > 0) { - struct qr_task *t = session->tasks.at[0]; - if (session->outgoing) { - qr_task_finalize(t, KR_STATE_FAIL); - } else { - assert(t->ctx->source.session == session); - t->ctx->source.session = NULL; - } - session_del_tasks(session, t); - } - session_close(session); + session_tasks_finalize(s, KR_STATE_FAIL); + session_close(s); return status; } } } - if (!session->closing) { + if (!session_is_closing(s)) { io_start_read(handle); /* Start reading new query */ } } @@ -989,7 +841,7 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle, /* Update statistics */ if (ctx->source.session && - handle != ctx->source.session->handle && + handle != 
session_get_handle(ctx->source.session) && addr) { if (session->has_tls) worker->stats.tls += 1; @@ -1009,31 +861,35 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle, static int session_next_waiting_send(struct session *session) { - union inaddr *peer = &session->peer; int ret = kr_ok(); - if (session->waiting.len > 0) { - struct qr_task *task = session->waiting.at[0]; - ret = qr_task_send(task, session->handle, &peer->ip, task->pktbuf); + if (!session_waitinglist_is_empty(session)) { + struct sockaddr *peer = session_get_peer(session); + struct qr_task *task = session_waitinglist_get_first(session); + uv_handle_t *handle = session_get_handle(session); + ret = qr_task_send(task, handle, peer, task->pktbuf); } return ret; } static int session_tls_hs_cb(struct session *session, int status) { - struct worker_ctx *worker = get_worker(); - union inaddr *peer = &session->peer; - int deletion_res = worker_del_tcp_waiting(worker, &peer->ip); + assert(session_is_outgoing(session)); + uv_handle_t *handle = session_get_handle(session); + uv_loop_t *loop = handle->loop; + struct worker_ctx *worker = loop->data; + struct sockaddr *peer = session_get_peer(session); + int deletion_res = worker_del_tcp_waiting(worker, peer); int ret = kr_ok(); if (status) { - kr_nsrep_update_rtt(NULL, &peer->ip, KR_NS_DEAD, + kr_nsrep_update_rtt(NULL, peer, KR_NS_DEAD, worker->engine->resolver.cache_rtt, KR_NS_UPDATE_NORESET); return ret; } /* handshake was completed successfully */ - struct tls_client_ctx_t *tls_client_ctx = session->tls_client_ctx; + struct tls_client_ctx_t *tls_client_ctx = session_tls_get_client_ctx(session); struct tls_client_paramlist_entry *tls_params = tls_client_ctx->params; gnutls_session_t tls_session = tls_client_ctx->c.tls_session; if (gnutls_session_is_resumed(tls_session) != 0) { @@ -1054,7 +910,7 @@ static int session_tls_hs_cb(struct session *session, int status) } } - ret = worker_add_tcp_connected(worker, &peer->ip, session); + ret = 
worker_add_tcp_connected(worker, peer, session); if (deletion_res == kr_ok() && ret == kr_ok()) { ret = session_next_waiting_send(session); } else { @@ -1066,118 +922,99 @@ static int session_tls_hs_cb(struct session *session, int status) * Session isn't in the list of waiting sessions, * or addition to the list of connected sessions failed, * or write to upstream failed. */ - while (session->waiting.len > 0) { - struct qr_task *task = session->waiting.at[0]; - session_del_tasks(session, task); - array_del(session->waiting, 0); - qr_task_finalize(task, KR_STATE_FAIL); - qr_task_unref(task); - } - worker_del_tcp_connected(worker, &peer->ip); - assert(session->tasks.len == 0); + session_waitinglist_finalize(session, KR_STATE_FAIL); + worker_del_tcp_connected(worker, peer); + assert(session_tasklist_is_empty(session)); session_close(session); } else { - uv_timer_stop(&session->timeout); - session->timeout.data = session; - timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, 0); + uv_timer_t *t = session_get_timer(session); + uv_timer_stop(t); + t->data = session; + session_timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, 0); } return kr_ok(); } -static struct kr_query *session_current_query(struct session *session) -{ - if (session->waiting.len == 0) { - return NULL; - } - struct qr_task *task = session->waiting.at[0]; - if (task->ctx->req.rplan.pending.len == 0) { +static struct kr_query *task_get_last_pending_query(struct qr_task *task) +{ + if (!task || task->ctx->req.rplan.pending.len == 0) { return NULL; } return array_tail(task->ctx->req.rplan.pending); } + static void on_connect(uv_connect_t *req, int status) { struct worker_ctx *worker = get_worker(); uv_stream_t *handle = req->handle; struct session *session = handle->data; - union inaddr *peer = &session->peer; + struct sockaddr *peer = session_get_peer(session); + + assert(session_is_outgoing(session)); if (status == UV_ECANCELED) { - worker_del_tcp_waiting(worker, &peer->ip); 
- assert(session->closing && session->waiting.len == 0 && session->tasks.len == 0); + worker_del_tcp_waiting(worker, peer); + assert(session_is_empty(session) && session_is_closing(session)); iorequest_release(worker, req); return; } - if (session->closing) { - worker_del_tcp_waiting(worker, &peer->ip); - assert(session->waiting.len == 0 && session->tasks.len == 0); + if (session_is_closing(session)) { + worker_del_tcp_waiting(worker, peer); + assert(session_is_empty(session)); iorequest_release(worker, req); return; } - uv_timer_stop(&session->timeout); + uv_timer_t *t = session_get_timer(session); + uv_timer_stop(t); if (status != 0) { - worker_del_tcp_waiting(worker, &peer->ip); - while (session->waiting.len > 0) { - struct qr_task *task = session->waiting.at[0]; - session_del_tasks(session, task); - array_del(session->waiting, 0); - assert(task->refs > 1); - qr_task_unref(task); - qr_task_step(task, NULL, NULL); - } - assert(session->tasks.len == 0); + worker_del_tcp_waiting(worker, peer); + session_waitinglist_retry(session, false); + assert(session_tasklist_is_empty(session)); iorequest_release(worker, req); session_close(session); return; } - if (!session->has_tls) { + if (!session_has_tls(session)) { /* if there is a TLS, session still waiting for handshake, * otherwise remove it from waiting list */ - if (worker_del_tcp_waiting(worker, &peer->ip) != 0) { + if (worker_del_tcp_waiting(worker, peer) != 0) { /* session isn't in list of waiting queries, * * something gone wrong */ - while (session->waiting.len > 0) { - struct qr_task *task = session->waiting.at[0]; - session_del_tasks(session, task); - array_del(session->waiting, 0); - ioreq_kill_pending(task); - assert(task->pending_count == 0); - qr_task_finalize(task, KR_STATE_FAIL); - qr_task_unref(task); - } - assert(session->tasks.len == 0); + session_waitinglist_finalize(session, KR_STATE_FAIL); + assert(session_tasklist_is_empty(session)); iorequest_release(worker, req); session_close(session); return; 
} } - struct kr_query *qry = session_current_query(session); + struct qr_task *task = session_waitinglist_get_first(session); + struct kr_query *qry = task_get_last_pending_query(task); WITH_VERBOSE (qry) { - char addr_str[INET6_ADDRSTRLEN]; - inet_ntop(session->peer.ip.sa_family, kr_inaddr(&session->peer.ip), - addr_str, sizeof(addr_str)); - VERBOSE_MSG(qry, "=> connected to '%s'\n", addr_str); + struct sockaddr *peer = session_get_peer(session); + char peer_str[INET6_ADDRSTRLEN]; + inet_ntop(peer->sa_family, kr_inaddr(peer), peer_str, sizeof(peer_str)); + VERBOSE_MSG(qry, "=> connected to '%s'\n", peer_str); } - session->connected = true; - session->handle = (uv_handle_t *)handle; + session_set_connected(session, true); + session_set_handle(session,(uv_handle_t *)handle); int ret = kr_ok(); - if (session->has_tls) { - ret = tls_client_connect_start(session->tls_client_ctx, - session, session_tls_hs_cb); + if (session_has_tls(session)) { + struct tls_client_ctx_t *tls_ctx = session_tls_get_client_ctx(session); + ret = tls_client_connect_start(tls_ctx, session, session_tls_hs_cb); if (ret == kr_error(EAGAIN)) { iorequest_release(worker, req); - io_start_read(session->handle); - timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, 0); + session_start_read(session); + session_timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, 0); return; } } @@ -1185,25 +1022,16 @@ static void on_connect(uv_connect_t *req, int status) if (ret == kr_ok()) { ret = session_next_waiting_send(session); if (ret == kr_ok()) { - timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, 0); - worker_add_tcp_connected(worker, &session->peer.ip, session); + session_timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, 0); + struct sockaddr *peer = session_get_peer(session); + worker_add_tcp_connected(worker, peer, session); iorequest_release(worker, req); return; } } - while (session->waiting.len > 0) { - struct qr_task *task = 
session->waiting.at[0]; - session_del_tasks(session, task); - array_del(session->waiting, 0); - ioreq_kill_pending(task); - assert(task->pending_count == 0); - qr_task_finalize(task, KR_STATE_FAIL); - qr_task_unref(task); - } - - assert(session->tasks.len == 0); - + session_waitinglist_finalize(session, KR_STATE_FAIL); + assert(session_tasklist_is_empty(session)); iorequest_release(worker, req); session_close(session); } @@ -1215,76 +1043,48 @@ static void on_tcp_connect_timeout(uv_timer_t *timer) uv_timer_stop(timer); struct worker_ctx *worker = get_worker(); - assert (session->waiting.len == session->tasks.len); + assert (session_waitinglist_get_len(session) == session_tasklist_get_len(session)); - union inaddr *peer = &session->peer; - worker_del_tcp_waiting(worker, &peer->ip); + struct sockaddr *peer = session_get_peer(session); + worker_del_tcp_waiting(worker, peer); - struct kr_query *qry = session_current_query(session); + struct qr_task *task = session_waitinglist_get_first(session); + struct kr_query *qry = task_get_last_pending_query(task); WITH_VERBOSE (qry) { - char addr_str[INET6_ADDRSTRLEN]; - inet_ntop(peer->ip.sa_family, kr_inaddr(&peer->ip), addr_str, sizeof(addr_str)); - VERBOSE_MSG(qry, "=> connection to '%s' failed\n", addr_str); + char peer_str[INET6_ADDRSTRLEN]; + inet_ntop(peer->sa_family, kr_inaddr(peer), peer_str, sizeof(peer_str)); + VERBOSE_MSG(qry, "=> connection to '%s' failed\n", peer_str); } - kr_nsrep_update_rtt(NULL, &peer->ip, KR_NS_DEAD, + kr_nsrep_update_rtt(NULL, peer, KR_NS_DEAD, worker->engine->resolver.cache_rtt, KR_NS_UPDATE_NORESET); - while (session->waiting.len > 0) { - struct qr_task *task = session->waiting.at[0]; - assert(task->ctx); - task->timeouts += 1; - worker->stats.timeout += 1; - session_del_tasks(session, task); - array_del(session->waiting, 0); - assert(task->refs > 1); - qr_task_unref(task); - qr_task_step(task, NULL, NULL); - } - - assert (session->tasks.len == 0); + worker->stats.timeout += 
session_waitinglist_get_len(session); + session_waitinglist_retry(session, true); + assert (session_tasklist_is_empty(session)); session_close(session); } static void on_tcp_watchdog_timeout(uv_timer_t *timer) { struct session *session = timer->data; + struct worker_ctx *worker = timer->loop->data; + struct sockaddr *peer = session_get_peer(session); + + assert(session_is_outgoing(session)); - assert(session->outgoing); uv_timer_stop(timer); - struct worker_ctx *worker = get_worker(); - if (session->outgoing) { - if (session->has_tls) { - worker_del_tcp_waiting(worker, &session->peer.ip); - } - worker_del_tcp_connected(worker, &session->peer.ip); - - while (session->waiting.len > 0) { - struct qr_task *task = session->waiting.at[0]; - task->timeouts += 1; - worker->stats.timeout += 1; - array_del(session->waiting, 0); - session_del_tasks(session, task); - ioreq_kill_pending(task); - assert(task->pending_count == 0); - qr_task_finalize(task, KR_STATE_FAIL); - qr_task_unref(task); - } - } - while (session->tasks.len > 0) { - struct qr_task *task = session->tasks.at[0]; - task->timeouts += 1; - worker->stats.timeout += 1; - assert(task->refs > 1); - array_del(session->tasks, 0); - ioreq_kill_pending(task); - assert(task->pending_count == 0); - qr_task_finalize(task, KR_STATE_FAIL); - qr_task_unref(task); + if (session_has_tls(session)) { + worker_del_tcp_waiting(worker, peer); } + worker_del_tcp_connected(worker, peer); + worker->stats.timeout += session_waitinglist_get_len(session); + session_waitinglist_finalize(session, KR_STATE_FAIL); + worker->stats.timeout += session_tasklist_get_len(session); + session_tasklist_finalize(session, KR_STATE_FAIL); session_close(session); } @@ -1292,14 +1092,14 @@ static void on_tcp_watchdog_timeout(uv_timer_t *timer) static void on_udp_timeout(uv_timer_t *timer) { struct session *session = timer->data; - assert(session->handle->data == session); + assert(session_get_handle(session)->data == session); + 
assert(session_tasklist_get_len(session) == 1); + assert(session_waitinglist_is_empty(session)); uv_timer_stop(timer); - assert(session->tasks.len == 1); - assert(session->waiting.len == 0); /* Penalize all tried nameservers with a timeout. */ - struct qr_task *task = session->tasks.at[0]; + struct qr_task *task = session_tasklist_get_first(session); struct worker_ctx *worker = task->ctx->worker; if (task->leading && task->pending_count > 0) { struct kr_query *qry = array_tail(task->ctx->req.rplan.pending); @@ -1326,13 +1126,13 @@ static void on_session_idle_timeout(uv_timer_t *timer) struct session *s = timer->data; assert(s); uv_timer_stop(timer); - if (s->closing) { + if (session_is_closing(s)) { return; } /* session was not in use during timer timeout * remove it from connection list and close */ - assert(s->tasks.len == 0 && s->waiting.len == 0); + assert(session_is_empty(s)); session_close(s); } @@ -1350,8 +1150,9 @@ static uv_handle_t *retransmit(struct qr_task *task) } struct sockaddr *addr = (struct sockaddr *)choice; struct session *session = ret->data; - assert (session->peer.ip.sa_family == AF_UNSPEC && session->outgoing); - memcpy(&session->peer, addr, sizeof(session->peer)); + struct sockaddr *peer = session_get_peer(session); + assert (peer->sa_family == AF_UNSPEC && session_is_outgoing(session)); + memcpy(peer, addr, kr_sockaddr_len(addr)); if (qr_task_send(task, ret, (struct sockaddr *)choice, task->pktbuf) == 0) { task->addrlist_turn = (task->addrlist_turn + 1) % @@ -1364,10 +1165,10 @@ static uv_handle_t *retransmit(struct qr_task *task) static void on_retransmit(uv_timer_t *req) { struct session *session = req->data; - assert(session->tasks.len == 1); + assert(session_tasklist_get_len(session) == 1); uv_timer_stop(req); - struct qr_task *task = session->tasks.at[0]; + struct qr_task *task = session_tasklist_get_first(session); if (retransmit(task) == NULL) { /* Not possible to spawn request, start timeout timer with remaining deadline. 
*/ uint64_t timeout = KR_CONN_RTT_MAX - task->pending_count * KR_CONN_RETRY; @@ -1377,19 +1178,6 @@ static void on_retransmit(uv_timer_t *req) } } -static int timer_start(struct session *session, uv_timer_cb cb, - uint64_t timeout, uint64_t repeat) -{ - uv_timer_t *timer = &session->timeout; - assert(timer->data == session); - int ret = uv_timer_start(timer, cb, timeout, repeat); - if (ret != 0) { - uv_timer_stop(timer); - return kr_error(ENOMEM); - } - return 0; -} - static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *pkt) { /* Close pending timer */ @@ -1461,7 +1249,6 @@ static bool subreq_enqueue(struct qr_task *task) return true; } - static int qr_task_finalize(struct qr_task *task, int state) { assert(task && task->leading == false); @@ -1482,8 +1269,8 @@ static int qr_task_finalize(struct qr_task *task, int state) /* Send back answer */ struct session *source_session = ctx->source.session; - uv_handle_t *handle = source_session->handle; - assert(source_session->closing == false); + uv_handle_t *handle = session_get_handle(source_session); + assert(!session_is_closing(source_session)); assert(handle && handle->data == ctx->source.session); assert(ctx->source.addr.ip.sa_family != AF_UNSPEC); int res = qr_task_send(task, handle, @@ -1492,21 +1279,21 @@ static int qr_task_finalize(struct qr_task *task, int state) if (res != kr_ok()) { (void) qr_task_on_send(task, NULL, kr_error(EIO)); /* Since source session is erroneous detach all tasks. */ - while (source_session->tasks.len > 0) { - struct qr_task *t = source_session->tasks.at[0]; + while (!session_tasklist_is_empty(source_session)) { + struct qr_task *t = session_tasklist_get_first(source_session); struct request_ctx *c = t->ctx; assert(c->source.session == source_session); c->source.session = NULL; /* Don't finalize them as there can be other tasks * waiting for answer to this particular task. * (ie. 
task->leading is true) */ - session_del_tasks(source_session, t); + session_tasklist_del_index(source_session, 0); } session_close(source_session); } else if (handle->type == UV_TCP && ctx->source.session) { /* Don't try to close source session at least * retry_interval_for_timeout_timer milliseconds */ - uv_timer_again(&ctx->source.session->timeout); + session_timer_restart(ctx->source.session); } qr_task_unref(task); @@ -1533,7 +1320,7 @@ static int qr_task_step(struct qr_task *task, task->addrlist = NULL; task->addrlist_count = 0; task->addrlist_turn = 0; - req->has_tls = (ctx->source.session && ctx->source.session->has_tls); + req->has_tls = (ctx->source.session && session_has_tls(ctx->source.session)); if (worker->too_many_open) { struct kr_rplan *rplan = &req->rplan; @@ -1600,8 +1387,8 @@ static int qr_task_step(struct qr_task *task, */ subreq_lead(task); struct session *session = handle->data; - assert(session->handle->type == UV_UDP); - ret = timer_start(session, on_retransmit, timeout, 0); + assert(session_get_handle(session) == handle && (handle->type == UV_UDP)); + ret = session_timer_start(session, on_retransmit, timeout, 0); /* Start next step with timeout, fatal if can't start a timer. */ if (ret != 0) { subreq_finalize(task, packet_source, packet); @@ -1617,8 +1404,8 @@ static int qr_task_step(struct qr_task *task, } struct session* session = NULL; if ((session = worker_find_tcp_waiting(ctx->worker, addr)) != NULL) { - assert(session->outgoing); - if (session->closing) { + assert(session_is_outgoing(session)); + if (session_is_closing(session)) { subreq_finalize(task, packet_source, packet); return qr_task_finalize(task, KR_STATE_FAIL); } @@ -1626,84 +1413,76 @@ static int qr_task_step(struct qr_task *task, * It means that connection establishing or data sending * is coming right now. */ /* Task will be notified in on_connect() or qr_task_on_send(). 
*/ - ret = session_add_waiting(session, task); + ret = session_waitinglist_add(session, task); if (ret < 0) { subreq_finalize(task, packet_source, packet); return qr_task_finalize(task, KR_STATE_FAIL); } - ret = session_add_tasks(session, task); + ret = session_tasklist_add(session, task); if (ret < 0) { - session_del_waiting(session, task); + session_waitinglist_del(session, task); subreq_finalize(task, packet_source, packet); return qr_task_finalize(task, KR_STATE_FAIL); } assert(task->pending_count == 0); - task->pending[task->pending_count] = session->handle; + task->pending[task->pending_count] = session_get_handle(session); task->pending_count += 1; } else if ((session = worker_find_tcp_connected(ctx->worker, addr)) != NULL) { /* Connection has been already established */ - assert(session->outgoing); - if (session->closing) { - session_del_tasks(session, task); + assert(session_is_outgoing(session)); + if (session_is_closing(session)) { + session_tasklist_del(session, task); subreq_finalize(task, packet_source, packet); return qr_task_finalize(task, KR_STATE_FAIL); } - if (session->tasks.len >= worker->tcp_pipeline_max) { - session_del_tasks(session, task); + if (session_tasklist_get_len(session) >= worker->tcp_pipeline_max) { + session_tasklist_del(session, task); subreq_finalize(task, packet_source, packet); return qr_task_finalize(task, KR_STATE_FAIL); } /* will be removed in qr_task_on_send() */ - ret = session_add_waiting(session, task); + ret = session_waitinglist_add(session, task); if (ret < 0) { - session_del_tasks(session, task); + session_tasklist_del(session, task); subreq_finalize(task, packet_source, packet); return qr_task_finalize(task, KR_STATE_FAIL); } - ret = session_add_tasks(session, task); + ret = session_tasklist_add(session, task); if (ret < 0) { - session_del_waiting(session, task); - session_del_tasks(session, task); + session_waitinglist_del(session, task); + session_tasklist_del(session, task); subreq_finalize(task, packet_source, 
packet); return qr_task_finalize(task, KR_STATE_FAIL); } - if (session->waiting.len == 1) { - ret = qr_task_send(task, session->handle, - &session->peer.ip, task->pktbuf); + if (session_waitinglist_get_len(session) == 1) { + ret = qr_task_send(task, session_get_handle(session), + session_get_peer(session), task->pktbuf); if (ret < 0) { - session_del_waiting(session, task); - session_del_tasks(session, task); - while (session->tasks.len != 0) { - struct qr_task *t = session->tasks.at[0]; - qr_task_finalize(t, KR_STATE_FAIL); - session_del_tasks(session, t); - } + session_waitinglist_del(session, task); + session_tasklist_del(session, task); + session_tasklist_finalize(session, KR_STATE_FAIL); subreq_finalize(task, packet_source, packet); session_close(session); return qr_task_finalize(task, KR_STATE_FAIL); } - if (session->tasks.len == 1) { - uv_timer_stop(&session->timeout); - ret = timer_start(session, on_tcp_watchdog_timeout, - MAX_TCP_INACTIVITY, 0); + if (session_tasklist_get_len(session) == 1) { + session_timer_stop(session); + ret = session_timer_start(session, on_tcp_watchdog_timeout, + MAX_TCP_INACTIVITY, 0); } if (ret < 0) { - session_del_waiting(session, task); - session_del_tasks(session, task); - while (session->tasks.len != 0) { - struct qr_task *t = session->tasks.at[0]; - qr_task_finalize(t, KR_STATE_FAIL); - session_del_tasks(session, t); - } + session_waitinglist_del(session, task); + session_tasklist_del(session, task); + session_tasklist_finalize(session, KR_STATE_FAIL); subreq_finalize(task, packet_source, packet); session_close(session); return qr_task_finalize(task, KR_STATE_FAIL); } } assert(task->pending_count == 0); - task->pending[task->pending_count] = session->handle; + task->pending[task->pending_count] = session_get_handle(session); task->pending_count += 1; } else { /* Make connection */ @@ -1721,15 +1500,15 @@ static int qr_task_step(struct qr_task *task, session = client->data; ret = worker_add_tcp_waiting(ctx->worker, addr, 
session); if (ret < 0) { - session_del_tasks(session, task); + session_tasklist_del(session, task); iorequest_release(ctx->worker, conn); subreq_finalize(task, packet_source, packet); return qr_task_finalize(task, KR_STATE_FAIL); } /* will be removed in qr_task_on_send() */ - ret = session_add_waiting(session, task); + ret = session_waitinglist_add(session, task); if (ret < 0) { - session_del_tasks(session, task); + session_tasklist_del(session, task); worker_del_tcp_waiting(ctx->worker, addr); iorequest_release(ctx->worker, conn); subreq_finalize(task, packet_source, packet); @@ -1742,47 +1521,49 @@ static int qr_task_step(struct qr_task *task, const char *key = tcpsess_key(addr); struct tls_client_paramlist_entry *entry = map_get(&net->tls_client_params, key); if (entry) { - assert(session->tls_client_ctx == NULL); + assert(session_tls_get_client_ctx(session) == NULL); struct tls_client_ctx_t *tls_ctx = tls_client_ctx_new(entry, worker); if (!tls_ctx) { - session_del_tasks(session, task); - session_del_waiting(session, task); + session_tasklist_del(session, task); + session_waitinglist_del(session, task); worker_del_tcp_waiting(ctx->worker, addr); iorequest_release(ctx->worker, conn); subreq_finalize(task, packet_source, packet); return qr_task_step(task, NULL, NULL); } tls_client_ctx_set_session(tls_ctx, session); - session->tls_client_ctx = tls_ctx; - session->has_tls = true; + session_tls_set_client_ctx(session, tls_ctx); + session_set_has_tls(session, true); } conn->data = session; - memcpy(&session->peer, addr, sizeof(session->peer)); + struct sockaddr *peer = session_get_peer(session); + memcpy(peer, addr, kr_sockaddr_len(addr)); - ret = timer_start(session, on_tcp_connect_timeout, - KR_CONN_RTT_MAX, 0); + ret = session_timer_start(session, on_tcp_connect_timeout, + KR_CONN_RTT_MAX, 0); if (ret != 0) { - session_del_tasks(session, task); - session_del_waiting(session, task); + session_tasklist_del(session, task); + session_waitinglist_del(session, task); 
worker_del_tcp_waiting(ctx->worker, addr); iorequest_release(ctx->worker, conn); subreq_finalize(task, packet_source, packet); return qr_task_finalize(task, KR_STATE_FAIL); } - struct kr_query *qry = session_current_query(session); + struct qr_task *task = session_waitinglist_get_first(session); + struct kr_query *qry = task_get_last_pending_query(task); WITH_VERBOSE (qry) { - char addr_str[INET6_ADDRSTRLEN]; - inet_ntop(session->peer.ip.sa_family, kr_inaddr(&session->peer.ip), addr_str, sizeof(addr_str)); - VERBOSE_MSG(qry, "=> connecting to: '%s'\n", addr_str); + char peer_str[INET6_ADDRSTRLEN]; + inet_ntop(peer->sa_family, kr_inaddr(peer), peer_str, sizeof(peer_str)); + VERBOSE_MSG(qry, "=> connecting to: '%s'\n", peer_str); } if (uv_tcp_connect(conn, (uv_tcp_t *)client, addr , on_connect) != 0) { - uv_timer_stop(&session->timeout); - session_del_tasks(session, task); - session_del_waiting(session, task); + session_timer_stop(session); + session_tasklist_del(session, task); + session_waitinglist_del(session, task); worker_del_tcp_waiting(ctx->worker, addr); iorequest_release(ctx->worker, conn); subreq_finalize(task, packet_source, packet); @@ -1814,32 +1595,21 @@ static int parse_packet(knot_pkt_t *query) return ret; } -static struct qr_task* find_task(const struct session *session, uint16_t msg_id) +int worker_submit(struct session *session, knot_pkt_t *query) { - struct qr_task *ret = NULL; - const qr_tasklist_t *tasklist = &session->tasks; - for (size_t i = 0; i < tasklist->len; ++i) { - struct qr_task *task = tasklist->at[i]; - uint16_t task_msg_id = knot_wire_get_id(task->pktbuf->wire); - if (task_msg_id == msg_id) { - ret = task; - break; - } + if (!session) { + assert(false); + return kr_error(EINVAL); } - return ret; -} - -int worker_submit(struct worker_ctx *worker, uv_handle_t *handle, - knot_pkt_t *query, const struct sockaddr* addr) -{ - bool OK = worker && handle && handle->data; + uv_handle_t *handle = session_get_handle(session); + bool OK = 
handle && handle->loop->data; if (!OK) { assert(false); return kr_error(EINVAL); } - struct session *session = handle->data; + struct worker_ctx *worker = handle->loop->data; /* Parse packet */ int ret = parse_packet(query); @@ -1847,13 +1617,15 @@ int worker_submit(struct worker_ctx *worker, uv_handle_t *handle, /* Start new task on listening sockets, * or resume if this is subrequest */ struct qr_task *task = NULL; - if (!session->outgoing) { /* request from a client */ + struct sockaddr *addr = NULL; + if (!session_is_outgoing(session)) { /* request from a client */ /* Ignore badly formed queries. */ if (!query || ret != 0 || knot_wire_get_qr(query->wire)) { if (query) worker->stats.dropped += 1; return kr_error(EILSEQ); } - struct request_ctx *ctx = request_create(worker, handle, addr); + struct request_ctx *ctx = request_create(worker, handle, + session_get_peer(session)); if (!ctx) { return kr_error(ENOMEM); } @@ -1876,13 +1648,13 @@ int worker_submit(struct worker_ctx *worker, uv_handle_t *handle, /* Ignore badly formed responses. 
*/ return kr_error(EILSEQ); } - task = find_task(session, knot_wire_get_id(query->wire)); + task = session_tasklist_find(session, knot_wire_get_id(query->wire)); if (task == NULL) { return kr_error(ENOENT); } - assert(session->closing == false); + assert(!session_is_closing(session)); } - assert(uv_is_closing(session->handle) == false); + assert(uv_is_closing(session_get_handle(session)) == false); /* Consume input and produce next message */ return qr_task_step(task, addr, query); @@ -1918,7 +1690,7 @@ static struct session* map_find_tcp_session(map_t *map, return ret; } -static int worker_add_tcp_connected(struct worker_ctx *worker, +int worker_add_tcp_connected(struct worker_ctx *worker, const struct sockaddr* addr, struct session *session) { @@ -1931,7 +1703,7 @@ static int worker_add_tcp_connected(struct worker_ctx *worker, return map_add_tcp_session(&worker->tcp_connected, addr, session); } -static int worker_del_tcp_connected(struct worker_ctx *worker, +int worker_del_tcp_connected(struct worker_ctx *worker, const struct sockaddr* addr) { assert(addr && tcpsess_key(addr)); @@ -1976,379 +1748,74 @@ static int get_msg_size(const uint8_t *msg) return wire_read_u16(msg); } -/* If buffering, close last task as it isn't live yet. */ -static void discard_buffered(struct session *session) -{ - if (session->buffering) { - qr_task_free(session->buffering); - session->buffering = NULL; - session->msg_hdr_idx = 0; - } -} - -int worker_end_tcp(struct worker_ctx *worker, uv_handle_t *handle) +int worker_end_tcp(struct session *session) { - if (!worker || !handle) { + if (!session) { return kr_error(EINVAL); } - /* If this is subrequest, notify parent task with empty input - * because in this case session doesn't own tasks, it has just - * borrowed the task from parent session. 
*/ - struct session *session = handle->data; - if (session->outgoing) { - worker_submit(worker, handle, NULL, NULL); - } else { - discard_buffered(session); - } - return 0; -} -int worker_process_tcp(struct worker_ctx *worker, uv_stream_t *handle, - const uint8_t *msg, ssize_t len) + session_timer_stop(session); + + uv_handle_t *handle = session_get_handle(session); + struct worker_ctx *worker = handle->loop->data; + struct sockaddr *peer = session_get_peer(session); + worker_del_tcp_connected(worker, peer); + session_set_connected(session, false); -{ - if (!worker || !handle) { - return kr_error(EINVAL); + struct tls_client_ctx_t *tls_client_ctx = session_tls_get_client_ctx(session); + if (tls_client_ctx) { + /* Avoid gnutls_bye() call */ + tls_set_hs_state(&tls_client_ctx->c, TLS_HS_NOT_STARTED); } - /* Connection error or forced disconnect */ - struct session *session = handle->data; - assert(session && session->handle == (uv_handle_t *)handle && handle->type == UV_TCP); - if (session->closing) { - return kr_ok(); - } - if (len <= 0 || !msg) { - /* If we have pending tasks, we must dissociate them from the - * connection so they don't try to access closed and freed handle. - * @warning Do not modify task if this is outgoing request - * as it is shared with originator. 
- */ - struct kr_query *qry = session_current_query(session); - WITH_VERBOSE (qry) { - char addr_str[INET6_ADDRSTRLEN]; - inet_ntop(session->peer.ip.sa_family, kr_inaddr(&session->peer.ip), - addr_str, sizeof(addr_str)); - VERBOSE_MSG(qry, "=> connection to '%s' closed by peer\n", addr_str); - } - uv_timer_t *timer = &session->timeout; - uv_timer_stop(timer); - struct sockaddr *peer = &session->peer.ip; - worker_del_tcp_connected(worker, peer); - session->connected = false; - - if (session->tls_client_ctx) { - /* Avoid gnutls_bye() call */ - tls_set_hs_state(&session->tls_client_ctx->c, - TLS_HS_NOT_STARTED); - } - if (session->tls_ctx) { - /* Avoid gnutls_bye() call */ - tls_set_hs_state(&session->tls_ctx->c, - TLS_HS_NOT_STARTED); - } - - if (session->outgoing && session->buffering) { - session->buffering = NULL; - } + struct tls_ctx_t *tls_ctx = session_tls_get_server_ctx(session); + if (tls_ctx) { + /* Avoid gnutls_bye() call */ + tls_set_hs_state(&tls_ctx->c, TLS_HS_NOT_STARTED); + } - assert(session->tasks.len >= session->waiting.len); - while (session->waiting.len > 0) { - struct qr_task *task = session->waiting.at[0]; - array_del(session->waiting, 0); - assert(task->refs > 1); - session_del_tasks(session, task); - if (session->outgoing) { - if (task->ctx->req.options.FORWARD) { - /* We are in TCP_FORWARD mode. - * To prevent failing at kr_resolve_consume() - * qry.flags.TCP must be cleared. - * TODO - refactoring is needed. */ + assert(session_tasklist_get_len(session) >= session_waitinglist_get_len(session)); + while (!session_waitinglist_is_empty(session)) { + struct qr_task *task = session_waitinglist_get_first(session); + session_waitinglist_del_index(session, 0); + assert(task->refs > 1); + session_tasklist_del(session, task); + if (session_is_outgoing(session)) { + if (task->ctx->req.options.FORWARD) { + /* We are in TCP_FORWARD mode. + * To prevent failing at kr_resolve_consume() + * qry.flags.TCP must be cleared. + * TODO - refactoring is needed. 
*/ struct kr_request *req = &task->ctx->req; struct kr_rplan *rplan = &req->rplan; struct kr_query *qry = array_tail(rplan->pending); qry->flags.TCP = false; - } - qr_task_step(task, NULL, NULL); - } else { - assert(task->ctx->source.session == session); - task->ctx->source.session = NULL; - } - qr_task_unref(task); - } - while (session->tasks.len > 0) { - struct qr_task *task = session->tasks.at[0]; - if (session->outgoing) { - if (task->ctx->req.options.FORWARD) { - struct kr_request *req = &task->ctx->req; - struct kr_rplan *rplan = &req->rplan; - struct kr_query *qry = array_tail(rplan->pending); - qry->flags.TCP = false; - } - qr_task_step(task, NULL, NULL); - } else { - assert(task->ctx->source.session == session); - task->ctx->source.session = NULL; - } - session_del_tasks(session, task); - } - session_close(session); - return kr_ok(); - } - - if (session->bytes_to_skip) { - assert(session->buffering == NULL); - ssize_t min_len = MIN(session->bytes_to_skip, len); - len -= min_len; - msg += min_len; - session->bytes_to_skip -= min_len; - if (len < 0 || session->bytes_to_skip < 0) { - /* Something gone wrong. 
- * Better kill the connection */ - return kr_error(EILSEQ); - } - if (len == 0) { - return kr_ok(); - } - assert(session->bytes_to_skip == 0); - } - - int submitted = 0; - struct qr_task *task = session->buffering; - knot_pkt_t *pkt_buf = NULL; - if (task) { - pkt_buf = task->pktbuf; - } else { - /* Update DNS header in session->msg_hdr* */ - assert(session->msg_hdr_idx <= sizeof(session->msg_hdr)); - ssize_t hdr_amount = sizeof(session->msg_hdr) - - session->msg_hdr_idx; - if (hdr_amount > len) { - hdr_amount = len; - } - if (hdr_amount > 0) { - memcpy(session->msg_hdr + session->msg_hdr_idx, msg, hdr_amount); - session->msg_hdr_idx += hdr_amount; - len -= hdr_amount; - msg += hdr_amount; - } - if (len == 0) { /* no data beyond msg_hdr -> not much to do */ - return kr_ok(); - } - assert(session->msg_hdr_idx == sizeof(session->msg_hdr)); - session->msg_hdr_idx = 0; - uint16_t msg_size = get_msg_size(session->msg_hdr); - uint16_t msg_id = knot_wire_get_id(session->msg_hdr + 2); - if (msg_size < KNOT_WIRE_HEADER_SIZE) { - /* better kill the connection; we would probably get out of sync */ - uv_timer_t *timer = &session->timeout; - uv_timer_stop(timer); - while (session->waiting.len > 0) { - struct qr_task *task = session->waiting.at[0]; - if (session->outgoing) { - qr_task_finalize(task, KR_STATE_FAIL); - } else { - assert(task->ctx->source.session == session); - task->ctx->source.session = NULL; - } - array_del(session->waiting, 0); - session_del_tasks(session, task); - qr_task_unref(task); - } - while (session->tasks.len > 0) { - struct qr_task *task = session->tasks.at[0]; - if (session->outgoing) { - qr_task_finalize(task, KR_STATE_FAIL); - } else { - assert(task->ctx->source.session == session); - task->ctx->source.session = NULL; - } - session_del_tasks(session, task); - } - session_close(session); - - return kr_ok(); - } - - /* get task */ - if (!session->outgoing) { - /* This is a new query, create a new task that we can use - * to buffer incoming message 
until it's complete. */ - struct sockaddr *addr = &(session->peer.ip); - assert(addr->sa_family != AF_UNSPEC); - struct request_ctx *ctx = request_create(worker, - (uv_handle_t *)handle, - addr); - if (!ctx) { - return kr_error(ENOMEM); - } - task = qr_task_create(ctx); - if (!task) { - request_free(ctx); - return kr_error(ENOMEM); } + qr_task_step(task, NULL, NULL); } else { - /* Start of response from upstream. - * The session task list must contain a task - * with the same msg id. */ - task = find_task(session, msg_id); - /* FIXME: on high load over one connection, it's likely - * that we will get multiple matches sooner or later (!) */ - if (task) { - /* Make sure we can process maximum packet sizes over TCP for outbound queries. - * Previous packet is allocated with mempool, so there's no need to free it manually. */ - if (task->pktbuf->max_size < KNOT_WIRE_MAX_PKTSIZE) { - knot_mm_t *pool = &task->pktbuf->mm; - pkt_buf = knot_pkt_new(NULL, KNOT_WIRE_MAX_PKTSIZE, pool); - if (!pkt_buf) { - return kr_error(ENOMEM); - } - task->pktbuf = pkt_buf; - } - knot_pkt_clear(task->pktbuf); - assert(task->leading == false); - } else { - session->bytes_to_skip = msg_size - 2; - ssize_t min_len = MIN(session->bytes_to_skip, len); - len -= min_len; - msg += min_len; - session->bytes_to_skip -= min_len; - if (len < 0 || session->bytes_to_skip < 0) { - /* Something gone wrong. 
- * Better kill the connection */ - return kr_error(EILSEQ); - } - if (len == 0) { - return submitted; - } - assert(session->bytes_to_skip == 0); - int ret = worker_process_tcp(worker, handle, msg, len); - if (ret < 0) { - submitted = ret; - } else { - submitted += ret; - } - return submitted; - } + assert(task->ctx->source.session == session); + task->ctx->source.session = NULL; } - - pkt_buf = task->pktbuf; - knot_wire_set_id(pkt_buf->wire, msg_id); - pkt_buf->size = 2; - task->bytes_remaining = msg_size - 2; - assert(session->buffering == NULL); - session->buffering = task; + qr_task_unref(task); } - /* At this point session must have either created new task - * or it's already assigned. */ - assert(task); - assert(len > 0); - - /* Message is too long, can't process it. */ - ssize_t to_read = MIN(len, task->bytes_remaining); - if (pkt_buf->size + to_read > pkt_buf->max_size) { - // TODO reallocate pkt_buf - pkt_buf->size = 0; - len -= to_read; - msg += to_read; - session->bytes_to_skip = task->bytes_remaining - to_read; - task->bytes_remaining = 0; - if (session->buffering) { - if (!session->outgoing) { - qr_task_complete(session->buffering); - } - session->buffering = NULL; - } - if (len > 0) { - int ret = worker_process_tcp(worker, handle, msg, len); - if (ret < 0) { - submitted = ret; - } else { - submitted += ret; + while (!session_tasklist_is_empty(session)) { + struct qr_task *task = session_tasklist_get_first(session); + session_tasklist_del_index(session, 0); + if (session_is_outgoing(session)) { + if (task->ctx->req.options.FORWARD) { + struct kr_request *req = &task->ctx->req; + struct kr_rplan *rplan = &req->rplan; + struct kr_query *qry = array_tail(rplan->pending); + qry->flags.TCP = false; } - } - return submitted; - } - /* Buffer message and check if it's complete */ - memcpy(pkt_buf->wire + pkt_buf->size, msg, to_read); - pkt_buf->size += to_read; - task->bytes_remaining -= to_read; - len -= to_read; - msg += to_read; - if (task->bytes_remaining 
== 0) { - /* Message was assembled, clear temporary. */ - session->buffering = NULL; - session->msg_hdr_idx = 0; - const struct sockaddr *addr = NULL; - knot_pkt_t *pkt = pkt_buf; - if (session->outgoing) { - addr = &session->peer.ip; - assert ((task->pending_count == 1) && (task->pending[0] == session->handle)); - task->pending_count = 0; - session_del_tasks(session, task); - } - /* Parse the packet and start resolving complete query */ - int ret = parse_packet(pkt); - if (ret == 0) { - if (session->outgoing) { - /* To prevent slow lorris attack restart watchdog only after - * the whole message was successfully assembled and parsed */ - if (session->tasks.len > 0 || session->waiting.len > 0) { - uv_timer_stop(&session->timeout); - timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, 0); - } - } else { - /* Start only new queries, - * not subrequests that are already pending */ - ret = request_start(task->ctx, pkt); - if (ret != 0) { - /* Allocation of answer buffer has failed. - * We can't notify client about failure, - * so just end the task processing. */ - qr_task_complete(task); - goto next_msg; - } - - ret = qr_task_register(task, session); - if (ret != 0) { - /* Answer buffer has been allocated, - * but task can't be attached to the given - * session due to memory problems. - * Finalize the task, otherwise it becomes orphaned. */ - knot_pkt_init_response(task->ctx->req.answer, pkt); - qr_task_finalize(task, KR_STATE_FAIL); - goto next_msg; - } - submitted += 1; - if (task->leading) { - assert(false); - } - } - } else if (session->outgoing) { - /* Drop malformed packet and retry resolution */ - pkt = NULL; - ret = 0; + qr_task_step(task, NULL, NULL); } else { - qr_task_complete(task); - } - /* Only proceed if the message is valid, or it's an invalid response to - * an outbound query which needs to be treated as a timeout. 
*/ - if (ret == 0) { - /* since there can be next dns message, we must to proceed - * even if qr_task_step() returns error */ - qr_task_step(task, addr, pkt); - } -next_msg: - if (len > 0) { - /* TODO: this is simple via iteration; recursion doesn't really help */ - ret = worker_process_tcp(worker, handle, msg, len); - if (ret < 0) { - return ret; - } - submitted += ret; + assert(task->ctx->source.session == session); + task->ctx->source.session = NULL; } } - assert(submitted >= 0); - return submitted; + session_close(session); + return kr_ok(); } struct qr_task *worker_resolve_start(struct worker_ctx *worker, knot_pkt_t *query, struct kr_qflags options) @@ -2394,6 +1861,11 @@ int worker_resolve_exec(struct qr_task *task, knot_pkt_t *query) return qr_task_step(task, NULL, query); } +int worker_task_numrefs(const struct qr_task *task) +{ + return task->refs; +} + struct kr_request *worker_task_request(struct qr_task *task) { if (!task || !task->ctx) { @@ -2408,9 +1880,82 @@ int worker_task_finalize(struct qr_task *task, int state) return qr_task_finalize(task, state); } -void worker_session_close(struct session *session) + int worker_task_step(struct qr_task *task, const struct sockaddr *packet_source, + knot_pkt_t *packet) + { + return qr_task_step(task, packet_source, packet); + } + +void worker_task_complete(struct qr_task *task) { - session_close(session); + return qr_task_complete(task); +} + +void worker_task_ref(struct qr_task *task) +{ + qr_task_ref(task); +} + +void worker_task_unref(struct qr_task *task) +{ + qr_task_unref(task); +} + +void worker_task_timeout_inc(struct qr_task *task) +{ + task->timeouts += 1; +} + +struct session *worker_session_borrow(struct worker_ctx *worker) +{ + struct session *s = NULL; + if (worker->pool_sessions.len > 0) { + s = array_tail(worker->pool_sessions); + array_pop(worker->pool_sessions); + kr_asan_custom_unpoison(session, s); + } else { + s = session_new(); + } + return s; +} + +void worker_session_release(struct 
worker_ctx *worker, uv_handle_t *handle) +{ + if (!worker || !handle) { + return; + } + struct session *s = handle->data; + if (!s) { + return; + } + assert(session_is_empty(s)); + if (worker->pool_sessions.len < MP_FREELIST_SIZE) { + session_clear(s); + array_push(worker->pool_sessions, s); + kr_asan_custom_poison(session, s); + } else { + session_free(s); + } +} + +knot_pkt_t *worker_task_get_pktbuf(const struct qr_task *task) +{ + return task->pktbuf; +} + +struct request_ctx *worker_task_get_request(struct qr_task *task) +{ + return task->ctx; +} + +struct session *worker_request_get_source_session(struct request_ctx *ctx) +{ + return ctx->source.session; +} + +void worker_request_set_source_session(struct request_ctx *ctx, struct session *session) +{ + ctx->source.session = session; } /** Reserve worker buffers */ @@ -2445,12 +1990,20 @@ static int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen) } \ array_clear(list) +#define reclaim_freelist_custom(list, type, cb) \ + for (unsigned i = 0; i < list.len; ++i) { \ + void *elm = list.at[i]; \ + kr_asan_custom_unpoison(type, elm); \ + cb(elm); \ + } \ + array_clear(list) + void worker_reclaim(struct worker_ctx *worker) { reclaim_freelist(worker->pool_mp, struct mempool, mp_delete); reclaim_freelist(worker->pool_ioreqs, uv_reqs_t, free); reclaim_freelist(worker->pool_iohandles, uv_handles_t, free); - reclaim_freelist(worker->pool_sessions, struct session, session_free); + reclaim_freelist_custom(worker->pool_sessions, session, session_free); mp_delete(worker->pkt_pool.ctx); worker->pkt_pool.ctx = NULL; trie_free(worker->subreq_out); diff --git a/daemon/worker.h b/daemon/worker.h index 3acecfd0e..8b90bf84c 100644 --- a/daemon/worker.h +++ b/daemon/worker.h @@ -37,30 +37,17 @@ struct worker_ctx *worker_create(struct engine *engine, knot_mm_t *pool, /** * Process an incoming packet (query from a client or answer from upstream). 
* * - * @param worker the singleton worker - * @param handle socket through which the request came - * @param query the packet, or NULL on an error from the transport layer - * @param addr the address from which the packet came (or NULL, possibly, on error) + * @param session session where the packet came from + * @param query the packet, or NULL on an error from the transport layer * @return 0 or an error code */ -int worker_submit(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, - const struct sockaddr* addr); - -/** - * Process incoming DNS message fragment(s) that arrived over a stream (TCP, TLS). - * - * If the fragment contains only a partial message, it is buffered. - * If the fragment contains a complete query or completes current fragment, execute it. - * @return the number of newly-completed requests (>=0) or an error code - */ -int worker_process_tcp(struct worker_ctx *worker, uv_stream_t *handle, - const uint8_t *msg, ssize_t len); +int worker_submit(struct session *session, knot_pkt_t *query); /** * End current DNS/TCP session, this disassociates pending tasks from this session * which may be freely closed afterwards. */ -int worker_end_tcp(struct worker_ctx *worker, uv_handle_t *handle); +int worker_end_tcp(struct session *s); /** * Start query resolution with given query.
@@ -83,16 +70,48 @@ struct kr_request *worker_task_request(struct qr_task *task); /** Collect worker mempools */ void worker_reclaim(struct worker_ctx *worker); -/** Closes given session */ -void worker_session_close(struct session *session); +struct session *worker_session_borrow(struct worker_ctx *worker); + +void worker_session_release(struct worker_ctx *worker, uv_handle_t *handle); void *worker_iohandle_borrow(struct worker_ctx *worker); void worker_iohandle_release(struct worker_ctx *worker, void *h); +ssize_t worker_gnutls_push(gnutls_transport_ptr_t h, const void *buf, size_t len); + +ssize_t worker_gnutls_client_push(gnutls_transport_ptr_t h, const void *buf, size_t len); + +int worker_task_step(struct qr_task *task, const struct sockaddr *packet_source, + knot_pkt_t *packet); + +int worker_task_numrefs(const struct qr_task *task); + /** Finalize given task */ int worker_task_finalize(struct qr_task *task, int state); +void worker_task_complete(struct qr_task *task); + +void worker_task_ref(struct qr_task *task); + +void worker_task_unref(struct qr_task *task); + +void worker_task_timeout_inc(struct qr_task *task); + +int worker_add_tcp_connected(struct worker_ctx *worker, + const struct sockaddr *addr, + struct session *session); +int worker_del_tcp_connected(struct worker_ctx *worker, + const struct sockaddr *addr); + +knot_pkt_t *worker_task_get_pktbuf(const struct qr_task *task); + +struct request_ctx *worker_task_get_request(struct qr_task *task); + +struct session *worker_request_get_source_session(struct request_ctx *); + +void worker_request_set_source_session(struct request_ctx *, struct session *session); + /** @cond internal */ /** Number of request within timeout window. */ @@ -107,9 +126,6 @@ typedef array_t(void *) mp_freelist_t; /** List of query resolution tasks. */ typedef array_t(struct qr_task *) qr_tasklist_t; -/** Session list. 
*/ -typedef array_t(struct session *) qr_sessionlist_t; - /** \details Worker state is meant to persist during the whole life of daemon. */ struct worker_ctx { struct engine *engine; diff --git a/lib/defines.h b/lib/defines.h index 659558837..84da059e3 100644 --- a/lib/defines.h +++ b/lib/defines.h @@ -91,8 +91,12 @@ void __asan_poison_memory_region(void const volatile *addr, size_t size); void __asan_unpoison_memory_region(void const volatile *addr, size_t size); #define kr_asan_poison(addr, size) __asan_poison_memory_region((addr), (size)) #define kr_asan_unpoison(addr, size) __asan_unpoison_memory_region((addr), (size)) +#define kr_asan_custom_poison(fn, addr) fn ##_poison((addr)) +#define kr_asan_custom_unpoison(fn, addr) fn ##_unpoison((addr)) #else #define kr_asan_poison(addr, size) #define kr_asan_unpoison(addr, size) +#define kr_asan_custom_poison(fn, addr) +#define kr_asan_custom_unpoison(fn, addr) #endif /* @endcond */ -- GitLab