From 9773a3831258700e739012268a1a3e8947592ee9 Mon Sep 17 00:00:00 2001
From: Marek Vavrusa <marek.vavrusa@nic.cz>
Date: Thu, 21 Nov 2013 16:55:26 +0100
Subject: [PATCH] New packet API now in both UDP and TCP handlers.

(just for testing, will be refactored)
---
 src/knot/server/tcp-handler.c | 206 +++++++++-------------------------
 src/knot/server/udp-handler.c | 203 +++++++++------------------------
 2 files changed, 106 insertions(+), 303 deletions(-)

diff --git a/src/knot/server/tcp-handler.c b/src/knot/server/tcp-handler.c
index e76fce0e2d..8ec350a1bb 100644
--- a/src/knot/server/tcp-handler.c
+++ b/src/knot/server/tcp-handler.c
@@ -34,6 +34,7 @@
 
 #include "common/sockaddr.h"
 #include "common/fdset.h"
+#include "common/mempool.h"
 #include "knot/knot.h"
 #include "knot/server/tcp-handler.h"
 #include "knot/server/xfr-handler.h"
@@ -41,6 +42,7 @@
 #include "libknot/nameserver/name-server.h"
 #include "libknot/packet/wire.h"
 #include "libknot/dnssec/cleanup.h"
+#include "libknot/nameserver/ns_proc_query.h"
 
 /*! \brief TCP worker data. */
 typedef struct tcp_worker_t {
@@ -68,27 +70,6 @@ static inline int tcp_throttle() {
 	return (rand() % TCP_THROTTLE_HI) + TCP_THROTTLE_LO;
 }
 
-/*! \brief Send reply. */
-static int tcp_reply(int fd, uint8_t *qbuf, size_t resp_len)
-{
-	dbg_net("tcp: got answer of size %zd.\n",
-		resp_len);
-
-	int res = 0;
-	if (resp_len > 0) {
-		res = tcp_send(fd, qbuf, resp_len);
-	}
-
-	/* Check result. */
-	if (res < 0 || (size_t)res != resp_len) {
-		dbg_net("tcp: %s: failed: %d - %d.\n",
-			  "socket_send()",
-			  res, errno);
-	}
-
-	return res;
-}
-
 /*! \brief Sweep TCP connection. */
 static enum fdset_sweep_state tcp_sweep(fdset_t *set, int i, void *data)
 {
@@ -139,27 +120,18 @@ static enum fdset_sweep_state tcp_sweep(fdset_t *set, int i, void *data)
  *       and ensure that in case of good packet the response
  *       is proper.
  */
-static int tcp_handle(tcp_worker_t *w, int fd, uint8_t *buf[], size_t qbuf_maxlen)
+static int tcp_handle(ns_proc_context_t *query_ctx, int fd,
+                      struct iovec *rx, struct iovec *tx)
 {
-	if (fd < 0 || !w || !w->ioh) {
-		dbg_net("tcp: tcp_handle(%p, %d) - invalid parameters\n", w, fd);
-		return KNOT_EINVAL;
-	}
-
-	dbg_net("tcp: handling TCP event on fd=%d in thread %p.\n",
-	        fd, (void*)pthread_self());
-
-	knot_nameserver_t *ns = w->ioh->server->nameserver;
-
 	/* Check address type. */
 	sockaddr_t addr;
 	sockaddr_prep(&addr);
 
 	/* Receive data. */
-	int n = tcp_recv(fd, buf[QBUF], qbuf_maxlen, &addr);
-	if (n <= 0) {
+	int ret = tcp_recv(fd, rx->iov_base, rx->iov_len, &addr);
+	if (ret <= 0) {
 		dbg_net("tcp: client on fd=%d disconnected\n", fd);
-		if (n == KNOT_EAGAIN) {
+		if (ret == KNOT_EAGAIN) {
 			char r_addr[SOCKADDR_STRLEN];
 			sockaddr_tostr(&addr, r_addr, sizeof(r_addr));
 			int r_port = sockaddr_portnum(&addr);
@@ -170,120 +142,32 @@ static int tcp_handle(tcp_worker_t *w, int fd, uint8_t *buf[], size_t qbuf_maxle
 			rcu_read_unlock();
 		}
 		return KNOT_ECONNREFUSED;
+	} else {
+		rx->iov_len = ret;
 	}
 
-	/* Parse query. */
-	size_t resp_len = qbuf_maxlen; // 64K
-	knot_packet_type_t qtype = KNOT_QUERY_NORMAL;
-	knot_pkt_t *packet = knot_pkt_new(buf[QBUF], n, NULL);
-	if (packet == NULL) {
-		int ret = knot_ns_error_response_from_query_wire(ns, buf[QBUF], n,
-		                                            KNOT_RCODE_SERVFAIL,
-		                                            buf[QRBUF], &resp_len);
-
-		if (ret == KNOT_EOK) {
-			tcp_reply(fd, buf[QRBUF], resp_len);
-		}
+	/* Reset context. */
+	uint16_t tx_len = tx->iov_len;
+	ns_proc_reset(query_ctx);
 
-		return KNOT_EOK;
-	}
+	/* Input packet. */
+	int state = ns_proc_in(rx->iov_base, rx->iov_len, query_ctx);
 
-	int parse_res = knot_ns_parse_packet(packet, &qtype);
-	if (knot_unlikely(parse_res != KNOT_EOK)) {
-		if (parse_res > 0) { /* Returned RCODE */
-			int ret = knot_ns_error_response_from_query(ns, packet,
-			                          parse_res, buf[QRBUF], &resp_len);
+	/* Resolve until NOOP or finished. */
+	while (state == NS_PROC_FULL || state == NS_PROC_FAIL) {
+		state = ns_proc_out(tx->iov_base, &tx_len, query_ctx);
 
-			if (ret == KNOT_EOK) {
-				tcp_reply(fd, buf[QRBUF], resp_len);
+		/* If it has response, send it. */
+		if (state == NS_PROC_FINISH || state == NS_PROC_FULL) {
+			if (tcp_send(fd, tx->iov_base, tx_len) != tx_len) {
+				ret = KNOT_ECONN;
+				break;
 			}
+			tx_len = tx->iov_len; /* Reset size. */
 		}
-		knot_pkt_free(&packet);
-		return KNOT_EOK;
 	}
 
-	/* Handle query. */
-	int xfrt = -1;
-	knot_ns_xfr_t *xfr = NULL;
-	int res = KNOT_ERROR;
-	switch(qtype) {
-
-	/* Query types. */
-	case KNOT_QUERY_NORMAL:
-		//res = knot_ns_answer_normal(ns, packet, qbuf, &resp_len);
-		if (zones_normal_query_answer(ns, packet, &addr,
-		                              buf[QRBUF], &resp_len,
-		                              NS_TRANSPORT_TCP) == KNOT_EOK) {
-			res = KNOT_EOK;
-		}
-		break;
-	case KNOT_QUERY_AXFR:
-	case KNOT_QUERY_IXFR:
-		if (qtype == KNOT_QUERY_IXFR) {
-			xfrt = XFR_TYPE_IOUT;
-		} else {
-			xfrt = XFR_TYPE_AOUT;
-		}
-
-		/* Answer from query. */
-		xfr = xfr_task_create(NULL, xfrt, XFR_FLAG_TCP);
-		if (xfr == NULL) {
-			knot_ns_error_response_from_query(ns, packet,
-			                                  KNOT_RCODE_SERVFAIL,
-			                                  buf[QRBUF], &resp_len);
-			res = KNOT_EOK;
-			break;
-		}
-		xfr->session = fd;
-		xfr->wire = buf[QRBUF];
-		xfr->wire_size = qbuf_maxlen;
-		xfr->query = packet;
-		xfr_task_setaddr(xfr, &addr, NULL);
-		res = xfr_answer(ns, xfr);
-		knot_pkt_free(&packet);
-		return res;
-
-	case KNOT_QUERY_UPDATE:
-		res = zones_process_update(ns, packet, &addr, buf[QRBUF], &resp_len,
-		                           fd, NS_TRANSPORT_TCP);
-		break;
-
-	case KNOT_QUERY_NOTIFY:
-		res = notify_process_request(ns, packet, &addr,
-					     buf[QRBUF], &resp_len);
-		break;
-
-	/* Unhandled opcodes. */
-	case KNOT_RESPONSE_NOTIFY: /*!< Only in UDP. */
-	case KNOT_RESPONSE_NORMAL: /*!< TCP handler doesn't send queries. */
-	case KNOT_RESPONSE_AXFR:   /*!< Processed in XFR handler. */
-	case KNOT_RESPONSE_IXFR:   /*!< Processed in XFR handler. */
-		knot_ns_error_response_from_query(ns, packet,
-		                                  KNOT_RCODE_REFUSED,
-		                                  buf[QRBUF], &resp_len);
-		res = KNOT_EOK;
-		break;
-
-	/* Unknown opcodes. */
-	default:
-		knot_ns_error_response_from_query(ns, packet,
-		                                  KNOT_RCODE_FORMERR,
-		                                  buf[QRBUF], &resp_len);
-		res = KNOT_EOK;
-		break;
-	}
-
-	/* Send answer. */
-	if (res == KNOT_EOK) {
-		tcp_reply(fd, buf[QRBUF], resp_len);
-	} else {
-		dbg_net("tcp: failed to respond to query type=%d on fd=%d - %s\n",
-		        qtype, fd, knot_strerror(res));;
-	}
-
-	knot_pkt_free(&packet);
-
-	return res;
+	return ret;
 }
 
 int tcp_accept(int fd)
@@ -578,19 +462,32 @@ int tcp_loop_worker(dthread_t *thread)
 	}
 #endif /* HAVE_CAP_NG_H */
 
-	uint8_t *buf[NBUFS];
-	for (unsigned i = 0; i < NBUFS; ++i) {
-		buf[i] = malloc(SOCKET_MTU_SZ);
-	}
-
+	/* Create TCP answering context. */
 	tcp_worker_t *w = thread->data;
-	if (w == NULL || buf[QBUF] == NULL || buf[QRBUF] == NULL) {
-		for (unsigned i = 0; i < NBUFS; ++i) {
-			free(buf[i]);
+	int ret = KNOT_EOK;
+	ns_proc_context_t query_ctx;
+	memset(&query_ctx, 0, sizeof(query_ctx));
+	query_ctx.ns = w->ioh->server->nameserver;
+	mm_ctx_mempool(&query_ctx.mm, 2 * sizeof(knot_pkt_t));
+
+	/* Packet size not limited by EDNS. */
+	query_ctx.flags |= NS_PKTSIZE_NOLIMIT;
+
+	/* Create iovec abstraction. */
+	mm_ctx_t *mm = &query_ctx.mm;
+	struct iovec bufs[2];
+	for (unsigned i = 0; i < 2; ++i) {
+		bufs[i].iov_len = KNOT_WIRE_MAX_PKTSIZE;
+		bufs[i].iov_base = mm->alloc(mm->ctx, bufs[i].iov_len);
+		if (bufs[i].iov_base == NULL) {
+			ret = KNOT_ENOMEM;
+			goto finish;
 		}
-		return KNOT_EINVAL;
 	}
 
+	/* Create query processing context. */
+	ns_proc_begin(&query_ctx, NS_PROC_QUERY);
+
 	/* Accept clients. */
 	dbg_net("tcp: worker %p started\n", w);
 	fdset_t *set = &w->set;
@@ -637,7 +534,7 @@ int tcp_loop_worker(dthread_t *thread)
 			if (fd == w->pipe[0]) {
 				tcp_loop_assign(fd, set);
 			} else {
-				int ret = tcp_handle(w, fd, buf, SOCKET_MTU_SZ);
+				int ret = tcp_handle(&query_ctx, fd, &bufs[0], &bufs[1]);
 				if (ret == KNOT_EOK) {
 					/* Update socket activity timer. */
 					fdset_set_watchdog(set, i, max_idle);
@@ -664,12 +561,9 @@ int tcp_loop_worker(dthread_t *thread)
 		}
 	}
 
-	/* Stop whole unit. */
-	for (unsigned i = 0; i < NBUFS; ++i) {
-		free(buf[i]);
-	}
-	dbg_net("tcp: worker %p finished\n", w);
-	return KNOT_EOK;
+finish:
+	mp_delete(mm->ctx);
+	return ret;
 }
 
 int tcp_handler_destruct(dthread_t *thread)
diff --git a/src/knot/server/udp-handler.c b/src/knot/server/udp-handler.c
index 9bf5536d18..8ab9b9ddcc 100644
--- a/src/knot/server/udp-handler.c
+++ b/src/knot/server/udp-handler.c
@@ -105,15 +105,6 @@ static inline void udp_pps_begin() {}
 static inline void udp_pps_sample(unsigned n, unsigned thr_id) {}
 #endif
 
-
-/* Answering context. */
-struct answer_ctx
-{
-	server_t *srv;
-	mm_ctx_t *mm;
-	unsigned slip;
-};
-
 /*! \brief RRL reject procedure. */
 static size_t udp_rrl_reject(const knot_nameserver_t *ns,
                              const knot_pkt_t *packet,
@@ -141,7 +132,7 @@ static size_t udp_rrl_reject(const knot_nameserver_t *ns,
 	return 0; /* Discard response. */
 }
 
-int udp_handle(struct answer_ctx *ans, int fd, sockaddr_t *addr,
+int udp_handle(ns_proc_context_t *query_ctx, int fd, sockaddr_t *addr,
                struct iovec *rx, struct iovec *tx)
 {
 #ifdef DEBUG_ENABLE_BRIEF
@@ -152,138 +143,51 @@ int udp_handle(struct answer_ctx *ans, int fd, sockaddr_t *addr,
 	        strfrom, sockaddr_portnum(addr));
 #endif
 
-#ifdef PACKET_NG
-	ns_proc_context_t query_ctx = {0};
-	memcpy(&query_ctx.mm, ans->mm, sizeof(mm_ctx_t));
-	query_ctx.ns = ans->srv->nameserver;
-
+	/* Reset context. */
 	uint16_t tx_len = tx->iov_len;
-	ns_proc_begin(&query_ctx, NS_PROC_QUERY);
-	int state = ns_proc_in(rx->iov_base, rx->iov_len, &query_ctx);
-	if (state == NS_PROC_FULL) {
-		state = ns_proc_out(tx->iov_base, &tx_len, &query_ctx);
-	}
-	if (state == NS_PROC_FAIL) {
-		state = ns_proc_out(tx->iov_base, &tx_len, &query_ctx);
-	}
-	if (state == NS_PROC_FINISH) {
-		tx->iov_len = tx_len;
-	}
-
-	/*! \todo IXFR query */
-	/*! \todo NOTIFY query */
-	/*! \todo UPDATE */
+	ns_proc_reset(query_ctx);
 
-	ns_proc_finish(&query_ctx);
-	return KNOT_EOK;
-#else
-
-	int res = KNOT_EOK;
-	int rcode = KNOT_RCODE_NOERROR;
-	knot_nameserver_t *ns = ans->srv->nameserver;
-	rrl_table_t *rrl = ans->srv->rrl;
-	knot_packet_type_t qtype = KNOT_QUERY_INVALID;
-
-	/* The packet MUST contain at least DNS header.
-	 * If it doesn't, it's not a DNS packet and we should discard it.
-	 */
-	if (rx->iov_len < KNOT_WIRE_HEADER_SIZE) {
-		return KNOT_EFEWDATA;
-	}
+	/* Input packet. */
+	int state = ns_proc_in(rx->iov_base, rx->iov_len, query_ctx);
 
-#ifdef MIRROR_MODE
-	memcpy(tx->iov_base, rx->iov_base, rx->iov_len);
-	knot_wire_set_qr(tx->iov_base);
-	tx->iov_len = rx->iov_len;
-	return KNOT_EOK;
-#endif
-
-	knot_pkt_t *query = knot_pkt_new(rx->iov_base, rx->iov_len, ans->mm);
-	if (query == NULL) {
-		dbg_net("udp: failed to create packet\n");
-		int ret = knot_ns_error_response_from_query_wire(ns, rx->iov_base, rx->iov_len,
-		                                            KNOT_RCODE_SERVFAIL,
-		                                            tx->iov_base, &tx->iov_len);
-		return ret;
+	/* Process answer. */
+	if (state == NS_PROC_FULL) {
+		state = ns_proc_out(tx->iov_base, &tx_len, query_ctx);
 	}
 
-	/* Parse query. */
-	rcode = knot_ns_parse_packet(query, &qtype);
-	if (rcode < KNOT_RCODE_NOERROR) {
-		dbg_net("udp: failed to parse packet\n");
-		rcode = KNOT_RCODE_SERVFAIL;
+	/* Process error response (if failed). */
+	if (state == NS_PROC_FAIL) {
+		state = ns_proc_out(tx->iov_base, &tx_len, query_ctx);
 	}
 
-	/* Handle query. */
-	switch(qtype) {
-	case KNOT_QUERY_NORMAL:
-		res = zones_normal_query_answer(ns, query, addr, tx->iov_base,
-		                                &tx->iov_len, NS_TRANSPORT_UDP);
-		break;
-	case KNOT_QUERY_AXFR:
-		/* RFC1034, p.28 requires reliable transfer protocol.
-		 * Bind responds with FORMERR.
-		 */
-		res = knot_ns_error_response_from_query(ns, query,
-		                                        KNOT_RCODE_FORMERR, tx->iov_base,
-		                                        &tx->iov_len);
-		break;
-	case KNOT_QUERY_IXFR:
-		/* According to RFC1035, respond with SOA. */
-		res = zones_normal_query_answer(ns, query, addr,
-		                                tx->iov_base, &tx->iov_len,
-		                                NS_TRANSPORT_UDP);
-		break;
-	case KNOT_QUERY_NOTIFY:
-		res = notify_process_request(ns, query, addr,
-		                             tx->iov_base, &tx->iov_len);
-		break;
-
-	case KNOT_QUERY_UPDATE:
-		res = zones_process_update(ns, query, addr, tx->iov_base, &tx->iov_len,
-		                           fd, NS_TRANSPORT_UDP);
-		break;
-
-	/* Do not issue response to incoming response to avoid loops. */
-	case KNOT_RESPONSE_AXFR: /*!< Processed in XFR handler. */
-	case KNOT_RESPONSE_IXFR: /*!< Processed in XFR handler. */
-	case KNOT_RESPONSE_NORMAL:
-	case KNOT_RESPONSE_NOTIFY:
-	case KNOT_RESPONSE_UPDATE:
-		res = KNOT_EOK;
+	/* Send response only if finished successfuly. */
+	if (state == NS_PROC_FINISH) {
+		tx->iov_len = tx_len;
+	} else {
 		tx->iov_len = 0;
-		break;
-	/* Unknown opcodes */
-	default:
-		res = knot_ns_error_response_from_query(ns, query,
-		                                        rcode, tx->iov_base,
-		                                        &tx->iov_len);
-		break;
-	}
-
-	/* Process RRL. */
-	if (knot_unlikely(rrl != NULL) && rrl->rate > 0) {
-		rrl_req_t rrl_rq;
-		memset(&rrl_rq, 0, sizeof(rrl_req_t));
-		rrl_rq.w = tx->iov_base; /* Wire */
-		rrl_rq.query = query;
-
-		rcu_read_lock();
-		rrl_rq.flags = query->flags;
-		if (rrl_query(rrl, addr, &rrl_rq, query->zone) != KNOT_EOK) {
-			tx->iov_len = udp_rrl_reject(ns, query, tx->iov_base,
-			                           	KNOT_WIRE_MAX_PKTSIZE,
-			                           	knot_wire_get_rcode(query->wire),
-			                           	&ans->slip);
-		}
-		rcu_read_unlock();
 	}
 
+	return KNOT_EOK;
 
-	knot_pkt_free(&query);
-
-	return res;
-#endif /* PACKET_NG */
+	/*! \todo Move RRL to IN processing. */
+//	rrl_table_t *rrl = ans->srv->rrl;
+//	/* Process RRL. */
+//	if (knot_unlikely(rrl != NULL) && rrl->rate > 0) {
+//		rrl_req_t rrl_rq;
+//		memset(&rrl_rq, 0, sizeof(rrl_req_t));
+//		rrl_rq.w = tx->iov_base; /* Wire */
+//		rrl_rq.query = query;
+
+//		rcu_read_lock();
+//		rrl_rq.flags = query->flags;
+//		if (rrl_query(rrl, addr, &rrl_rq, query->zone) != KNOT_EOK) {
+//			tx->iov_len = udp_rrl_reject(ns, query, tx->iov_base,
+//			                           	KNOT_WIRE_MAX_PKTSIZE,
+//			                           	knot_wire_get_rcode(query->wire),
+//			                           	&ans->slip);
+//		}
+//		rcu_read_unlock();
+//	}
 }
 
 /* Check for sendmmsg syscall. */
@@ -299,7 +203,7 @@ int udp_handle(struct answer_ctx *ans, int fd, sockaddr_t *addr,
 static void* (*_udp_init)(void) = 0;
 static int (*_udp_deinit)(void *) = 0;
 static int (*_udp_recv)(int, void *) = 0;
-static int (*_udp_handle)(struct answer_ctx *, void *) = 0;
+static int (*_udp_handle)(ns_proc_context_t *, void *) = 0;
 static int (*_udp_send)(void *) = 0;
 
 /* UDP recvfrom() request struct. */
@@ -354,7 +258,7 @@ static int udp_recvfrom_recv(int fd, void *d)
 	return 0;
 }
 
-static int udp_recvfrom_handle(struct answer_ctx *ans, void *d)
+static int udp_recvfrom_handle(ns_proc_context_t *ctx, void *d)
 {
 	struct udp_recvfrom *rq = (struct udp_recvfrom *)d;
 
@@ -364,7 +268,7 @@ static int udp_recvfrom_handle(struct answer_ctx *ans, void *d)
 	rq->iov[TX].iov_len = KNOT_WIRE_MAX_PKTSIZE;
 
 	/* Process received pkt. */
-	int ret = udp_handle(ans, rq->fd, &rq->addr, &rq->iov[RX], &rq->iov[TX]);
+	int ret = udp_handle(ctx, rq->fd, &rq->addr, &rq->iov[RX], &rq->iov[TX]);
 	if (ret != KNOT_EOK) {
 		rq->iov[TX].iov_len = 0;
 	}
@@ -505,7 +409,7 @@ static int udp_recvmmsg_recv(int fd, void *d)
 	return n;
 }
 
-static int udp_recvmmsg_handle(struct answer_ctx *st, void *d)
+static int udp_recvmmsg_handle(ns_proc_context_t *ctx, void *d)
 {
 	struct udp_recvmmsg *rq = (struct udp_recvmmsg *)d;
 
@@ -517,7 +421,7 @@ static int udp_recvmmsg_handle(struct answer_ctx *st, void *d)
 		rx->iov_len = rq->msgs[RX][i].msg_len; /* Received bytes. */
 		rq->addrs[i].len = rq->msgs[RX][i].msg_hdr.msg_namelen;
 
-		ret = udp_handle(st, rq->fd, rq->addrs + i, rx, tx);
+		ret = udp_handle(ctx, rq->fd, rq->addrs + i, rx, tx);
 		if (ret != KNOT_EOK) { /* Do not send. */
 			tx->iov_len = 0;
 		}
@@ -593,15 +497,18 @@ int udp_reader(iohandler_t *h, dthread_t *thread)
 	void *rq = _udp_init();
 	ifacelist_t *ref = NULL;
 
-	/* Create memory pool context. */
-	mm_ctx_t mm;
-	mm_ctx_mempool(&mm, 2 * sizeof(knot_pkt_t));
-
 	/* Create UDP answering context. */
-	struct answer_ctx ans_ctx;
-	ans_ctx.srv = h->server;
-	ans_ctx.slip = 0;
-	ans_ctx.mm = &mm;
+	ns_proc_context_t query_ctx;
+	memset(&query_ctx, 0, sizeof(query_ctx));
+	query_ctx.ns = h->server->nameserver;
+	mm_ctx_mempool(&query_ctx.mm, 2 * sizeof(knot_pkt_t));
+
+	/* Disable transfers over UDP. */
+	query_ctx.flags |= NS_QUERY_NO_AXFR;
+	query_ctx.flags |= NS_QUERY_NO_IXFR;
+
+	/* Create query processing context. */
+	ns_proc_begin(&query_ctx, NS_PROC_QUERY);
 
 	/* Chose select as epoll/kqueue has larger overhead for a
 	 * single or handful of sockets. */
@@ -654,18 +561,20 @@ int udp_reader(iohandler_t *h, dthread_t *thread)
 		for (unsigned fd = minfd; fd <= maxfd; ++fd) {
 			if (FD_ISSET(fd, &rfds)) {
 				while ((rcvd = _udp_recv(fd, rq)) > 0) {
-					_udp_handle(&ans_ctx, rq);
+					_udp_handle(&query_ctx, rq);
 					_udp_send(rq);
-					mp_flush(mm.ctx);
 					udp_pps_sample(rcvd, thr_id);
 				}
 			}
 		}
 	}
 
+	/* Close query processing context. */
+	ns_proc_finish(&query_ctx);
+
 	_udp_deinit(rq);
 	ref_release((ref_t *)ref);
-	mp_delete(mm.ctx);
+	mp_delete(query_ctx.mm.ctx);
 	return KNOT_EOK;
 }
 
-- 
GitLab