Verified commit ab4816f2, authored by Grigorii Demidov and committed by Vladimír Čunát

daemon: logic around struct session was relocated to a separate module; the input data buffering scheme was changed (libuv); an attempt was made to simplify stream processing
parent 776b6ad1
1 merge request: !675 "daemon: attempt of refactoring"
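In outline, the new buffering scheme works as in the following sketch (illustrative only, not part of the commit; the helper name stream_recv_sketch is invented and TLS handling is omitted): libuv read callbacks now write directly into a per-session wire buffer, and the session module demultiplexes complete DNS messages out of it.

/* Sketch of the new stream read path (mirrors tcp_recv() in the diff below). */
static void stream_recv_sketch(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
{
    struct session *s = handle->data;
    if (session_is_closing(s) || nread == 0) {
        return;                                /* nothing to do yet */
    }
    if (nread < 0 || !buf->base) {
        worker_end_tcp(s);                     /* transport error or EOF */
        return;
    }
    /* handle_getbuf() already pointed buf->base into the session wire buffer,
     * so "consuming" just advances the buffer's fill index. */
    ssize_t consumed = session_wirebuf_consume(s, (const uint8_t *)buf->base, nread);
    if (consumed == nread) {
        session_wirebuf_process(s);            /* split out and submit complete messages */
    }
}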
@@ -9,6 +9,7 @@ kresd_SOURCES := \
daemon/tls_ephemeral_credentials.c \
daemon/tls_session_ticket-srv.c \
daemon/zimport.c \
daemon/session.c \
daemon/main.c
kresd_DIST := daemon/lua/kres.lua daemon/lua/kres-gen.lua \
......
@@ -24,6 +24,7 @@
#include "daemon/network.h"
#include "daemon/worker.h"
#include "daemon/tls.h"
#include "daemon/session.h"
#define negotiate_bufsize(func, handle, bufsize_want) do { \
int bufsize = 0; func(handle, &bufsize); \
@@ -48,86 +49,35 @@ static void check_bufsize(uv_handle_t* handle)
#undef negotiate_bufsize
static void session_clear(struct session *s)
static uv_stream_t *handle_borrow(uv_loop_t *loop)
{
assert(s->tasks.len == 0 && s->waiting.len == 0);
array_clear(s->tasks);
array_clear(s->waiting);
tls_free(s->tls_ctx);
tls_client_ctx_free(s->tls_client_ctx);
memset(s, 0, sizeof(*s));
}
void session_free(struct session *s)
{
if (s) {
assert(s->tasks.len == 0 && s->waiting.len == 0);
session_clear(s);
free(s);
struct worker_ctx *worker = loop->data;
void *req = worker_iohandle_borrow(worker);
if (!req) {
return NULL;
}
}
struct session *session_new(void)
{
return calloc(1, sizeof(struct session));
}
static struct session *session_borrow(struct worker_ctx *worker)
{
struct session *s = NULL;
if (worker->pool_sessions.len > 0) {
s = array_tail(worker->pool_sessions);
array_pop(worker->pool_sessions);
kr_asan_unpoison(s, sizeof(*s));
} else {
s = session_new();
}
return s;
}
static void session_release(struct worker_ctx *worker, uv_handle_t *handle)
{
if (!worker || !handle) {
return;
}
struct session *s = handle->data;
if (!s) {
return;
}
assert(s->waiting.len == 0 && s->tasks.len == 0);
assert(s->buffering == NULL);
if (!s->outgoing && handle->type == UV_TCP) {
worker_end_tcp(worker, handle); /* to free the buffering task */
}
if (worker->pool_sessions.len < MP_FREELIST_SIZE) {
session_clear(s);
array_push(worker->pool_sessions, s);
kr_asan_poison(s, sizeof(*s));
} else {
session_free(s);
}
return (uv_stream_t *)req;
}
static void handle_getbuf(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
{
/* Worker has single buffer which is reused for all incoming
* datagrams / stream reads, the content of the buffer is
/* UDP sessions use the worker buffer for wire data,
* TCP sessions use the session buffer for wire data
* (see session_set_handle()).
* TLS sessions use the buffer from the TLS context.
* The content of the worker buffer is
* guaranteed to be unchanged only for the duration of
* udp_read() and tcp_read().
*/
struct session *session = handle->data;
uv_loop_t *loop = handle->loop;
struct worker_ctx *worker = loop->data;
buf->base = (char *)worker->wire_buf;
/* Limit TCP stream buffer size to 4K for granularity in batches of incoming queries. */
if (handle->type == UV_TCP) {
buf->len = MIN(suggested_size, 4096);
/* Regular buffer size for subrequests. */
} else if (session->outgoing) {
buf->len = suggested_size;
/* Use recvmmsg() on master sockets if possible. */
if (!session_has_tls(session)) {
buf->base = (char *) session_wirebuf_get_free_start(session);
buf->len = session_wirebuf_get_free_size(session);
} else {
buf->len = sizeof(worker->wire_buf);
struct tls_common_ctx *ctx = session_tls_get_common_ctx(session);
buf->base = (char *) ctx->recv_buf;
buf->len = sizeof(ctx->recv_buf);
}
}
@@ -137,29 +87,30 @@ void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
uv_loop_t *loop = handle->loop;
struct worker_ctx *worker = loop->data;
struct session *s = handle->data;
if (s->closing) {
if (session_is_closing(s)) {
return;
}
if (nread <= 0) {
if (nread < 0) { /* Error response, notify resolver */
worker_submit(worker, (uv_handle_t *)handle, NULL, addr);
worker_submit(s, NULL);
} /* nread == 0 is for freeing buffers, we don't need to do this */
return;
}
if (addr->sa_family == AF_UNSPEC) {
return;
}
if (s->outgoing) {
assert(s->peer.ip.sa_family != AF_UNSPEC);
if (kr_sockaddr_cmp(&s->peer.ip, addr) != 0) {
struct sockaddr *peer = session_get_peer(s);
if (session_is_outgoing(s)) {
assert(peer->sa_family != AF_UNSPEC);
if (kr_sockaddr_cmp(peer, addr) != 0) {
return;
}
} else {
memcpy(peer, addr, kr_sockaddr_len(addr));
}
knot_pkt_t *query = knot_pkt_new(buf->base, nread, &worker->pkt_pool);
if (query) {
query->max_size = KNOT_WIRE_MAX_PKTSIZE;
worker_submit(worker, (uv_handle_t *)handle, query, addr);
}
ssize_t consumed = session_wirebuf_consume(s, (const uint8_t *)buf->base, nread);
assert(consumed == nread);
session_wirebuf_process(s);
mp_flush(worker->pkt_pool.ctx);
}
@@ -167,11 +118,10 @@ static int udp_bind_finalize(uv_handle_t *handle)
{
check_bufsize(handle);
/* Handle is already created, just create context. */
struct session *session = session_new();
assert(session);
session->outgoing = false;
session->handle = handle;
handle->data = session;
struct session *s = session_new();
assert(s);
session_set_outgoing(s, false);
session_set_handle(s, handle);
return io_start_read(handle);
}
@@ -203,14 +153,14 @@ int udp_bindfd(uv_udp_t *handle, int fd)
static void tcp_timeout_trigger(uv_timer_t *timer)
{
struct session *session = timer->data;
struct session *s = timer->data;
assert(session->outgoing == false);
if (session->tasks.len > 0) {
assert(session_is_outgoing(s) == false);
if (!session_tasklist_is_empty(s)) {
uv_timer_again(timer);
} else if (!session->closing) {
} else if (!session_is_closing(s)) {
uv_timer_stop(timer);
worker_session_close(session);
session_close(s);
}
}
@@ -218,50 +168,78 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
{
uv_loop_t *loop = handle->loop;
struct session *s = handle->data;
if (s->closing) {
assert(s && session_get_handle(s) == (uv_handle_t *)handle &&
handle->type == UV_TCP);
if (session_is_closing(s)) {
return;
}
/* nread might be 0, which does not indicate an error or EOF.
* This is equivalent to EAGAIN or EWOULDBLOCK under read(2). */
if (nread == 0) {
return;
}
if (nread == UV_EOF) {
nread = 0;
}
struct worker_ctx *worker = loop->data;
/* TCP pipelining is rather complicated and requires cooperation from the worker
* so the whole message reassembly and demuxing logic is inside worker */
int ret = 0;
if (s->has_tls) {
ret = tls_process(worker, handle, (const uint8_t *)buf->base, nread);
} else {
ret = worker_process_tcp(worker, handle, (const uint8_t *)buf->base, nread);
if (nread < 0 || !buf->base) {
if (kr_verbose_status) {
struct sockaddr *peer = session_get_peer(s);
char peer_str[INET6_ADDRSTRLEN];
inet_ntop(peer->sa_family, kr_inaddr(peer),
peer_str, sizeof(peer_str));
kr_log_verbose("[io] => connection to '%s' closed by peer (%s)\n", peer_str,
uv_strerror(nread));
}
worker_end_tcp(s);
return;
}
ssize_t consumed = 0;
const uint8_t *data = (const uint8_t *)buf->base;
ssize_t data_len = nread;
if (session_has_tls(s)) {
/* buf->base points to the start of the TLS receive buffer.
Decode the data into the free space of the session wire buffer. */
consumed = tls_process_input_data(s, (const uint8_t *)buf->base, nread);
data = session_wirebuf_get_free_start(s);
data_len = consumed;
}
/* data points to the start of the free space in the session wire buffer.
Simply advance the internal counter. */
consumed = session_wirebuf_consume(s, data, data_len);
assert(consumed == data_len);
int ret = session_wirebuf_process(s);
if (ret < 0) {
worker_end_tcp(worker, (uv_handle_t *)handle);
worker_end_tcp(s);
/* Exceeded the per-connection quota for outstanding requests:
* stop reading from the stream and close after the last message is processed. */
if (!s->outgoing && !uv_is_closing((uv_handle_t *)&s->timeout)) {
uv_timer_stop(&s->timeout);
if (s->tasks.len == 0) {
worker_session_close(s);
uv_timer_t *t = session_get_timer(s);
if (!session_is_outgoing(s) && !uv_is_closing((uv_handle_t *)t)) {
uv_timer_stop(t);
if (session_tasklist_is_empty(s)) {
session_close(s);
} else { /* If there are tasks running, defer until they finish. */
uv_timer_start(&s->timeout, tcp_timeout_trigger,
uv_timer_start(t, tcp_timeout_trigger,
MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
}
}
/* Connection spawned at least one request, reset its deadline for next query.
* https://tools.ietf.org/html/rfc7766#section-6.2.3 */
} else if (ret > 0 && !s->outgoing && !s->closing) {
uv_timer_again(&s->timeout);
} else if (ret > 0 && !session_is_outgoing(s) && !session_is_closing(s)) {
uv_timer_t *t = session_get_timer(s);
uv_timer_again(t);
}
mp_flush(worker->pkt_pool.ctx);
}
static void _tcp_accept(uv_stream_t *master, int status, bool tls)
{
if (status != 0) {
if (status != 0) {
return;
}
@@ -298,37 +276,40 @@ static void _tcp_accept(uv_stream_t *master, int status, bool tls)
/* Set deadlines for the TCP connection and start reading.
* Every half of the request time limit it re-checks whether the connection
* is idle and should be terminated; this is an educated guess. */
struct sockaddr *addr = &(session->peer.ip);
int addr_len = sizeof(union inaddr);
int ret = uv_tcp_getpeername((uv_tcp_t *)client, addr, &addr_len);
if (ret || addr->sa_family == AF_UNSPEC) {
/* close session, close underlying uv handles and
* deallocate (or return to memory pool) memory. */
worker_session_close(session);
struct session *s = client->data;
assert(session_is_outgoing(s) == false);
struct sockaddr *peer = session_get_peer(s);
int peer_len = sizeof(union inaddr);
int ret = uv_tcp_getpeername((uv_tcp_t *)client, peer, &peer_len);
if (ret || peer->sa_family == AF_UNSPEC) {
session_close(s);
return;
}
struct worker_ctx *worker = (struct worker_ctx *)master->loop->data;
const struct engine *engine = worker->engine;
const struct network *net = &engine->net;
uint64_t idle_in_timeout = net->tcp.in_idle_timeout;
uint64_t timeout = KR_CONN_RTT_MAX / 2;
session->has_tls = tls;
session_set_has_tls(s, tls);
if (tls) {
timeout += TLS_MAX_HANDSHAKE_TIME;
if (!session->tls_ctx) {
session->tls_ctx = tls_new(master->loop->data);
if (!session->tls_ctx) {
worker_session_close(session);
struct tls_ctx_t *ctx = session_tls_get_server_ctx(s);
if (!ctx) {
ctx = tls_new(worker);
if (!ctx) {
session_close(s);
return;
}
session->tls_ctx->c.session = session;
session->tls_ctx->c.handshake_state = TLS_HS_IN_PROGRESS;
ctx->c.session = s;
ctx->c.handshake_state = TLS_HS_IN_PROGRESS;
session_tls_set_server_ctx(s, ctx);
}
}
uv_timer_t *timer = &session->timeout;
uv_timer_start(timer, tcp_timeout_trigger, timeout, idle_in_timeout);
uv_timer_t *t = session_get_timer(s);
uv_timer_start(t, tcp_timeout_trigger, timeout, idle_in_timeout);
io_start_read((uv_handle_t *)client);
}
@@ -444,13 +425,12 @@ int io_create(uv_loop_t *loop, uv_handle_t *handle, int type, unsigned family)
return ret;
}
struct worker_ctx *worker = loop->data;
struct session *session = session_borrow(worker);
assert(session);
session->handle = handle;
handle->data = session;
session->timeout.data = session;
uv_timer_init(worker->loop, &session->timeout);
return ret;
struct session *s = worker_session_borrow(worker);
assert(s);
session_set_handle(s, handle);
uv_timer_t *t = session_get_timer(s);
t->data = s;
uv_timer_init(worker->loop, t);
}
void io_deinit(uv_handle_t *handle)
@@ -461,7 +441,7 @@ void io_deinit(uv_handle_t *handle)
uv_loop_t *loop = handle->loop;
if (loop && loop->data) {
struct worker_ctx *worker = loop->data;
session_release(worker, handle);
worker_session_release(worker, handle);
} else {
session_free(handle->data);
}
......
@@ -25,33 +25,6 @@
struct tls_ctx_t;
struct tls_client_ctx_t;
/* Per-session (TCP or UDP) persistent structure,
* that exists between remote counterpart and a local socket.
*/
struct session {
bool outgoing; /**< True: to upstream; false: from a client. */
bool throttled;
bool has_tls;
bool connected;
bool closing;
union inaddr peer;
uv_handle_t *handle;
uv_timer_t timeout;
struct qr_task *buffering; /**< Worker buffers the incomplete TCP query here. */
struct tls_ctx_t *tls_ctx;
struct tls_client_ctx_t *tls_client_ctx;
uint8_t msg_hdr[4]; /**< Buffer for DNS message header. */
ssize_t msg_hdr_idx; /**< The number of bytes in msg_hdr filled so far. */
qr_tasklist_t tasks;
qr_tasklist_t waiting;
ssize_t bytes_to_skip;
};
void session_free(struct session *s);
struct session *session_new(void);
int udp_bind(uv_udp_t *handle, struct sockaddr *addr);
int udp_bindfd(uv_udp_t *handle, int fd);
int tcp_bind(uv_tcp_t *handle, struct sockaddr *addr);
......
#include <assert.h>
#include <libknot/packet/pkt.h>
#include "lib/defines.h"
#include "daemon/session.h"
#include "daemon/engine.h"
#include "daemon/tls.h"
#include "daemon/worker.h"
#include "daemon/io.h"
/** List of tasks. */
typedef array_t(struct qr_task *) session_tasklist_t;
struct session_flags {
bool outgoing : 1; /**< True: to upstream; false: from a client. */
bool throttled : 1; /**< True: data reading from peer is temporarily stopped. */
bool has_tls : 1; /**< True: given session uses TLS. */
bool connected : 1; /**< True: TCP connection is established. */
bool closing : 1; /**< True: session close sequence is in progress. */
bool wirebuf_error : 1; /**< True: last operation with wirebuf ended up with an error. */
};
/* Per-session (TCP or UDP) persistent structure
* that exists between a remote counterpart and a local socket.
*/
struct session {
struct session_flags sflags; /**< Miscellaneous flags. */
union inaddr peer; /**< Address of the peer; not set for the client's UDP sessions. */
uv_handle_t *handle; /**< libuv handle for IO operations. */
uv_timer_t timeout; /**< libuv timer handle. */
struct tls_ctx_t *tls_ctx; /**< Server-side TLS-related data. */
struct tls_client_ctx_t *tls_client_ctx; /**< Client-side TLS-related data. */
session_tasklist_t tasks; /**< List of tasks associated with the session. */
session_tasklist_t waiting; /**< List of tasks waiting for IO (a subset of tasks). */
uint8_t *wire_buf; /**< Buffer for DNS messages. */
ssize_t wire_buf_size; /**< Buffer size. */
ssize_t wire_buf_idx; /**< The number of bytes in wire_buf filled so far. */
};
static void on_session_close(uv_handle_t *handle)
{
uv_loop_t *loop = handle->loop;
struct worker_ctx *worker = loop->data;
struct session *session = handle->data;
assert(session->handle == handle);
io_deinit(handle);
worker_iohandle_release(worker, handle);
}
static void on_session_timer_close(uv_handle_t *timer)
{
struct session *session = timer->data;
uv_handle_t *handle = session->handle;
assert(handle && handle->data == session);
assert (session->sflags.outgoing || handle->type == UV_TCP);
if (!uv_is_closing(handle)) {
uv_close(handle, on_session_close);
}
}
void session_free(struct session *s)
{
if (s) {
assert(s->tasks.len == 0 && s->waiting.len == 0);
session_clear(s);
free(s);
}
}
void session_clear(struct session *s)
{
assert(s->tasks.len == 0 && s->waiting.len == 0);
if (s->handle && s->handle->type == UV_TCP) {
free(s->wire_buf);
}
array_clear(s->tasks);
array_clear(s->waiting);
tls_free(s->tls_ctx);
tls_client_ctx_free(s->tls_client_ctx);
memset(s, 0, sizeof(*s));
}
struct session *session_new(void)
{
return calloc(1, sizeof(struct session));
}
void session_close(struct session *session)
{
assert(session->tasks.len == 0 && session->waiting.len == 0);
if (session->sflags.closing) {
return;
}
uv_handle_t *handle = session->handle;
io_stop_read(handle);
session->sflags.closing = true;
if (session->sflags.outgoing &&
session->peer.ip.sa_family != AF_UNSPEC) {
struct worker_ctx *worker = handle->loop->data;
struct sockaddr *peer = &session->peer.ip;
worker_del_tcp_connected(worker, peer);
session->sflags.connected = false;
}
if (!uv_is_closing((uv_handle_t *)&session->timeout)) {
uv_timer_stop(&session->timeout);
if (session->tls_client_ctx) {
tls_close(&session->tls_client_ctx->c);
}
if (session->tls_ctx) {
tls_close(&session->tls_ctx->c);
}
session->timeout.data = session;
uv_close((uv_handle_t *)&session->timeout, on_session_timer_close);
}
}
int session_start_read(struct session *session)
{
return io_start_read(session->handle);
}
int session_waitinglist_add(struct session *session, struct qr_task *task)
{
for (int i = 0; i < session->waiting.len; ++i) {
if (session->waiting.at[i] == task) {
return i;
}
}
int ret = array_push(session->waiting, task);
if (ret >= 0) {
worker_task_ref(task);
}
return ret;
}
int session_waitinglist_del(struct session *session, struct qr_task *task)
{
int ret = kr_error(ENOENT);
for (int i = 0; i < session->waiting.len; ++i) {
if (session->waiting.at[i] == task) {
array_del(session->waiting, i);
worker_task_unref(task);
ret = kr_ok();
break;
}
}
return ret;
}
int session_waitinglist_del_index(struct session *session, int index)
{
int ret = kr_error(ENOENT);
if (index < session->waiting.len) {
struct qr_task *task = session->waiting.at[index];
array_del(session->waiting, index);
worker_task_unref(task);
ret = kr_ok();
}
return ret;
}
int session_tasklist_add(struct session *session, struct qr_task *task)
{
for (int i = 0; i < session->tasks.len; ++i) {
if (session->tasks.at[i] == task) {
return i;
}
}
int ret = array_push(session->tasks, task);
if (ret >= 0) {
worker_task_ref(task);
}
return ret;
}
int session_tasklist_del(struct session *session, struct qr_task *task)
{
int ret = kr_error(ENOENT);
for (int i = 0; i < session->tasks.len; ++i) {
if (session->tasks.at[i] == task) {
array_del(session->tasks, i);
worker_task_unref(task);
ret = kr_ok();
break;
}
}
return ret;
}
int session_tasklist_del_index(struct session *session, int index)
{
int ret = kr_error(ENOENT);
if (index < session->tasks.len) {
struct qr_task *task = session->tasks.at[index];
array_del(session->tasks, index);
worker_task_unref(task);
ret = kr_ok();
}
return ret;
}
struct qr_task* session_tasklist_find(const struct session *session, uint16_t msg_id)
{
struct qr_task *ret = NULL;
const session_tasklist_t *tasklist = &session->tasks;
for (size_t i = 0; i < tasklist->len; ++i) {
struct qr_task *task = tasklist->at[i];
knot_pkt_t *pktbuf = worker_task_get_pktbuf(task);
uint16_t task_msg_id = knot_wire_get_id(pktbuf->wire);
if (task_msg_id == msg_id) {
ret = task;
break;
}
}
return ret;
}
bool session_is_outgoing(const struct session *session)
{
return session->sflags.outgoing;
}
void session_set_outgoing(struct session *session, bool outgoing)
{
session->sflags.outgoing = outgoing;
}
bool session_is_closing(const struct session *session)
{
return session->sflags.closing;
}
void session_set_closing(struct session *session, bool closing)
{
session->sflags.closing = closing;
}
bool session_is_connected(const struct session *session)
{
return session->sflags.connected;
}
void session_set_connected(struct session *session, bool connected)
{
session->sflags.connected = connected;
}
bool session_is_throttled(const struct session *session)
{
return session->sflags.throttled;
}
void session_set_throttled(struct session *session, bool throttled)
{
session->sflags.throttled = throttled;
}
struct sockaddr *session_get_peer(struct session *session)
{
return &session->peer.ip;
}
struct tls_ctx_t *session_tls_get_server_ctx(const struct session *session)
{
return session->tls_ctx;
}
void session_tls_set_server_ctx(struct session *session, struct tls_ctx_t *ctx)
{
session->tls_ctx = ctx;
}
struct tls_client_ctx_t *session_tls_get_client_ctx(const struct session *session)
{
return session->tls_client_ctx;
}
void session_tls_set_client_ctx(struct session *session, struct tls_client_ctx_t *ctx)
{
session->tls_client_ctx = ctx;
}
struct tls_common_ctx *session_tls_get_common_ctx(const struct session *session)
{
struct tls_common_ctx *tls_ctx = session->sflags.outgoing ? &session->tls_client_ctx->c :
&session->tls_ctx->c;
return tls_ctx;
}
uv_handle_t *session_get_handle(struct session *session)
{
return session->handle;
}
int session_set_handle(struct session *session, uv_handle_t *h)
{
if (!h) {
return kr_error(EINVAL);
}
if (h->type == UV_TCP) {
uint8_t *wire_buf = malloc(KNOT_WIRE_MAX_PKTSIZE);
if (!wire_buf) {
return kr_error(ENOMEM);
}
session->wire_buf = wire_buf;
session->wire_buf_size = KNOT_WIRE_MAX_PKTSIZE;
} else if (h->type == UV_UDP) {
assert(h->loop->data);
struct worker_ctx *worker = h->loop->data;
session->wire_buf = worker->wire_buf;
session->wire_buf_size = sizeof(worker->wire_buf);
}
session->handle = h;
h->data = session;
return kr_ok();
}
uv_timer_t *session_get_timer(struct session *session)
{
return &session->timeout;
}
size_t session_tasklist_get_len(const struct session *session)
{
return session->tasks.len;
}
size_t session_waitinglist_get_len(const struct session *session)
{
return session->waiting.len;
}
bool session_tasklist_is_empty(const struct session *session)
{
return session_tasklist_get_len(session) == 0;
}
bool session_waitinglist_is_empty(const struct session *session)
{
return session_waitinglist_get_len(session) == 0;
}
bool session_is_empty(const struct session *session)
{
return session_tasklist_is_empty(session) &&
session_waitinglist_is_empty(session);
}
bool session_has_tls(const struct session *session)
{
return session->sflags.has_tls;
}
void session_set_has_tls(struct session *session, bool has_tls)
{
session->sflags.has_tls = has_tls;
}
struct qr_task *session_waitinglist_get_first(const struct session *session)
{
struct qr_task *t = NULL;
if (session->waiting.len > 0) {
t = session->waiting.at[0];
}
return t;
}
struct qr_task *session_tasklist_get_first(const struct session *session)
{
struct qr_task *t = NULL;
if (session->tasks.len > 0) {
t = session->tasks.at[0];
}
return t;
}
void session_waitinglist_retry(struct session *session, bool increase_timeout_cnt)
{
while (session->waiting.len > 0) {
struct qr_task *task = session->waiting.at[0];
session_tasklist_del(session, task);
array_del(session->waiting, 0);
assert(worker_task_numrefs(task) > 1);
if (increase_timeout_cnt) {
worker_task_timeout_inc(task);
}
worker_task_unref(task);
worker_task_step(task, NULL, NULL);
}
}
void session_waitinglist_finalize(struct session *session, int status)
{
while (session->waiting.len > 0) {
struct qr_task *t = session->waiting.at[0];
array_del(session->waiting, 0);
session_tasklist_del(session, t);
if (session->sflags.outgoing) {
worker_task_finalize(t, status);
} else {
struct request_ctx *ctx = worker_task_get_request(t);
assert(worker_request_get_source_session(ctx) == session);
worker_request_set_source_session(ctx, NULL);
}
worker_task_unref(t);
}
}
void session_tasklist_finalize(struct session *session, int status)
{
while (session->tasks.len > 0) {
struct qr_task *t = session->tasks.at[0];
array_del(session->tasks, 0);
if (session->sflags.outgoing) {
worker_task_finalize(t, status);
} else {
struct request_ctx *ctx = worker_task_get_request(t);
assert(worker_request_get_source_session(ctx) == session);
worker_request_set_source_session(ctx, NULL);
}
worker_task_unref(t);
}
}
void session_tasks_finalize(struct session *session, int status)
{
session_waitinglist_finalize(session, status);
session_tasklist_finalize(session, status);
}
int session_timer_start(struct session *session, uv_timer_cb cb,
uint64_t timeout, uint64_t repeat)
{
uv_timer_t *timer = &session->timeout;
assert(timer->data == session);
int ret = uv_timer_start(timer, cb, timeout, repeat);
if (ret != 0) {
uv_timer_stop(timer);
return kr_error(ENOMEM);
}
return 0;
}
int session_timer_restart(struct session *session)
{
return uv_timer_again(&session->timeout);
}
int session_timer_stop(struct session *session)
{
return uv_timer_stop(&session->timeout);
}
ssize_t session_wirebuf_consume(struct session *session, const uint8_t *data, ssize_t len)
{
if (data != &session->wire_buf[session->wire_buf_idx]) {
/* shouldn't happen */
return kr_error(EINVAL);
}
if (session->wire_buf_idx + len > session->wire_buf_size) {
/* shouldn't happen */
return kr_error(EINVAL);
}
session->wire_buf_idx += len;
return len;
}
knot_pkt_t *session_produce_packet(struct session *session, knot_mm_t *mm)
{
if (session->wire_buf_idx == 0) {
session->sflags.wirebuf_error = false;
return NULL;
}
const uv_handle_t *handle = session->handle;
uint8_t *msg_start = session->wire_buf;
uint16_t msg_size = session->wire_buf_idx;
session->sflags.wirebuf_error = true;
if (!handle) {
return NULL;
} else if (handle->type == UV_TCP) {
if (session->wire_buf_idx < 2) {
session->sflags.wirebuf_error = false;
return NULL;
}
msg_size = knot_wire_read_u16(session->wire_buf);
if (msg_size + 2 > session->wire_buf_idx) {
session->sflags.wirebuf_error = false;
return NULL;
}
msg_start += 2;
}
knot_pkt_t *pkt = knot_pkt_new(msg_start, msg_size, mm);
if (pkt) {
session->sflags.wirebuf_error = false;
}
return pkt;
}
int session_discard_packet(struct session *session, const knot_pkt_t *pkt)
{
uv_handle_t *handle = session->handle;
uint8_t *wirebuf_data_start = session->wire_buf;
size_t wirebuf_msg_data_size = session->wire_buf_idx;
uint8_t *wirebuf_msg_start = session->wire_buf;
size_t wirebuf_msg_size = session->wire_buf_idx;
uint8_t *pkt_msg_start = pkt->wire;
size_t pkt_msg_size = pkt->size;
session->sflags.wirebuf_error = true;
if (!handle) {
return kr_error(EINVAL);
} else if (handle->type == UV_TCP) {
if (session->wire_buf_idx < 2) {
return kr_error(EINVAL);
}
wirebuf_msg_size = knot_wire_read_u16(wirebuf_data_start);
wirebuf_msg_start += 2;
wirebuf_msg_data_size = wirebuf_msg_size + 2;
}
if (wirebuf_msg_start != pkt_msg_start || wirebuf_msg_size != pkt_msg_size) {
return kr_error(EINVAL);
}
if (wirebuf_msg_data_size > session->wire_buf_idx) {
return kr_error(EINVAL);
}
uint16_t wirebuf_data_amount = session->wire_buf_idx - wirebuf_msg_data_size;
if (wirebuf_data_amount) {
if (wirebuf_msg_data_size < wirebuf_data_amount) {
memmove(wirebuf_data_start, &wirebuf_data_start[wirebuf_msg_data_size],
wirebuf_data_amount);
} else {
memcpy(wirebuf_data_start, &wirebuf_data_start[wirebuf_msg_data_size],
wirebuf_data_amount);
}
}
session->wire_buf_idx = wirebuf_data_amount;
session->sflags.wirebuf_error = false;
return kr_ok();
}
bool session_wirebuf_error(struct session *session)
{
return session->sflags.wirebuf_error;
}
uint8_t *session_wirebuf_get_start(struct session *session)
{
return session->wire_buf;
}
size_t session_wirebuf_get_len(struct session *session)
{
return session->wire_buf_idx;
}
size_t session_wirebuf_get_size(struct session *session)
{
return session->wire_buf_size; /* wire_buf is a pointer; sizeof() would yield the pointer size */
}
uint8_t *session_wirebuf_get_free_start(struct session *session)
{
return &session->wire_buf[session->wire_buf_idx];
}
size_t session_wirebuf_get_free_size(struct session *session)
{
return session->wire_buf_size - session->wire_buf_idx;
}
void session_poison(struct session *session)
{
kr_asan_poison(session, sizeof(*session));
}
void session_unpoison(struct session *session)
{
kr_asan_unpoison(session, sizeof(*session));
}
int session_wirebuf_process(struct session *session)
{
int ret = 0;
if (session->wire_buf_idx == 0) {
return ret;
}
struct worker_ctx *worker = session_get_handle(session)->loop->data;
knot_pkt_t *query = NULL;
while (((query = session_produce_packet(session, &worker->pkt_pool)) != NULL) && (ret < 100)) {
worker_submit(session, query);
if (session_discard_packet(session, query) < 0) {
break;
}
ret += 1;
}
assert(ret < 100);
if (session_wirebuf_error(session)) {
ret = -1;
}
return ret;
}
/* Copyright (C) 2018 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#include <stdbool.h>
#include <uv.h>
#include "lib/generic/array.h"
struct qr_task;
struct worker_ctx;
struct session;
/** Allocate a new session. */
struct session *session_new(void);
/** Clear and free the given session. */
void session_free(struct session *s);
/** Clear the session (free buffers and TLS contexts, zero the structure). */
void session_clear(struct session *s);
/** Close session. */
void session_close(struct session *session);
/** Start reading from underlying libuv IO handle. */
int session_start_read(struct session *session);
/** List of tasks waiting for IO. */
/** Check if list is empty. */
bool session_waitinglist_is_empty(const struct session *session);
/** Get the first element. */
struct qr_task *session_waitinglist_get_first(const struct session *session);
/** Get the list length. */
size_t session_waitinglist_get_len(const struct session *session);
/** Add task to the list. */
int session_waitinglist_add(struct session *session, struct qr_task *task);
/** Remove task from the list. */
int session_waitinglist_del(struct session *session, struct qr_task *task);
/** Remove task from the list by index. */
int session_waitinglist_del_index(struct session *session, int index);
/** Retry resolution for each task in the list. */
void session_waitinglist_retry(struct session *session, bool increase_timeout_cnt);
/** Finalize all tasks in the list. */
void session_waitinglist_finalize(struct session *session, int status);
/** List of tasks associated with session. */
/** Check if list is empty. */
bool session_tasklist_is_empty(const struct session *session);
/** Get the first element. */
struct qr_task *session_tasklist_get_first(const struct session *session);
/** Get the list length. */
size_t session_tasklist_get_len(const struct session *session);
/** Add task to the list. */
int session_tasklist_add(struct session *session, struct qr_task *task);
/** Remove task from the list. */
int session_tasklist_del(struct session *session, struct qr_task *task);
/** Remove task from the list by index. */
int session_tasklist_del_index(struct session *session, int index);
/** Find task with given msg_id */
struct qr_task* session_tasklist_find(const struct session *session, uint16_t msg_id);
/** Finalize all tasks in the list. */
void session_tasklist_finalize(struct session *session, int status);
/** Both of task lists (associated & waiting). */
/** Check if empty. */
bool session_is_empty(const struct session *session);
/** Finalize all tasks. */
void session_tasks_finalize(struct session *session, int status);
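For orientation, a hypothetical caller of the two list APIs above might look like the sketch below (the function name attach_task_sketch is invented; both lists take their own reference on the task via worker_task_ref()).

/* Sketch: attach a task to a session; outgoing sessions also queue it for IO. */
static int attach_task_sketch(struct session *s, struct qr_task *task)
{
    int ret = session_tasklist_add(s, task);          /* refs the task on success */
    if (ret < 0) {
        return ret;
    }
    if (session_is_outgoing(s)) {
        ret = session_waitinglist_add(s, task);       /* second, independent reference */
    }
    return ret < 0 ? ret : kr_ok();
}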
/** Operations with flags */
bool session_is_outgoing(const struct session *session);
void session_set_outgoing(struct session *session, bool outgoing);
bool session_is_closing(const struct session *session);
void session_set_closing(struct session *session, bool closing);
bool session_is_connected(const struct session *session);
void session_set_connected(struct session *session, bool connected);
bool session_is_throttled(const struct session *session);
void session_set_throttled(struct session *session, bool throttled);
bool session_has_tls(const struct session *session);
void session_set_has_tls(struct session *session, bool has_tls);
bool session_wirebuf_error(struct session *session);
/** Get peer address. */
struct sockaddr *session_get_peer(struct session *session);
/** Get pointer to server-side tls-related data. */
struct tls_ctx_t *session_tls_get_server_ctx(const struct session *session);
/** Set pointer to server-side tls-related data. */
void session_tls_set_server_ctx(struct session *session, struct tls_ctx_t *ctx);
/** Get pointer to client-side tls-related data. */
struct tls_client_ctx_t *session_tls_get_client_ctx(const struct session *session);
/** Set pointer to client-side tls-related data. */
void session_tls_set_client_ctx(struct session *session, struct tls_client_ctx_t *ctx);
/** Get pointer to the part of the tls-related data that is common to
* server and client. */
struct tls_common_ctx *session_tls_get_common_ctx(const struct session *session);
/** Get pointer to underlying libuv handle for IO operations. */
uv_handle_t *session_get_handle(struct session *session);
/** Set pointer to libuv handle for IO operations. */
int session_set_handle(struct session *session, uv_handle_t *handle);
/** Get pointer to session timer handle. */
uv_timer_t *session_get_timer(struct session *session);
/** Start session timer. */
int session_timer_start(struct session *session, uv_timer_cb cb,
uint64_t timeout, uint64_t repeat);
/** Restart the session timer without changing its parameters. */
int session_timer_restart(struct session *session);
/** Stop session timer. */
int session_timer_stop(struct session *session);
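A small sketch of how the timer helpers above can serve the per-connection idle deadline (the diff itself still drives the uv_timer_t directly through session_get_timer(); touch_idle_timer_sketch is a made-up name):

/* Sketch: keep the idle deadline fresh while a client connection is active
 * (cf. RFC 7766, section 6.2.3), and stop it when the session is going away. */
static void touch_idle_timer_sketch(struct session *s)
{
    if (session_is_closing(s)) {
        session_timer_stop(s);
    } else if (!session_is_outgoing(s)) {
        session_timer_restart(s);           /* uv_timer_again() underneath */
    }
}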
/** Get start of session buffer for wire data. */
uint8_t *session_wirebuf_get_start(struct session *session);
/** Get size of session wirebuffer. */
size_t session_wirebuf_get_size(struct session *session);
/** Get length of data in the session wirebuffer. */
size_t session_wirebuf_get_len(struct session *session);
/** Get start of free space in session wirebuffer. */
uint8_t *session_wirebuf_get_free_start(struct session *session);
/** Get amount of free space in session wirebuffer. */
size_t session_wirebuf_get_free_size(struct session *session);
int session_wirebuf_process(struct session *session);
ssize_t session_wirebuf_consume(struct session *session,
const uint8_t *data, ssize_t len);
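Taken together, the wire-buffer API above is driven in two steps by the IO layer, roughly as in this sketch (feed_session_sketch is illustrative; data/len must point into the free region previously obtained from session_wirebuf_get_free_start()):

/* Sketch: account for bytes already written into the free region,
 * then let the session split out and submit complete DNS messages. */
static int feed_session_sketch(struct session *s, const uint8_t *data, ssize_t len)
{
    ssize_t consumed = session_wirebuf_consume(s, data, len);
    if (consumed < 0) {
        return (int)consumed;                /* data did not come from the free region */
    }
    return session_wirebuf_process(s);       /* number of messages handed to the worker */
}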
/** Poison the session structure with ASAN. */
void session_poison(struct session *session);
/** Unpoison the session structure with ASAN. */
void session_unpoison(struct session *session);
knot_pkt_t *session_produce_packet(struct session *session, knot_mm_t *mm);
int session_discard_packet(struct session *session, const knot_pkt_t *pkt);
@@ -34,6 +34,7 @@
#include "daemon/io.h"
#include "daemon/tls.h"
#include "daemon/worker.h"
#include "daemon/session.h"
#define EPHEMERAL_CERT_EXPIRATION_SECONDS_RENEW_BEFORE 60*60*24*7
#define GNUTLS_PIN_MIN_VERSION 0x030400
@@ -354,9 +355,10 @@ void tls_close(struct tls_common_ctx *ctx)
assert(ctx->session);
if (ctx->handshake_state == TLS_HS_DONE) {
const struct sockaddr *peer = session_get_peer(ctx->session);
kr_log_verbose("[%s] closing tls connection to `%s`\n",
ctx->client_side ? "tls_client" : "tls",
kr_straddr(&ctx->session->peer.ip));
kr_straddr(peer));
ctx->handshake_state = TLS_HS_CLOSING;
gnutls_bye(ctx->tls_session, GNUTLS_SHUT_RDWR);
}
@@ -384,12 +386,11 @@ int tls_write(uv_write_t *req, uv_handle_t *handle, knot_pkt_t *pkt, uv_write_cb
return kr_error(EINVAL);
}
struct session *session = handle->data;
struct tls_common_ctx *tls_ctx = session->outgoing ? &session->tls_client_ctx->c :
&session->tls_ctx->c;
struct session *s = handle->data;
struct tls_common_ctx *tls_ctx = session_tls_get_common_ctx(s);
assert (tls_ctx);
assert (session->outgoing == tls_ctx->client_side);
assert (session_is_outgoing(s) == tls_ctx->client_side);
const uint16_t pkt_size = htons(pkt->size);
const char *logstring = tls_ctx->client_side ? client_logstring : server_logstring;
@@ -426,17 +427,16 @@ int tls_write(uv_write_t *req, uv_handle_t *handle, knot_pkt_t *pkt, uv_write_cb
return kr_ok();
}
int tls_process(struct worker_ctx *worker, uv_stream_t *handle, const uint8_t *buf, ssize_t nread)
ssize_t tls_process_input_data(struct session *s, const uint8_t *buf, ssize_t nread)
{
struct session *session = handle->data;
struct tls_common_ctx *tls_p = session->outgoing ? &session->tls_client_ctx->c :
&session->tls_ctx->c;
struct tls_common_ctx *tls_p = session_tls_get_common_ctx(s);
if (!tls_p) {
return kr_error(ENOSYS);
}
assert(tls_p->session == session);
assert(tls_p->session == s);
assert(tls_p->recv_buf == buf && nread <= sizeof(tls_p->recv_buf));
const char *logstring = tls_p->client_side ? client_logstring : server_logstring;
tls_p->buf = buf;
@@ -455,9 +455,13 @@ int tls_process(struct worker_ctx *worker, uv_stream_t *handle, const uint8_t *b
}
/* See https://gnutls.org/manual/html_node/Data-transfer-and-termination.html#Data-transfer-and-termination */
int submitted = 0;
ssize_t submitted = 0;
bool is_retrying = false;
uint64_t retrying_start = 0;
uint8_t *wire_buf = session_wirebuf_get_free_start(s);
size_t wire_buf_size = session_wirebuf_get_free_size(s);
while (true) {
ssize_t count = gnutls_record_recv(tls_p->tls_session, tls_p->recv_buf, sizeof(tls_p->recv_buf));
ssize_t count = gnutls_record_recv(tls_p->tls_session, wire_buf, wire_buf_size);
if (count == GNUTLS_E_AGAIN) {
break; /* No data available */
} else if (count == GNUTLS_E_INTERRUPTED) {
@@ -479,17 +483,15 @@ int tls_process(struct worker_ctx *worker, uv_stream_t *handle, const uint8_t *b
kr_log_verbose("[%s] gnutls_record_recv failed: %s (%zd)\n",
logstring, gnutls_strerror_name(count), count);
return kr_error(EIO);
}
DEBUG_MSG("[%s] submitting %zd data to worker\n", logstring, count);
int ret = worker_process_tcp(worker, handle, tls_p->recv_buf, count);
if (ret < 0) {
return ret;
}
if (count <= 0) {
} else if (count == 0) {
break;
}
submitted += ret;
DEBUG_MSG("[%s] received %zd data\n", logstring, count);
wire_buf += count;
wire_buf_size -= count;
submitted += count;
}
assert(tls_p->consumed == tls_p->nread);
return submitted;
}
@@ -1127,13 +1129,13 @@ int tls_client_connect_start(struct tls_client_ctx_t *client_ctx,
return kr_error(EINVAL);
}
assert(session->outgoing && session->handle->type == UV_TCP);
assert(session_is_outgoing(session) && session_get_handle(session)->type == UV_TCP);
struct tls_common_ctx *ctx = &client_ctx->c;
gnutls_session_set_ptr(ctx->tls_session, client_ctx);
gnutls_handshake_set_timeout(ctx->tls_session, ctx->worker->engine->net.tcp.tls_handshake_timeout);
session->tls_client_ctx = client_ctx;
session_tls_set_client_ctx(session, client_ctx);
ctx->handshake_cb = handshake_cb;
ctx->handshake_state = TLS_HS_IN_PROGRESS;
ctx->session = session;
......
@@ -134,7 +134,7 @@ int tls_write(uv_write_t *req, uv_handle_t* handle, knot_pkt_t * pkt, uv_write_c
/*! Unwrap incoming data from a TLS stream and feed it into the session wire buffer.
* @return the number of bytes decoded (>=0) or an error code
*/
int tls_process(struct worker_ctx *worker, uv_stream_t *handle, const uint8_t *buf, ssize_t nread);
ssize_t tls_process_input_data(struct session *s, const uint8_t *buf, ssize_t nread);
/*! Set TLS certificate and key from files. */
int tls_certificate_set(struct network *net, const char *tls_cert, const char *tls_key);
......
(The diff for one file in this commit is collapsed and not shown here.)
@@ -37,30 +37,17 @@ struct worker_ctx *worker_create(struct engine *engine, knot_mm_t *pool,
/**
* Process an incoming packet (query from a client or answer from upstream).
*
* @param worker the singleton worker
* @param handle socket through which the request came
* @param query the packet, or NULL on an error from the transport layer
* @param addr the address from which the packet came (or NULL, possibly, on error)
* @param session the session the packet came from
* @param query the packet, or NULL on an error from the transport layer
* @return 0 or an error code
*/
int worker_submit(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query,
const struct sockaddr* addr);
/**
* Process incoming DNS message fragment(s) that arrived over a stream (TCP, TLS).
*
* If the fragment contains only a partial message, it is buffered.
* If the fragment contains a complete query or completes current fragment, execute it.
* @return the number of newly-completed requests (>=0) or an error code
*/
int worker_process_tcp(struct worker_ctx *worker, uv_stream_t *handle,
const uint8_t *msg, ssize_t len);
int worker_submit(struct session *session, knot_pkt_t *query);
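As an illustration of the new calling convention (mirroring udp_recv() in the diff; submit_sketch is a made-up wrapper): the session replaces the old worker/handle/address triple, and a NULL query still signals a transport-layer error.

/* Sketch: report a read result to the worker with the new interface. */
static int submit_sketch(struct session *s, knot_pkt_t *query, ssize_t nread)
{
    if (nread < 0) {
        return worker_submit(s, NULL);       /* transport-level error, no packet */
    }
    return worker_submit(s, query);          /* parsed query or answer */
}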
/**
* End the current DNS/TCP session; this disassociates pending tasks from the session,
* which may then be freely closed.
*/
int worker_end_tcp(struct worker_ctx *worker, uv_handle_t *handle);
int worker_end_tcp(struct session *s);
/**
* Start query resolution with given query.
@@ -83,16 +70,48 @@ struct kr_request *worker_task_request(struct qr_task *task);
/** Collect worker mempools */
void worker_reclaim(struct worker_ctx *worker);
/** Closes given session */
void worker_session_close(struct session *session);
struct session *worker_session_borrow(struct worker_ctx *worker);
void worker_session_release(struct worker_ctx *worker, uv_handle_t *handle);
void *worker_iohandle_borrow(struct worker_ctx *worker);
void worker_iohandle_release(struct worker_ctx *worker, void *h);
ssize_t worker_gnutls_push(gnutls_transport_ptr_t h, const void *buf, size_t len);
ssize_t worker_gnutls_client_push(gnutls_transport_ptr_t h, const void *buf, size_t len);
int worker_task_step(struct qr_task *task, const struct sockaddr *packet_source,
knot_pkt_t *packet);
int worker_task_numrefs(const struct qr_task *task);
/** Finalize given task */
int worker_task_finalize(struct qr_task *task, int state);
void worker_task_complete(struct qr_task *task);
void worker_task_ref(struct qr_task *task);
void worker_task_unref(struct qr_task *task);
void worker_task_timeout_inc(struct qr_task *task);
int worker_add_tcp_connected(struct worker_ctx *worker,
const struct sockaddr *addr,
struct session *session);
int worker_del_tcp_connected(struct worker_ctx *worker,
const struct sockaddr *addr);
knot_pkt_t *worker_task_get_pktbuf(const struct qr_task *task);
struct request_ctx *worker_task_get_request(struct qr_task *task);
struct session *worker_request_get_source_session(struct request_ctx *);
void worker_request_set_source_session(struct request_ctx *, struct session *session);
/** @cond internal */
/** Number of request within timeout window. */
@@ -107,9 +126,6 @@ typedef array_t(void *) mp_freelist_t;
/** List of query resolution tasks. */
typedef array_t(struct qr_task *) qr_tasklist_t;
/** Session list. */
typedef array_t(struct session *) qr_sessionlist_t;
/** \details Worker state is meant to persist during the whole life of daemon. */
struct worker_ctx {
struct engine *engine;
......
@@ -91,8 +91,12 @@ void __asan_poison_memory_region(void const volatile *addr, size_t size);
void __asan_unpoison_memory_region(void const volatile *addr, size_t size);
#define kr_asan_poison(addr, size) __asan_poison_memory_region((addr), (size))
#define kr_asan_unpoison(addr, size) __asan_unpoison_memory_region((addr), (size))
#define kr_asan_custom_poison(fn, addr) fn ##_poison((addr))
#define kr_asan_custom_unpoison(fn, addr) fn ##_unpoison((addr))
#else
#define kr_asan_poison(addr, size)
#define kr_asan_unpoison(addr, size)
#define kr_asan_custom_poison(fn, addr)
#define kr_asan_custom_unpoison(fn, addr)
#endif
/* @endcond */
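For illustration, the new custom-poison macros pair with the session_poison()/session_unpoison() helpers added in daemon/session.c: under ASAN the token-pasting expands kr_asan_custom_poison(session, s) to session_poison(s), and without ASAN both macros compile to nothing. The cache_session_sketch helpers below are made-up examples, not part of the commit.

/* Sketch: mark a cached session as off-limits for ASAN, and unmark it on reuse. */
static void cache_session_sketch(struct session *s)
{
    session_clear(s);
    kr_asan_custom_poison(session, s);   /* session_poison(s) under ASAN, no-op otherwise */
}

static void uncache_session_sketch(struct session *s)
{
    kr_asan_custom_unpoison(session, s); /* session_unpoison(s) under ASAN */
}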