From c99d8e6b1b9983b1a2ee907fc72f6f30ba673d61 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Vladim=C3=ADr=20=C4=8Cun=C3=A1t?= <vladimir.cunat@nic.cz>
Date: Fri, 12 Jul 2019 16:22:57 +0200
Subject: [PATCH] daemon: use sendmmsg towards UDP clients

---
 .gitlab-ci.yml     |   3 +-
 daemon/main.c      |  38 +++++++----
 daemon/meson.build |   1 +
 daemon/session.h   |   2 +
 daemon/udp_queue.c | 163 +++++++++++++++++++++++++++++++++++++++++++++
 daemon/udp_queue.h |  28 ++++++++
 daemon/worker.c    |  34 ++++++++--
 daemon/worker.h    |   2 +
 meson.build        |  14 ++++
 meson_options.txt  |  12 ++++
 10 files changed, 275 insertions(+), 22 deletions(-)
 create mode 100644 daemon/udp_queue.c
 create mode 100644 daemon/udp_queue.h

diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 04eb74194..b998a21f9 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -55,7 +55,8 @@ archive:
 build:
   <<: *build
   script:
-    - meson build_ci --default-library=static --prefix=$PREFIX -Dwerror=true -Dextra_tests=enabled
+      # sendmmsg: deckard can't handle that syscall
+    - meson build_ci --default-library=static --prefix=$PREFIX -Dwerror=true -Dextra_tests=enabled -Dsendmmsg=disabled
     - ninja -C build_ci
     - ninja -C build_ci install >/dev/null
     - ${MESON_TEST} --suite unit --suite config
diff --git a/daemon/main.c b/daemon/main.c
index 9ab8f1019..0678f7e88 100644
--- a/daemon/main.c
+++ b/daemon/main.c
@@ -14,6 +14,22 @@
     along with this program.  If not, see <https://www.gnu.org/licenses/>.
  */
 
+#include "kresconfig.h"
+
+#include "contrib/ccan/asprintf/asprintf.h"
+#include "contrib/cleanup.h"
+#include "contrib/ucw/mempool.h"
+#include "daemon/engine.h"
+#include "daemon/io.h"
+#include "daemon/network.h"
+#include "daemon/tls.h"
+#include "daemon/udp_queue.h"
+#include "daemon/worker.h"
+#include "lib/defines.h"
+#include "lib/dnssec.h"
+#include "lib/dnssec/ta.h"
+#include "lib/resolve.h"
+
 #include <arpa/inet.h>
 #include <assert.h>
 #include <getopt.h>
@@ -23,8 +39,6 @@
 #include <string.h>
 #include <unistd.h>
 
-#include "kresconfig.h"
-
 #include <lua.h>
 #include <uv.h>
 #if SYSTEMD_VERSION > 0
@@ -32,18 +46,6 @@
 #endif
 #include <libknot/error.h>
 
-#include <contrib/cleanup.h>
-#include <contrib/ucw/mempool.h>
-#include <contrib/ccan/asprintf/asprintf.h>
-#include "lib/defines.h"
-#include "lib/resolve.h"
-#include "lib/dnssec.h"
-#include "daemon/io.h"
-#include "daemon/network.h"
-#include "daemon/worker.h"
-#include "daemon/engine.h"
-#include "daemon/tls.h"
-#include "lib/dnssec/ta.h"
 
 /* @internal Array of ip address shorthand. */
 typedef array_t(char*) addr_array_t;
@@ -809,6 +811,14 @@ int main(int argc, char **argv)
 		goto cleanup;
 	}
 
+	ret = udp_queue_init_global(loop);
+	if (ret) {
+		kr_log_error("[system] failed to initialize UDP queue: %s\n",
+				kr_strerror(ret));
+		ret = EXIT_FAILURE;
+		goto cleanup;
+	}
+
 	/* Start the scripting engine */
 	if (engine_load_sandbox(&engine) != 0) {
 		ret = EXIT_FAILURE;
diff --git a/daemon/meson.build b/daemon/meson.build
index 4b9805275..a7db4d361 100644
--- a/daemon/meson.build
+++ b/daemon/meson.build
@@ -16,6 +16,7 @@ kresd_src = files([
   'tls.c',
   'tls_ephemeral_credentials.c',
   'tls_session_ticket-srv.c',
+  'udp_queue.c',
   'worker.c',
   'zimport.c',
 ])
diff --git a/daemon/session.h b/daemon/session.h
index 56f7eb4aa..7b261a4c1 100644
--- a/daemon/session.h
+++ b/daemon/session.h
@@ -16,6 +16,8 @@
 
 #pragma once
 
+#include <libknot/packet/pkt.h>
+
 #include <stdbool.h>
 #include <uv.h>
 
diff --git a/daemon/udp_queue.c b/daemon/udp_queue.c
new file mode 100644
index 000000000..6d1fec0d8
--- /dev/null
+++ b/daemon/udp_queue.c
@@ -0,0 +1,163 @@
+/*  Copyright (C) 2019 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+
+    This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation, either version 3 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program.  If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "kresconfig.h"
+#include "daemon/udp_queue.h"
+
+#if !ENABLE_SENDMMSG
+int udp_queue_init_global(uv_loop_t *loop)
+{
+	return 0;
+}
+#else
+
+#include "daemon/session.h"
+#include "daemon/worker.h"
+#include "lib/generic/array.h"
+#include "lib/utils.h"
+
+struct qr_task;
+
+#include <assert.h>
+#include <stdlib.h>
+#include <sys/socket.h>
+
+/* LATER: it might be useful to have this configurable during runtime,
+ * but the structures below would have to change a little (broken up). */
+#define UDP_QUEUE_LEN 64
+
+/** A queue of up to UDP_QUEUE_LEN messages, meant for the same socket. */
+typedef struct {
+	int len; /**< The number of messages in the queue: 0..UDP_QUEUE_LEN */
+	struct mmsghdr msgvec[UDP_QUEUE_LEN]; /**< Parameter for sendmmsg() */
+	struct {
+		struct qr_task *task; /**< Links for completion callbacks. */
+		struct iovec msg_iov[1]; /**< storage for .msgvec[i].msg_iov */
+	} items[UDP_QUEUE_LEN];
+} udp_queue_t;
+
+static udp_queue_t * udp_queue_create()
+{
+	udp_queue_t *q = calloc(1, sizeof(*q));
+	for (int i = 0; i < UDP_QUEUE_LEN; ++i) {
+		struct msghdr *mhi = &q->msgvec[i].msg_hdr;
+		/* These shall remain always the same. */
+		mhi->msg_iov = q->items[i].msg_iov;
+		mhi->msg_iovlen = 1;
+		/* msg_name and msg_namelen will be per-call,
+		 * and the rest is OK to remain zeroed all the time. */
+	}
+	return q;
+}
+
+/** Global state for udp_queue_*.  Note: we never free the pointed-to memory. */
+struct {
+	/** Singleton map: fd -> udp_queue_t, as a simple array of pointers. */
+	udp_queue_t **udp_queues;
+	int udp_queues_len;
+
+	/** List of FD numbers that might have a non-empty queue. */
+	array_t(int) waiting_fds;
+
+	uv_check_t check_handle;
+} static state = {0};
+
+/** Empty the given queue.  The queue is assumed to exist (but may be empty). */
+static void udp_queue_send(int fd)
+{
+	udp_queue_t *const q = state.udp_queues[fd];
+	if (!q->len) return;
+	int sent_len = sendmmsg(fd, q->msgvec, q->len, 0);
+	/* ATM we don't really do anything about failures. */
+	int err = sent_len < 0 ? errno : EAGAIN /* unknown error, really */;
+	if (unlikely(sent_len != q->len)) {
+		if (err != EWOULDBLOCK) {
+			kr_log_error("ERROR: udp sendmmsg() sent %d / %d; %s\n",
+					sent_len, q->len, strerror(err));
+		} else {
+			const uint64_t stamp_now = kr_now();
+			static uint64_t stamp_last = 0;
+			if (stamp_now > stamp_last + 60*1000) {
+				kr_log_info("WARNING: dropped UDP reply packet(s) due to network overload (reported at most once per minute)\n");
+				stamp_last = stamp_now;
+			}
+		}
+	}
+	for (int i = 0; i < q->len; ++i) {
+		qr_task_on_send(q->items[i].task, NULL, i < sent_len ? 0 : err);
+	}
+	q->len = 0;
+}
+
+/** Periodical callback to send all queued packets. */
+static void udp_queue_check(uv_check_t *handle)
+{
+	for (int i = 0; i < state.waiting_fds.len; ++i) {
+		udp_queue_send(state.waiting_fds.at[i]);
+	}
+	state.waiting_fds.len = 0;
+}
+
+int udp_queue_init_global(uv_loop_t *loop)
+{
+	int ret = uv_check_init(loop, &state.check_handle);
+	if (!ret) ret = uv_check_start(&state.check_handle, udp_queue_check);
+	return ret;
+}
+
+void udp_queue_push(int fd, struct kr_request *req, struct qr_task *task)
+{
+	if (fd < 0) {
+		kr_log_error("ERROR: called udp_queue_push(fd = %d, ...)\n", fd);
+		abort();
+	}
+	/* Get a valid correct queue. */
+	if (fd >= state.udp_queues_len) {
+		const int new_len = fd + 1;
+		state.udp_queues = realloc(state.udp_queues,
+					sizeof(state.udp_queues[0]) * new_len);
+		if (!state.udp_queues) abort();
+		memset(state.udp_queues + state.udp_queues_len, 0,
+			sizeof(state.udp_queues[0]) * (new_len - state.udp_queues_len));
+		state.udp_queues_len = new_len;
+	}
+	if (unlikely(state.udp_queues[fd] == NULL))
+		state.udp_queues[fd] = udp_queue_create();
+	udp_queue_t *const q = state.udp_queues[fd];
+
+	/* Append to the queue */
+	struct sockaddr *sa = (struct sockaddr *)/*const-cast*/req->qsource.addr;
+	q->msgvec[q->len].msg_hdr.msg_name = sa;
+	q->msgvec[q->len].msg_hdr.msg_namelen = kr_sockaddr_len(sa);
+	q->items[q->len].task = task;
+	q->items[q->len].msg_iov[0] = (struct iovec){
+		.iov_base = req->answer->wire,
+		.iov_len  = req->answer->size,
+	};
+	if (q->len == 0)
+		array_push(state.waiting_fds, fd);
+	++(q->len);
+
+	if (q->len >= UDP_QUEUE_LEN) {
+		assert(q->len == UDP_QUEUE_LEN);
+		udp_queue_send(fd);
+		/* We don't need to search state.waiting_fds;
+		 * anyway, it's more efficient to let the hook do that. */
+	}
+}
+
+#endif
+
diff --git a/daemon/udp_queue.h b/daemon/udp_queue.h
new file mode 100644
index 000000000..c1730c056
--- /dev/null
+++ b/daemon/udp_queue.h
@@ -0,0 +1,28 @@
+/*  Copyright (C) 2019 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+
+    This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation, either version 3 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program.  If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include <uv.h>
+struct kr_request;
+struct qr_task;
+
+/** Initialize the global state for udp_queue. */
+int udp_queue_init_global(uv_loop_t *loop);
+
+/** Send req->answer via UDP, possibly not immediately. */
+void udp_queue_push(int fd, struct kr_request *req, struct qr_task *task);
+
diff --git a/daemon/worker.c b/daemon/worker.c
index 313fe86d1..0cbbcc7c0 100644
--- a/daemon/worker.c
+++ b/daemon/worker.c
@@ -14,6 +14,9 @@
     along with this program.  If not, see <https://www.gnu.org/licenses/>.
  */
 
+#include "kresconfig.h"
+#include "daemon/worker.h"
+
 #include <uv.h>
 #include <lua.h>
 #include <lauxlib.h>
@@ -29,15 +32,16 @@
 #include <sys/types.h>
 #include <unistd.h>
 #include <gnutls/gnutls.h>
-#include "lib/utils.h"
-#include "lib/layer.h"
-#include "daemon/worker.h"
+
 #include "daemon/bindings/api.h"
 #include "daemon/engine.h"
 #include "daemon/io.h"
+#include "daemon/session.h"
 #include "daemon/tls.h"
+#include "daemon/udp_queue.h"
 #include "daemon/zimport.h"
-#include "daemon/session.h"
+#include "lib/layer.h"
+#include "lib/utils.h"
 
 
 /* Magic defaults for the worker. */
@@ -510,7 +514,7 @@ static void qr_task_complete(struct qr_task *task)
 }
 
 /* This is called when we send subrequest / answer */
-static int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status)
+int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status)
 {
 
 	if (task->finished) {
@@ -1180,10 +1184,26 @@ static int qr_task_finalize(struct qr_task *task, int state)
 	/* Send back answer */
 	assert(!session_flags(source_session)->closing);
 	assert(ctx->source.addr.ip.sa_family != AF_UNSPEC);
-	int res = qr_task_send(task, source_session,
+
+	int ret;
+	const uv_handle_t *src_handle = session_get_handle(source_session);
+	if (src_handle->type != UV_UDP && src_handle->type != UV_TCP) {
+		assert(false);
+		ret = kr_error(EINVAL);
+	} else if (src_handle->type == UV_UDP && ENABLE_SENDMMSG) {
+		/* TODO: this is an ugly way of getting the FD number, as we're
+		 * touching a private field of UV.  We might want to e.g. pass
+		 * a pointer to struct endpoint in kr_request::qsource. */
+		const int fd = ((const uv_udp_t *)src_handle)->io_watcher.fd;
+		udp_queue_push(fd, &ctx->req, task);
+		ret = 0;
+	} else {
+		ret = qr_task_send(task, source_session,
 			       (struct sockaddr *)&ctx->source.addr,
 			        ctx->req.answer);
-	if (res != kr_ok()) {
+	}
+
+	if (ret != kr_ok()) {
 		(void) qr_task_on_send(task, NULL, kr_error(EIO));
 		/* Since source session is erroneous detach all tasks. */
 		while (!session_tasklist_is_empty(source_session)) {
diff --git a/daemon/worker.h b/daemon/worker.h
index 4e823c2c0..7b1a84c08 100644
--- a/daemon/worker.h
+++ b/daemon/worker.h
@@ -118,6 +118,8 @@ uint64_t worker_task_creation_time(struct qr_task *task);
 void worker_task_subreq_finalize(struct qr_task *task);
 bool worker_task_finished(struct qr_task *task);
 
+/** To be called after sending a DNS message.  It mainly deals with cleanups. */
+int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status);
 
 /** Various worker statistics.  Sync with wrk_stats() */
 struct worker_stats {
diff --git a/meson.build b/meson.build
index 06868beaa..f0ca01b1a 100644
--- a/meson.build
+++ b/meson.build
@@ -83,6 +83,17 @@ verbose_log = get_option('verbose_log') == 'enabled' or get_option('verbose_log'
 user = get_option('user')
 group = get_option('group')
 
+## sendmmsg
+has_sendmmsg = meson.get_compiler('c').has_function('sendmmsg',
+  prefix: '#define _GNU_SOURCE\n#include <sys/socket.h>')
+if get_option('sendmmsg') == 'enabled' and not has_sendmmsg
+  error('missing compiler function: sendmmsg(), use -Dsendmmsg=disabled')
+elif get_option('sendmmsg') == 'auto'
+  sendmmsg = has_sendmmsg
+else
+  sendmmsg = get_option('sendmmsg') == 'enabled'
+endif
+
 ## Systemd
 message('--- systemd socket activation ---')
 libsystemd = dependency('libsystemd', required: false)
@@ -137,6 +148,7 @@ conf_data.set_quoted('libknot_SONAME',
 conf_data.set('SYSTEMD_VERSION',
   libsystemd.found() ? libsystemd.version().to_int() : -1)
 conf_data.set('NOVERBOSELOG', not verbose_log)
+conf_data.set('ENABLE_SENDMMSG', sendmmsg.to_int())
 
 kresconfig = configure_file(
   output: 'kresconfig.h',
@@ -249,6 +261,7 @@ s_build_unit_tests = build_unit_tests ? 'enabled' : 'disabled'
 s_build_config_tests = build_config_tests ? 'enabled' : 'disabled'
 s_build_extra_tests = build_extra_tests ? 'enabled' : 'disabled'
 s_install_kresd_conf = install_kresd_conf ? 'enabled' : 'disabled'
+s_sendmmsg = sendmmsg ? 'enabled': 'disabled'
 message('''
 
 ======================= SUMMARY =======================
@@ -281,6 +294,7 @@ message('''
     user:               @0@'''.format(user) + '''
     group:              @0@'''.format(group) + '''
     install_kresd_conf: @0@'''.format(s_install_kresd_conf) + '''
+    sendmmsg:           @0@'''.format(s_sendmmsg) + '''
 
 =======================================================
 
diff --git a/meson_options.txt b/meson_options.txt
index bdb6fdf9d..e48cf67a3 100644
--- a/meson_options.txt
+++ b/meson_options.txt
@@ -63,6 +63,18 @@ option(
   description: 'group which is used for running kresd',
 )
 
+option(
+  'sendmmsg',
+  type: 'combo',
+  choices: [
+    'auto',
+    'enabled',
+    'disabled',
+  ],
+  value: 'auto',
+  description: 'use sendmmsg syscall towards clients',
+)
+
 ## Systemd
 option(
   'systemd_files',
-- 
GitLab