From 854a0cf5b4254a33e0689d265064dcc5eb31f84c Mon Sep 17 00:00:00 2001
From: Libor Peltan <libor.peltan@nic.cz>
Date: Fri, 30 Apr 2021 16:46:09 +0200
Subject: [PATCH] implemented server answering TCP over XDP

---
 Knot.files                    |   2 +
 src/contrib/dynarray.h        |  14 ++-
 src/knot/Makefile.inc         |   2 +
 src/knot/include/module.h     |   2 +-
 src/knot/server/udp-handler.c |  60 +---------
 src/knot/server/xdp-handler.c | 214 ++++++++++++++++++++++++++++++++++
 src/knot/server/xdp-handler.h |  60 ++++++++++
 7 files changed, 293 insertions(+), 61 deletions(-)
 create mode 100644 src/knot/server/xdp-handler.c
 create mode 100644 src/knot/server/xdp-handler.h

diff --git a/Knot.files b/Knot.files
index b3b07ee304..d53806ef8b 100644
--- a/Knot.files
+++ b/Knot.files
@@ -259,6 +259,8 @@ src/knot/server/tcp-handler.c
 src/knot/server/tcp-handler.h
 src/knot/server/udp-handler.c
 src/knot/server/udp-handler.h
+src/knot/server/xdp-handler.c
+src/knot/server/xdp-handler.h
 src/knot/updates/acl.c
 src/knot/updates/acl.h
 src/knot/updates/apply.c
diff --git a/src/contrib/dynarray.h b/src/contrib/dynarray.h
index 4e9f20435f..8065c6f9d1 100644
--- a/src/contrib/dynarray.h
+++ b/src/contrib/dynarray.h
@@ -50,7 +50,7 @@
 	} prefix ## _dynarray_t; \
 	\
 	visibility ntype *prefix ## _dynarray_arr(prefix ## _dynarray_t *dynarray); \
-	visibility void prefix ## _dynarray_add(prefix ## _dynarray_t *dynarray, \
+	visibility ntype *prefix ## _dynarray_add(prefix ## _dynarray_t *dynarray, \
 	                                        ntype const *to_add); \
 	visibility void prefix ## _dynarray_free(prefix ## _dynarray_t *dynarray);
 
@@ -88,11 +88,11 @@
 	} \
 	\
 	_unused_ \
-	visibility void prefix ## _dynarray_add(struct prefix ## _dynarray *dynarray, \
-	                                        ntype const *to_add) \
+	visibility ntype *prefix ## _dynarray_add(struct prefix ## _dynarray *dynarray, \
+	                                          ntype const *to_add) \
 	{ \
 		if (dynarray->capacity < 0) { \
-			return; \
+			return NULL; \
 		} \
 		if (dynarray->capacity == 0) { \
 			dynarray->capacity = sizeof(dynarray->init) / sizeof(*dynarray->init); \
@@ -104,7 +104,7 @@
 			if (new_arr == NULL) { \
 				prefix ## _dynarray_free__(dynarray); \
 				dynarray->capacity = dynarray->size = -1; \
-				return; \
+				return NULL; \
 			} \
 			if (dynarray->capacity > 0) { \
 				memcpy(new_arr, prefix ## _dynarray_arr(dynarray), \
@@ -115,7 +115,9 @@
 			dynarray->capacity = new_capacity; \
 			dynarray->arr = prefix ## _dynarray_arr_arr__; \
 		} \
-		prefix ## _dynarray_arr(dynarray)[dynarray->size++] = *to_add; \
+		ntype *add_to = &prefix ## _dynarray_arr(dynarray)[dynarray->size++]; \
+		*add_to = *to_add; \
+		return add_to; \
 	} \
 	\
 	_unused_ \
diff --git a/src/knot/Makefile.inc b/src/knot/Makefile.inc
index 9f9400d117..4bc38c2036 100644
--- a/src/knot/Makefile.inc
+++ b/src/knot/Makefile.inc
@@ -152,6 +152,8 @@ libknotd_la_SOURCES = \
 	knot/server/tcp-handler.h		\
 	knot/server/udp-handler.c		\
 	knot/server/udp-handler.h		\
+	knot/server/xdp-handler.c		\
+	knot/server/xdp-handler.h		\
 	knot/updates/acl.c			\
 	knot/updates/acl.h			\
 	knot/updates/apply.c			\
diff --git a/src/knot/include/module.h b/src/knot/include/module.h
index 0e9e2c0926..ed5a06cd62 100644
--- a/src/knot/include/module.h
+++ b/src/knot/include/module.h
@@ -403,7 +403,7 @@ typedef struct {
 	int socket;                            /*!< Current network socket. */
 	unsigned thread_id;                    /*!< Current thread id. */
 	void *server;                          /*!< Server object private item. */
-	struct knot_xdp_msg *xdp_msg;          /*!< Possible XDP message context. */
+	const struct knot_xdp_msg *xdp_msg;    /*!< Possible XDP message context. */
 } knotd_qdata_params_t;
 
 /*! Query processing data context. */
diff --git a/src/knot/server/udp-handler.c b/src/knot/server/udp-handler.c
index 2e85b5ccff..32a64bbe66 100644
--- a/src/knot/server/udp-handler.c
+++ b/src/knot/server/udp-handler.c
@@ -39,6 +39,7 @@
 #include "knot/query/layer.h"
 #include "knot/server/server.h"
 #include "knot/server/udp-handler.h"
+#include "knot/server/xdp-handler.h"
 
 /* Buffer identifiers. */
 enum {
@@ -366,80 +367,31 @@ static udp_api_t udp_recvmmsg_api = {
 #endif /* ENABLE_RECVMMSG */
 
 #ifdef ENABLE_XDP
-struct xdp_recvmmsg {
-	knot_xdp_msg_t msgs_rx[XDP_BATCHLEN];
-	knot_xdp_msg_t msgs_tx[XDP_BATCHLEN];
-	uint32_t rcvd;
-};
 
 static void *xdp_recvmmsg_init(void)
 {
-	struct xdp_recvmmsg *rq = malloc(sizeof(*rq));
-	if (rq != NULL) {
-		memset(rq, 0, sizeof(*rq));
-	}
-	return rq;
+	return xdp_handle_init();
 }
 
 static void xdp_recvmmsg_deinit(void *d)
 {
-	struct xdp_recvmmsg *rq = d;
-	free(rq);
+	xdp_handle_free(d);
 }
 
 static int xdp_recvmmsg_recv(int fd, void *d, void *xdp_sock)
 {
 	UNUSED(fd);
-	struct xdp_recvmmsg *rq = d;
-
-	int ret = knot_xdp_recv(xdp_sock, rq->msgs_rx, XDP_BATCHLEN, &rq->rcvd, NULL);
-
-	return ret == KNOT_EOK ? rq->rcvd : ret;
+	return xdp_handle_recv(d, xdp_sock);
 }
 
 static int xdp_recvmmsg_handle(udp_context_t *ctx, void *d, void *xdp_sock)
 {
-	struct xdp_recvmmsg *rq = d;
-
-	knot_xdp_send_prepare(xdp_sock);
-
-	uint32_t responses = 0;
-	for (uint32_t i = 0; i < rq->rcvd; ++i) {
-		if (rq->msgs_rx[i].payload.iov_len == 0) {
-			continue; // Skip marked (zero length) messages.
-		}
-		int ret = knot_xdp_reply_alloc(xdp_sock, &rq->msgs_rx[i], &rq->msgs_tx[i]);
-		if (ret != KNOT_EOK) {
-			break; // Still free all RX buffers.
-		}
-
-		// udp_pktinfo_handle not needed for XDP as one worker is bound
-		// to one interface only.
-
-		udp_handle(ctx, knot_xdp_socket_fd(xdp_sock),
-		           (struct sockaddr_storage *)&rq->msgs_rx[i].ip_from,
-		           &rq->msgs_rx[i].payload, &rq->msgs_tx[i].payload,
-		           &rq->msgs_rx[i]);
-		responses++;
-	}
-
-	knot_xdp_recv_finish(xdp_sock, rq->msgs_rx, rq->rcvd);
-	rq->rcvd = responses;
-
-	return KNOT_EOK;
+	return xdp_handle_msgs(d, xdp_sock, &ctx->layer, ctx->server, ctx->thread_id);
 }
 
 static int xdp_recvmmsg_send(void *d, void *xdp_sock)
 {
-	struct xdp_recvmmsg *rq = d;
-	uint32_t sent = rq->rcvd;
-
-	int ret = knot_xdp_send(xdp_sock, rq->msgs_tx, sent, &sent);
-	knot_xdp_send_finish(xdp_sock);
-
-	memset(rq, 0, sizeof(*rq));
-
-	return ret == KNOT_EOK ? sent : ret;
+	return xdp_handle_send(d, xdp_sock);
 }
 
 static udp_api_t xdp_recvmmsg_api = {
diff --git a/src/knot/server/xdp-handler.c b/src/knot/server/xdp-handler.c
new file mode 100644
index 0000000000..08fe2df3cc
--- /dev/null
+++ b/src/knot/server/xdp-handler.c
@@ -0,0 +1,214 @@
+/*  Copyright (C) 2021 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+
+    This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation, either version 3 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program.  If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "knot/server/xdp-handler.h"
+
+#include "contrib/ucw/mempool.h"
+#include "knot/server/server.h"
+#include "libknot/error.h"
+#include "libknot/xdp/tcp.h"
+
+#include <stdlib.h>
+
+#define XDP_BATCHLEN      32 // TODO move/dedup
+
+#ifdef ENABLE_XDP
+
+typedef struct xdp_handle_ctx {
+	knot_xdp_msg_t msg_recv[XDP_BATCHLEN];
+	knot_xdp_msg_t msg_send_udp[XDP_BATCHLEN];
+	tcp_relay_dynarray_t tcp_relays;
+	uint32_t msg_recv_count;
+	uint32_t msg_udp_count;
+	knot_tcp_table_t *tcp_table;
+} xdp_handle_ctx_t;
+
+static bool udp_state_active(int state)
+{
+	return (state == KNOT_STATE_PRODUCE || state == KNOT_STATE_FAIL);
+}
+
+static bool tcp_active_state(int state)
+{
+	return (state == KNOT_STATE_PRODUCE || state == KNOT_STATE_FAIL);
+}
+
+static bool tcp_send_state(int state)
+{
+	return (state != KNOT_STATE_FAIL && state != KNOT_STATE_NOOP);
+}
+
+void xdp_handle_cleanup(xdp_handle_ctx_t *ctx)
+{
+	ctx->msg_recv_count = 0;
+	ctx->msg_udp_count = 0;
+}
+
+void xdp_handle_free(xdp_handle_ctx_t *ctx)
+{
+	xdp_handle_cleanup(ctx);
+	knot_tcp_table_free(ctx->tcp_table);
+	free(ctx);
+}
+
+xdp_handle_ctx_t *xdp_handle_init(void)
+{
+	xdp_handle_ctx_t *ctx = calloc(1, sizeof(*ctx));
+	if (ctx == NULL) {
+		return NULL;
+	}
+
+	xdp_handle_cleanup(ctx);
+
+	ctx->tcp_table = knot_tcp_table_new(1000); // TODO better parametrize
+	if (ctx->tcp_table == NULL) {
+		xdp_handle_free(ctx);
+		return NULL;
+	}
+
+	return ctx;
+}
+
+int xdp_handle_recv(xdp_handle_ctx_t *ctx, knot_xdp_socket_t *xdp_sock)
+{
+	xdp_handle_cleanup(ctx);
+	int ret = knot_xdp_recv(xdp_sock, ctx->msg_recv, sizeof(ctx->msg_recv) / sizeof(ctx->msg_recv[0]),
+	                        &ctx->msg_recv_count, NULL);
+	return ret == KNOT_EOK ? ctx->msg_recv_count : ret;
+}
+
+static void handle_init(knotd_qdata_params_t *params, knot_layer_t *layer, const knot_xdp_msg_t *msg, const struct iovec *payload)
+{
+	params->remote = (struct sockaddr_storage *)&msg->ip_from;
+	params->xdp_msg = msg;
+	if (!(msg->flags & KNOT_XDP_MSG_TCP)) {
+		params->flags = KNOTD_QUERY_FLAG_NO_AXFR | KNOTD_QUERY_FLAG_NO_IXFR | KNOTD_QUERY_FLAG_LIMIT_SIZE;
+	}
+	knot_layer_begin(layer, params);
+	knot_pkt_t *query = knot_pkt_new(payload->iov_base, payload->iov_len, layer->mm);
+	int ret = knot_pkt_parse(query, 0);
+	if (ret != KNOT_EOK && query->parsed > 0) { // parsing failed (e.g. 2x OPT)
+		query->parsed--; // artificially decreasing "parsed" leads to FORMERR
+	}
+	knot_layer_consume(layer, query);
+}
+
+static void handle_finish(knot_layer_t *layer)
+{
+	knot_layer_finish(layer);
+	mp_flush(layer->mm->ctx);
+}
+
+int xdp_handle_msgs(xdp_handle_ctx_t *ctx, knot_xdp_socket_t *sock,
+                    knot_layer_t *layer, server_t *server, unsigned thread_id)
+{
+	knotd_qdata_params_t params = {
+		.socket = knot_xdp_socket_fd(sock),
+		.server = server,
+		.thread_id = thread_id,
+	};
+
+	if (ctx->msg_recv_count > 0) {
+		knot_xdp_send_prepare(sock);
+	}
+
+	// handle UDP messages
+	for (uint32_t i = 0; i < ctx->msg_recv_count; i++) {
+		knot_xdp_msg_t *msg_recv = &ctx->msg_recv[i];
+		knot_xdp_msg_t *msg_send = &ctx->msg_send_udp[ctx->msg_udp_count];
+
+		if ((msg_recv->flags & KNOT_XDP_MSG_TCP) ||
+		    msg_recv->payload.iov_len == 0) {
+			continue;
+		}
+
+		if (knot_xdp_reply_alloc(sock, msg_recv, msg_send) != KNOT_EOK) {
+			continue; // no point in returning error, where handled?
+		}
+		ctx->msg_udp_count++;
+
+		handle_init(&params, layer, msg_recv, &msg_recv->payload);
+
+		knot_pkt_t *ans = knot_pkt_new(msg_send->payload.iov_base, msg_send->payload.iov_len, layer->mm);
+		while (udp_state_active(layer->state)) {
+			knot_layer_produce(layer, ans);
+		}
+		if (layer->state == KNOT_STATE_DONE) {
+			msg_send->payload.iov_len = ans->size;
+		} else {
+			msg_send->payload.iov_len = 0;
+		}
+
+		handle_finish(layer);
+	}
+
+	// handle TCP messages
+	int ret = knot_xdp_tcp_relay(sock, ctx->msg_recv, ctx->msg_recv_count, ctx->tcp_table, NULL, &ctx->tcp_relays,
+				     NULL); // TODO NULL
+	if (ret == KNOT_EOK && ctx->tcp_relays.size > 0) {
+		uint8_t ans_buf[1024]; // TODO 1024
+
+		for (size_t n_tcp_relays = ctx->tcp_relays.size, rli = 0; rli < n_tcp_relays; rli++) { // dynaaray_foreach can't be used because we insert into the dynarray inside the loop
+			knot_tcp_relay_t *rl = tcp_relay_dynarray_arr(&ctx->tcp_relays) + rli;
+			if ((rl->action & XDP_TCP_DATA) && (rl->answer == 0)) {
+				knot_pkt_t *ans = knot_pkt_new(ans_buf, sizeof(ans_buf), layer->mm);
+				handle_init(&params, layer, rl->msg, &rl->data);
+
+				while (tcp_active_state(layer->state)) {
+					knot_layer_produce(layer, ans);
+
+					knot_tcp_relay_t *clone;
+					if (ans->size > 0 && tcp_send_state(layer->state) &&
+					    (clone = tcp_relay_dynarray_add(&ctx->tcp_relays, rl)) != NULL &&
+					    (clone->data.iov_base = malloc(ans->size)) != NULL) {
+						clone->data.iov_len = ans->size;
+						memcpy(clone->data.iov_base, ans->wire, ans->size);
+						clone->answer = XDP_TCP_ANSWER | XDP_TCP_DATA;
+					}
+				}
+				handle_finish(layer);
+			}
+		}
+	}
+	knot_xdp_recv_finish(sock, ctx->msg_recv, ctx->msg_recv_count);
+
+	return KNOT_EOK;
+}
+
+int xdp_handle_send(xdp_handle_ctx_t *ctx, knot_xdp_socket_t *xdp_sock)
+{
+	uint32_t unused = 0;
+
+	int ret = knot_xdp_send(xdp_sock, ctx->msg_send_udp, ctx->msg_udp_count, &unused);
+	if (ret == KNOT_EOK) {
+		if (ctx->tcp_relays.size > 0) {
+			ret = knot_xdp_tcp_send(xdp_sock, tcp_relay_dynarray_arr(&ctx->tcp_relays), ctx->tcp_relays.size);
+		} else {
+			ret = knot_xdp_send_finish(xdp_sock);
+		}
+	}
+
+	dynarray_foreach(tcp_relay, knot_tcp_relay_t, rl, ctx->tcp_relays) {
+		if (rl->answer == (XDP_TCP_ANSWER | XDP_TCP_DATA)) {
+			free(rl->data.iov_base);
+		}
+	}
+	tcp_relay_dynarray_free(&ctx->tcp_relays);
+
+	return ret;
+}
+
+#endif // ENABLE_XDP
diff --git a/src/knot/server/xdp-handler.h b/src/knot/server/xdp-handler.h
new file mode 100644
index 0000000000..8c1be5f9f2
--- /dev/null
+++ b/src/knot/server/xdp-handler.h
@@ -0,0 +1,60 @@
+/*  Copyright (C) 2021 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
+
+    This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation, either version 3 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program.  If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#ifdef ENABLE_XDP
+
+#include "knot/query/layer.h"
+#include "libknot/xdp/xdp.h"
+
+struct xdp_handle_ctx;
+struct server;
+
+/*!
+ * \brief Initialize XDP packet handling context.
+ */
+struct xdp_handle_ctx *xdp_handle_init(void);
+
+/*!
+ * \brief Deinitialize XDP packet handling context.
+ */
+void xdp_handle_free(struct xdp_handle_ctx *ctx);
+
+/*!
+ * \brief Reset XDP packet handling context.
+ */
+void xdp_handle_cleanup(struct xdp_handle_ctx *ctx);
+
+/*!
+ * \brief Receive packets thru XDP socket.
+ */
+int xdp_handle_recv(struct xdp_handle_ctx *ctx, knot_xdp_socket_t *xdp_sock);
+
+/*!
+ * \brief Answer packets including DNS layers.
+ *
+ * \warning In case of TCP, this also sends some packets, e.g. ACK.
+ */
+int xdp_handle_msgs(struct xdp_handle_ctx *ctx, knot_xdp_socket_t *sock,
+                    knot_layer_t *layer, struct server *server, unsigned thread_id);
+
+/*!
+ * \brief Send packets thru XDP socket.
+ */
+int xdp_handle_send(struct xdp_handle_ctx *ctx, knot_xdp_socket_t *xdp_sock);
+
+#endif // ENABLE_XDP
-- 
GitLab