    listen using TLS on specific sockets · 1e0a8b9d
    Daniel Kahn Gillmor authored
    kresd has --tls/-t by analogy with --addr/-a where the daemon opens
    the socket itself.
    
    This changeset adds equivalent functionality for inherited sockets:
    --tlsfd/-T by analogy with --fd/-S
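    For example, a supervisor that hands pre-opened sockets to kresd could pass a
    plaintext listener with --fd and a TLS listener with --tlsfd (say, --fd 3 --tlsfd 4;
    the descriptor numbers are only illustrative and depend on how the sockets are inherited).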
io.c
/*  Copyright (C) 2014 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

#include <string.h>
#include <libknot/errcode.h>
#include <contrib/ucw/lib.h>
#include <contrib/ucw/mempool.h>
#include <assert.h>

#include "daemon/io.h"
#include "daemon/network.h"
#include "daemon/worker.h"
#include "daemon/tls.h"

/* Query the current socket buffer size and raise it to bufsize_want if it is smaller. */
#define negotiate_bufsize(func, handle, bufsize_want) do { \
	int bufsize = 0; func(handle, &bufsize); \
	if (bufsize < bufsize_want) { \
		bufsize = bufsize_want; \
		func(handle, &bufsize); \
	} \
} while (0)

static void check_bufsize(uv_handle_t* handle)
{
	/* We want to buffer at least N waves in advance.
	 * This is magic presuming we can pull in a whole recvmmsg width in one wave.
	 * Linux will double the requested bufsize.
	 */
	const int bufsize_want = RECVMMSG_BATCH * 65535 * 2;
	negotiate_bufsize(uv_recv_buffer_size, handle, bufsize_want);
	negotiate_bufsize(uv_send_buffer_size, handle, bufsize_want);
}

#undef negotiate_bufsize

static void session_clear(struct session *s)
{
	assert(s->outgoing || s->tasks.len == 0);
	array_clear(s->tasks);
	tls_free(s->tls_ctx);
	memset(s, 0, sizeof(*s));
}

void session_free(struct session *s)
{
	if (s) {
		session_clear(s);
		free(s);
	}
}

struct session *session_new(void)
{
	return calloc(1, sizeof(struct session));
}

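/* Take a pre-allocated session from the worker's pool, or allocate a fresh one. */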
static struct session *session_borrow(struct worker_ctx *worker)
{
	struct session *s = NULL;
	if (worker->pool_sessions.len > 0) {
		s = array_tail(worker->pool_sessions);
		array_pop(worker->pool_sessions);
		kr_asan_unpoison(s, sizeof(*s));
	} else {
		s = session_new();
	}
	return s;
}

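/* Return a session to the worker's pool if there is room, otherwise free it. */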
static void session_release(struct worker_ctx *worker, struct session *s)
{
	if (!s) {
		return;
	}
	if (worker->pool_sessions.len < MP_FREELIST_SIZE) {
		session_clear(s);
		array_push(worker->pool_sessions, s);
		kr_asan_poison(s, sizeof(*s));
	} else {
		session_free(s);
	}
}

static uv_stream_t *handle_alloc(uv_loop_t *loop)
{
	uv_stream_t *handle = calloc(1, sizeof(*handle));
	if (!handle) {
		return NULL;
	}

	return handle;
}

static void handle_getbuf(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
{
	/* The worker has a single buffer which is reused for all incoming
	 * datagrams / stream reads; its content is guaranteed to remain
	 * unchanged only for the duration of udp_recv() and tcp_recv().
	 */
	struct session *session = handle->data;
	uv_loop_t *loop = handle->loop;
	struct worker_ctx *worker = loop->data;
	buf->base = (char *)worker->wire_buf;
	/* Limit the TCP stream buffer to 4K so batches of incoming queries are read at a finer granularity. */
	if (handle->type == UV_TCP) {
		buf->len = MIN(suggested_size, 4096);
	/* Regular buffer size for subrequests. */
	} else if (session->outgoing) {
		buf->len = suggested_size;
	/* Use recvmmsg() on master sockets if possible. */
	} else {
		buf->len = sizeof(worker->wire_buf);
	}
}

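/* UDP read callback: wrap the datagram in a packet and hand it to the worker. */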
void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
	const struct sockaddr *addr, unsigned flags)
{
	uv_loop_t *loop = handle->loop;
	struct worker_ctx *worker = loop->data;
	if (nread <= 0) {
		if (nread < 0) { /* Error response, notify resolver */
			worker_submit(worker, (uv_handle_t *)handle, NULL, addr);
		} /* nread == 0 just means the buffer can be released, nothing to do here */
		return;
	}

	knot_pkt_t *query = knot_pkt_new(buf->base, nread, &worker->pkt_pool);
	if (query) {
		query->max_size = KNOT_WIRE_MAX_PKTSIZE;
		worker_submit(worker, (uv_handle_t *)handle, query, addr);
	}
	mp_flush(worker->pkt_pool.ctx);
}

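/* Shared post-bind setup for UDP sockets: enlarge socket buffers, attach a session and start reading. */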
static int udp_bind_finalize(uv_handle_t *handle)
{
	check_bufsize((uv_handle_t *)handle);
	/* The handle is already created, just create its session context. */
	handle->data = session_new();
	assert(handle->data);
	return io_start_read((uv_handle_t *)handle);
}

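/* Bind a UDP socket the daemon opens itself and start serving it. */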
int udp_bind(uv_udp_t *handle, struct sockaddr *addr)
{
	unsigned flags = UV_UDP_REUSEADDR;
	if (addr->sa_family == AF_INET6) {
		flags |= UV_UDP_IPV6ONLY;
	}
	int ret = uv_udp_bind(handle, addr, flags);
	if (ret != 0) {
		return ret;
	}
	return udp_bind_finalize((uv_handle_t *)handle);
}

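/* Wrap an already-open datagram socket inherited from the parent process. */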
int udp_bindfd(uv_udp_t *handle, int fd)
{
	if (!handle) {
		return kr_error(EINVAL);
	}

	int ret = uv_udp_open(handle, (uv_os_sock_t) fd);
	if (ret != 0) {
		return ret;
	}
	return udp_bind_finalize((uv_handle_t *)handle);
}

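/* Timer close callback: close the connection handle the timer was watching. */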
static void tcp_timeout(uv_handle_t *timer)
{
	uv_handle_t *handle = timer->data;
	uv_close(handle, io_free);
}

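/* Idle timeout: keep the timer running while tasks are still pending, otherwise start closing the connection. */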
static void tcp_timeout_trigger(uv_timer_t *timer)
{
	uv_handle_t *handle = timer->data;
	struct session *session = handle->data;
	if (session->tasks.len > 0) {
		uv_timer_again(timer);
	} else {
		uv_close((uv_handle_t *)timer, tcp_timeout);
	}
}

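/* Stream read callback: feed incoming data to the TLS layer or directly to the worker. */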
static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
{
	uv_loop_t *loop = handle->loop;
	struct session *s = handle->data;
	struct worker_ctx *worker = loop->data;
	/* TCP pipelining is rather complicated and requires cooperation from the worker,
	 * so the whole message reassembly and demuxing logic lives inside the worker. */
	int ret = 0;
	if (s->has_tls) {
		ret = tls_process(worker, handle, (const uint8_t *)buf->base, nread);
	} else {
		ret = worker_process_tcp(worker, handle, (const uint8_t *)buf->base, nread);
	}
	if (ret < 0) {
		worker_end_tcp(worker, (uv_handle_t *)handle);
		/* The per-connection quota for outstanding requests was exceeded;
		 * stop reading from the stream and close it after the last message is processed. */
		if (!s->outgoing && !uv_is_closing((uv_handle_t *)&s->timeout)) {
			uv_timer_stop(&s->timeout);
			if (s->tasks.len == 0) {
				uv_close((uv_handle_t *)&s->timeout, tcp_timeout);
			} else { /* If there are tasks running, defer until they finish. */
				uv_timer_start(&s->timeout, tcp_timeout_trigger, 1, KR_CONN_RTT_MAX/2);
			}
		}
	/* The connection carried more than one request; reset its deadline for the next query. */
	} else if (ret > 0 && !s->outgoing) {
		uv_timer_again(&s->timeout);
	}
	mp_flush(worker->pkt_pool.ctx);
}

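/* Accept an incoming stream connection, optionally setting up a TLS context for it. */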
static void _tcp_accept(uv_stream_t *master, int status, bool tls)
{
	if (status != 0) {
		return;
	}
	uv_stream_t *client = handle_alloc(master->loop);
	if (!client) {
		return;
	}
	memset(client, 0, sizeof(*client));
	io_create(master->loop, (uv_handle_t *)client, SOCK_STREAM);
	if (uv_accept(master, client) != 0) {
		uv_close((uv_handle_t *)client, io_free);
		return;
	}

	/* Set a deadline for the TCP connection and start reading.
	 * Every half of the request time limit we re-check whether the connection
	 * is idle and should be terminated; the interval is an educated guess. */
	struct session *session = client->data;
	session->has_tls = tls;
	if (tls && !session->tls_ctx) {
		session->tls_ctx = tls_new(master->loop->data);
	}
	uv_timer_t *timer = &session->timeout;
	uv_timer_init(master->loop, timer);
	timer->data = client;
	uv_timer_start(timer, tcp_timeout_trigger, KR_CONN_RTT_MAX/2, KR_CONN_RTT_MAX/2);
	io_start_read((uv_handle_t *)client);
}

static void tcp_accept(uv_stream_t *master, int status)
{
	_tcp_accept(master, status, false);
}

static void tls_accept(uv_stream_t *master, int status)
{
	_tcp_accept(master, status, true);
}

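/* Set an IPPROTO_TCP-level socket option on the file descriptor behind a libuv handle, if it is accessible. */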
static int set_tcp_option(uv_handle_t *handle, int option, int val)
{
	uv_os_fd_t fd = 0;
	if (uv_fileno(handle, &fd) == 0) {
		return setsockopt(fd, IPPROTO_TCP, option, &val, sizeof(val));
	}
	return 0; /* N/A */
}

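/* Shared post-bind setup for listening stream sockets. */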
static int tcp_bind_finalize(uv_handle_t *handle)
{
	/* TCP_FASTOPEN allows data to be exchanged during the handshake, saving one RTT on repeat connections. */
#ifdef TCP_FASTOPEN
# ifdef __linux__
	(void) set_tcp_option(handle, TCP_FASTOPEN, 16); /* On Linux the value is an accept-queue length hint */
# else
	(void) set_tcp_option(handle, TCP_FASTOPEN, 1);  /* Elsewhere the value is just on/off */
# endif
#endif

	handle->data = NULL;
	return 0;
}

static int _tcp_bind(uv_tcp_t *handle, struct sockaddr *addr, uv_connection_cb connection)
{
	unsigned flags = 0;
	if (addr->sa_family == AF_INET6) {
		flags |= UV_TCP_IPV6ONLY;
	}

	int ret = uv_tcp_bind(handle, addr, flags);
	if (ret != 0) {
		return ret;
	}

	/* TCP_DEFER_ACCEPT delays accepting connections until there is readable data. */
#ifdef TCP_DEFER_ACCEPT
	if (set_tcp_option((uv_handle_t *)handle, TCP_DEFER_ACCEPT, KR_CONN_RTT_MAX/1000) != 0) {
		kr_log_info("[ io ] tcp_bind (defer_accept): %s\n", strerror(errno));
	}
#endif

	ret = uv_listen((uv_stream_t *)handle, 16, connection);
	if (ret != 0) {
		return ret;
	}

	return tcp_bind_finalize((uv_handle_t *)handle);
}

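/* Bind and listen for plain TCP connections on a socket the daemon opens itself. */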
int tcp_bind(uv_tcp_t *handle, struct sockaddr *addr)
{
	return _tcp_bind(handle, addr, tcp_accept);
}

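/* Bind and listen for TLS connections on a socket the daemon opens itself (--tls/-t). */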
int tcp_bind_tls(uv_tcp_t *handle, struct sockaddr *addr)
{
	return _tcp_bind(handle, addr, tls_accept);
}

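/* Start listening on an already-open stream socket inherited from the parent process. */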
static int _tcp_bindfd(uv_tcp_t *handle, int fd, uv_connection_cb connection)
{
	if (!handle) {
		return kr_error(EINVAL);
	}

	int ret = uv_tcp_open(handle, (uv_os_sock_t) fd);
	if (ret != 0) {
		return ret;
	}

	ret = uv_listen((uv_stream_t *)handle, 16, connection);
	if (ret != 0) {
		return ret;
	}
	return tcp_bind_finalize((uv_handle_t *)handle);
}

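/* Listen for plain TCP connections on an inherited socket (--fd/-S). */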
int tcp_bindfd(uv_tcp_t *handle, int fd)
{
	return _tcp_bindfd(handle, fd, tcp_accept);
}

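/* Listen with TLS on an inherited socket (--tlsfd/-T), the mode this change adds. */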
int tcp_bindfd_tls(uv_tcp_t *handle, int fd)
{
	return _tcp_bindfd(handle, fd, tls_accept);
}

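/* Initialise a libuv handle of the requested socket type and attach a session from the worker pool. */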
void io_create(uv_loop_t *loop, uv_handle_t *handle, int type)
{
	if (type == SOCK_DGRAM) {
		uv_udp_init(loop, (uv_udp_t *)handle);
	} else {
		uv_tcp_init(loop, (uv_tcp_t *)handle);
		uv_tcp_nodelay((uv_tcp_t *)handle, 1);
	}

	struct worker_ctx *worker = loop->data;
	handle->data = session_borrow(worker);
	assert(handle->data);
}

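/* Detach the handle's session and return it to the worker pool (or free it). */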
void io_deinit(uv_handle_t *handle)
{
	if (!handle) {
		return;
	}
	uv_loop_t *loop = handle->loop;
	if (loop && loop->data) {
		struct worker_ctx *worker = loop->data;
		session_release(worker, handle->data);
	} else {
		session_free(handle->data);
	}
	handle->data = NULL;
}

void io_free(uv_handle_t *handle)
{
	if (!handle) {
		return;
	}
	io_deinit(handle);
	free(handle);
}

int io_start_read(uv_handle_t *handle)
{
	if (handle->type == UV_UDP) {
		return uv_udp_recv_start((uv_udp_t *)handle, &handle_getbuf, &udp_recv);
	} else {
		return uv_read_start((uv_stream_t *)handle, &handle_getbuf, &tcp_recv);
	}
}

int io_stop_read(uv_handle_t *handle)
{
	if (handle->type == UV_UDP) {
		return uv_udp_recv_stop((uv_udp_t *)handle);
	} else {
		return uv_read_stop((uv_stream_t *)handle);
	}
}