Skip to content
Snippets Groups Projects
Commit 2c847686 authored by Marek Vavruša's avatar Marek Vavruša Committed by Marek Vavruša
Browse files

daemon/worker: partially implemented multiplexed resolution

the worker now creates a resolution context copy,
and keeps it if the query requires iterative queries.
the worker_exec() is now a reentrant function that gets
called with incoming data until the resolution is done,
and it sends the answer
parent f40fdc87
No related branches found
No related tags found
No related merge requests found
kresolved_SOURCES := \
daemon/layer/query.c \
daemon/io.c \
daemon/network.c \
daemon/engine.c \
......
......@@ -56,7 +56,7 @@ static void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
knot_pkt_t *answer = knot_pkt_new(NULL, KNOT_WIRE_MAX_PKTSIZE, worker->mm);
/* Resolve */
int ret = worker_exec(worker, answer, query);
int ret = worker_exec(worker, (uv_handle_t *)handle, answer, query);
if (ret == KNOT_EOK && answer->size > 0) {
udp_send(handle, answer, addr);
}
......@@ -134,7 +134,7 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
knot_pkt_t *answer = knot_pkt_new(NULL, KNOT_WIRE_MAX_PKTSIZE, worker->mm);
/* Resolve */
int ret = worker_exec(worker, answer, query);
int ret = worker_exec(worker, (uv_handle_t *)handle, answer, query);
if (ret == KNOT_EOK && answer->size > 0) {
tcp_send((uv_handle_t *)handle, answer);
}
......@@ -193,3 +193,31 @@ void tcp_unbind(struct endpoint *ep)
tcp_unbind_handle((uv_handle_t *)&ep->tcp);
uv_close((uv_handle_t *)&ep->tcp, NULL);
}
uv_handle_t *io_create(uv_loop_t *loop, int type)
{
uv_handle_t *handle = NULL;
if (type == SOCK_DGRAM) {
handle = (uv_handle_t *)udp_create(loop);
if (handle) {
uv_udp_recv_start((uv_udp_t *)handle, &buf_get, &udp_recv);
}
} else {
handle = (uv_handle_t *)tcp_create(loop);
if (handle) {
uv_read_start((uv_stream_t*)handle, buf_get, tcp_recv);
}
}
return handle;
}
uv_connect_t *io_connect(uv_handle_t *handle, struct sockaddr *addr, uv_connect_cb on_connect)
{
uv_connect_t* connect = malloc(sizeof(uv_connect_t));
if (uv_tcp_connect(connect, (uv_tcp_t *)handle, addr, on_connect) != 0) {
free(connect);
return NULL;
}
return connect;
}
......@@ -17,9 +17,14 @@
#pragma once
#include <uv.h>
#include <libknot/packet/pkt.h>
struct endpoint;
int udp_send(uv_udp_t *handle, knot_pkt_t *answer, const struct sockaddr *addr);
int udp_bind(struct endpoint *ep, struct sockaddr *addr);
void udp_unbind(struct endpoint *ep);
int tcp_bind(struct endpoint *ep, struct sockaddr *addr);
void tcp_unbind(struct endpoint *ep);
\ No newline at end of file
void tcp_unbind(struct endpoint *ep);
uv_handle_t *io_create(uv_loop_t *loop, int type);
uv_connect_t *io_connect(uv_handle_t *handle, struct sockaddr *addr, uv_connect_cb on_connect);
\ No newline at end of file
/* Copyright (C) 2014 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <libknot/errcode.h>
#include "daemon/layer/query.h"
#include "lib/resolve.h"
static int reset(knot_layer_t *ctx)
{
return KNOT_STATE_CONSUME;
}
static int begin(knot_layer_t *ctx, void *module_param)
{
ctx->data = module_param;
return reset(ctx);
}
static int input_query(knot_layer_t *ctx, knot_pkt_t *pkt)
{
assert(pkt && ctx);
/* Check if at least header is parsed. */
if (pkt->parsed < pkt->size) {
return KNOT_STATE_FAIL;
}
/* Accept only queries. */
if (knot_wire_get_qr(pkt->wire)) {
return KNOT_STATE_NOOP; /* Ignore. */
}
/* No authoritative service. */
if (!knot_wire_get_rd(pkt->wire)) {
return KNOT_STATE_FAIL;
}
return KNOT_STATE_PRODUCE;
}
static int output_answer(knot_layer_t *ctx, knot_pkt_t *pkt)
{
assert(pkt && ctx);
/* Prepare for query processing. */
int ret = kr_resolve(ctx->data, pkt,
knot_pkt_qname(pkt),
knot_pkt_qclass(pkt),
knot_pkt_qtype(pkt));
if (ret != KNOT_EOK) {
return KNOT_STATE_FAIL;
}
return KNOT_STATE_DONE;
}
static int output_error(knot_layer_t *ctx, knot_pkt_t *pkt)
{
knot_wire_set_rcode(pkt->wire, KNOT_RCODE_SERVFAIL);
return KNOT_STATE_DONE;
}
/** Module implementation. */
static const knot_layer_api_t LAYER_QUERY_MODULE = {
&begin,
NULL,
&reset,
&input_query,
&output_answer,
&output_error
};
const knot_layer_api_t *layer_query_module(void)
{
return &LAYER_QUERY_MODULE;
}
/* Copyright (C) 2014 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "lib/layer.h"
/* Processing module implementation. */
const knot_layer_api_t *layer_query_module(void);
#define LAYER_QUERY layer_query_module()
......@@ -18,38 +18,138 @@
#include <libknot/packet/pkt.h>
#include <libknot/internal/net.h>
#include <libknot/internal/mempool.h>
#include "daemon/worker.h"
#include "daemon/engine.h"
#include "daemon/layer/query.h"
#include "daemon/io.h"
int worker_exec(struct worker_ctx *worker, knot_pkt_t *answer, knot_pkt_t *query)
/** @internal Query resolution task. */
struct qr_task
{
if (worker == NULL) {
return kr_error(EINVAL);
}
struct kr_request req;
knot_pkt_t *pending;
uv_handle_t *handle;
};
static int parse_query(knot_pkt_t *query)
{
/* Parse query packet. */
int ret = knot_pkt_parse(query, 0);
if (ret != KNOT_EOK) {
return kr_error(EPROTO); /* Ignore malformed query. */
}
/* Process query packet. */
knot_layer_t proc;
memset(&proc, 0, sizeof(knot_layer_t));
proc.mm = worker->mm;
knot_layer_begin(&proc, LAYER_QUERY, &worker->engine->resolver);
int state = knot_layer_consume(&proc, query);
/* Check if at least header is parsed. */
if (query->parsed < query->size) {
return kr_error(EMSGSIZE);
}
/* Build an answer. */
if (state & (KNOT_STATE_PRODUCE|KNOT_STATE_FAIL)) {
knot_pkt_init_response(answer, query);
state = knot_layer_produce(&proc, answer);
/* Accept only queries, no authoritative service. */
if (knot_wire_get_qr(query->wire) || !knot_wire_get_rd(query->wire)) {
return kr_error(EINVAL); /* Ignore. */
}
/* Cleanup. */
knot_layer_finish(&proc);
return kr_ok();
}
static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *handle)
{
mm_ctx_t pool;
mm_ctx_mempool(&pool, MM_DEFAULT_BLKSIZE);
/* Create worker task */
struct engine *engine = worker->engine;
struct qr_task *task = mm_alloc(&pool, sizeof(*task));
if (!task) {
mp_delete(pool.ctx);
return NULL;
}
task->req.pool = pool;
task->handle = handle;
#warning TODO: devise a better scheme to manage answer buffer, it needs copy each time now
/* Create buffers */
knot_pkt_t *pending = knot_pkt_new(NULL, KNOT_WIRE_MIN_PKTSIZE, &task->req.pool);
knot_pkt_t *answer = knot_pkt_new(NULL, KNOT_WIRE_MAX_PKTSIZE, &task->req.pool);
if (!pending || !answer) {
mp_delete(pool.ctx);
return NULL;
}
task->req.answer = answer;
task->pending = pending;
/* Start resolution */
kr_resolve_begin(&task->req, &engine->resolver, answer);
return task;
}
static int qr_task_finalize(struct qr_task *task, knot_pkt_t *dst, int state)
{
knot_pkt_t *answer = task->req.answer;
kr_resolve_finish(&task->req, state);
memcpy(dst->wire, answer->wire, answer->size);
dst->size = answer->size;
#warning TODO: send answer asynchronously
mp_delete(task->req.pool.ctx);
return state == KNOT_STATE_DONE ? 0 : kr_error(EIO);
}
static void qr_task_on_connect(uv_connect_t *connect, int status)
{
#warning TODO: if not connected, retry
#warning TODO: if connected, send pending query
}
int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *answer, knot_pkt_t *query)
{
if (!worker) {
return kr_error(EINVAL);
}
/* Parse query */
int ret = parse_query(query);
if (ret != 0) {
return ret;
}
/* Get pending request or start new */
struct qr_task *task = handle->data;
if (!task) {
task = qr_task_create(worker, handle);
if (!task) {
return kr_error(ENOMEM);
}
}
/* Consume input and produce next query */
int proto = 0;
struct sockaddr *addr = NULL;
#warning TODO: it shouldnt be needed to provide NULL answer if I/O fails
int state = kr_resolve_consume(&task->req, query);
while (state == KNOT_STATE_PRODUCE) {
state = kr_resolve_produce(&task->req, &addr, &proto, task->pending);
}
if (state & (KNOT_STATE_DONE|KNOT_STATE_FAIL)) {
return qr_task_finalize(task, answer, state);
}
/* Create connection for iterative query */
uv_handle_t *next_handle = io_create(handle->loop, proto);
#warning TODO: improve error checking
next_handle->data = task;
if (proto == SOCK_STREAM) {
uv_connect_t *connect = io_connect(next_handle, addr, qr_task_on_connect);
if (!connect) {
#warning TODO: close next_handle
return kr_error(ENOMEM);
}
} else {
/* Fake connection as libuv doesn't support connected UDP */
uv_connect_t fake_connect;
fake_connect.handle = (uv_stream_t *)next_handle;
qr_task_on_connect(&fake_connect, 0);
}
return kr_ok();
}
......@@ -33,8 +33,9 @@ struct worker_ctx {
* Resolve query.
*
* @param worker
* @param handle
* @param answer
* @param query
* @return 0, error code
*/
int worker_exec(struct worker_ctx *worker, knot_pkt_t *answer, knot_pkt_t *query);
int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *answer, knot_pkt_t *query);
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment