Commit e5665f62 authored by Marek Vavruša

Merge branch 'resolve-split-api'

parents 9578293b 1bb9ec0e
@@ -15,6 +15,7 @@
*.6
*.log
*.inc
*.mdb
.dirstamp
.libs
.deps
@@ -46,3 +47,4 @@ tmp*
/tests/test_pack
/tests/test_set
/tests/test_utils
/tests/test_zonecut
kresolved_SOURCES := \
daemon/layer/query.c \
daemon/io.c \
daemon/network.c \
daemon/engine.c \
......
@@ -21,133 +21,99 @@
#include "daemon/network.h"
#include "daemon/worker.h"
#define ENDPOINT_BUFSIZE 512 /**< This is an artificial limit for DNS query. */
static void *buf_alloc(void)
static void *handle_alloc(uv_loop_t *loop, size_t size)
{
struct endpoint_data *data = malloc(sizeof(*data) + ENDPOINT_BUFSIZE);
if (data == NULL) {
return NULL;
uv_handle_t *handle = malloc(size);
if (handle) {
memset(handle, 0, size);
}
data->buf = uv_buf_init((char *)data + sizeof(*data), ENDPOINT_BUFSIZE);
return data;
}
static void buf_get(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
{
struct endpoint_data *data = handle->data;
*buf = data->buf;
return handle;
}
static void buf_free(uv_handle_t* handle)
static void handle_free(uv_handle_t *handle)
{
free(handle->data);
free(handle);
}
static void udp_send(uv_udp_t *handle, knot_pkt_t *answer, const struct sockaddr *addr)
static void handle_getbuf(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
{
uv_buf_t sendbuf = uv_buf_init((char *)answer->wire, answer->size);
uv_udp_try_send(handle, &sendbuf, 1, addr);
/* The worker has a single buffer which is reused for all incoming
* datagrams / stream reads; the content of the buffer is
* guaranteed to be unchanged only for the duration of
* udp_recv() and tcp_recv().
*/
uv_loop_t *loop = handle->loop;
struct worker_ctx *worker = loop->data;
buf->base = (char *)worker->bufs.wire;
buf->len = sizeof(worker->bufs.wire);
}
static void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
const struct sockaddr *addr, unsigned flags)
{
uv_loop_t *loop = handle->loop;
struct worker_ctx *worker = loop->data;
/* Check the incoming wire length. */
if (nread < KNOT_WIRE_HEADER_SIZE) {
return;
/* UDP requests are oneshot, always close afterwards */
if (handle->data && !uv_is_closing((uv_handle_t *)handle)) { /* Do not free master socket */
uv_close((uv_handle_t *)handle, handle_free);
}
/* Create packets */
knot_pkt_t *query = knot_pkt_new(buf->base, nread, worker->mm);
knot_pkt_t *answer = knot_pkt_new(NULL, KNOT_WIRE_MAX_PKTSIZE, worker->mm);
/* Resolve */
int ret = worker_exec(worker, answer, query);
if (ret == KNOT_EOK && answer->size > 0) {
udp_send(handle, answer, addr);
/* Check the incoming wire length. */
if (nread > KNOT_WIRE_HEADER_SIZE) {
knot_pkt_t *query = knot_pkt_new(buf->base, nread, worker->mm);
worker_exec(worker, (uv_handle_t *)handle, query, addr);
knot_pkt_free(&query);
}
/* Cleanup */
knot_pkt_free(&query);
knot_pkt_free(&answer);
}
int udp_bind(struct endpoint *ep, struct sockaddr *addr)
{
uv_udp_t *handle = &ep->udp;
int ret = uv_udp_bind(handle, addr, 0);
unsigned flags = UV_UDP_REUSEADDR;
if (addr->sa_family == AF_INET6) {
flags |= UV_UDP_IPV6ONLY;
}
int ret = uv_udp_bind(handle, addr, flags);
if (ret != 0) {
return ret;
}
handle->data = buf_alloc();
if (handle->data == NULL) {
udp_unbind(ep);
return kr_error(ENOMEM);
}
uv_udp_recv_start(handle, &buf_get, &udp_recv);
return 0;
handle->data = NULL;
return io_start_read((uv_handle_t *)handle);
}
void udp_unbind(struct endpoint *ep)
{
uv_udp_t *handle = &ep->udp;
uv_udp_recv_stop(handle);
buf_free((uv_handle_t *)handle);
uv_close((uv_handle_t *)handle, NULL);
}
static void tcp_unbind_handle(uv_handle_t *handle)
{
uv_read_stop((uv_stream_t *)handle);
buf_free(handle);
}
static void tcp_send(uv_handle_t *handle, const knot_pkt_t *answer)
{
uint16_t pkt_size = htons(answer->size);
uv_buf_t buf[2];
buf[0].base = (char *)&pkt_size;
buf[0].len = sizeof(pkt_size);
buf[1].base = (char *)answer->wire;
buf[1].len = answer->size;
uv_try_write((uv_stream_t *)handle, buf, 2);
}
static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
{
uv_loop_t *loop = handle->loop;
struct worker_ctx *worker = loop->data;
/* Check the incoming wire length (malformed, EOF or error). */
if (nread < (ssize_t) sizeof(uint16_t)) {
tcp_unbind_handle((uv_handle_t *)handle);
uv_close((uv_handle_t *)handle, (uv_close_cb) free);
/* Check for connection close */
if (nread <= 0) {
uv_close((uv_handle_t *)handle, handle_free);
return;
} else if (nread < 2) {
/* Not enough bytes to read length */
return;
}
/* Set packet size */
nread = wire_read_u16((const uint8_t *)buf->base);
/* Create packets */
knot_pkt_t *query = knot_pkt_new(buf->base + sizeof(uint16_t), nread, worker->mm);
knot_pkt_t *answer = knot_pkt_new(NULL, KNOT_WIRE_MAX_PKTSIZE, worker->mm);
/** @todo This is not going to work if the packet is fragmented in the stream ! */
uint16_t nbytes = wire_read_u16((const uint8_t *)buf->base);
/* Resolve */
int ret = worker_exec(worker, answer, query);
if (ret == KNOT_EOK && answer->size > 0) {
tcp_send((uv_handle_t *)handle, answer);
/* Check if there's enough data and execute */
if (nbytes + 2 < nread) {
return;
}
/* Cleanup */
knot_pkt_t *query = knot_pkt_new(buf->base + 2, nbytes, worker->mm);
worker_exec(worker, (uv_handle_t *)handle, query, NULL);
knot_pkt_free(&query);
knot_pkt_free(&answer);
}
static void tcp_accept(uv_stream_t *master, int status)
@@ -156,31 +122,23 @@ static void tcp_accept(uv_stream_t *master, int status)
return;
}
uv_tcp_t *client = malloc(sizeof(uv_tcp_t));
if (client == NULL) {
return;
}
uv_tcp_init(master->loop, client);
client->data = buf_alloc();
if (client->data == NULL) {
free(client);
return;
}
if (uv_accept(master, (uv_stream_t*)client) != 0) {
buf_free((uv_handle_t *)client);
free(client);
uv_stream_t *client = (uv_stream_t *)io_create(master->loop, SOCK_STREAM);
if (!client || uv_accept(master, client) != 0) {
handle_free((uv_handle_t *)client);
return;
}
uv_read_start((uv_stream_t*)client, buf_get, tcp_recv);
io_start_read((uv_handle_t *)client);
}
int tcp_bind(struct endpoint *ep, struct sockaddr *addr)
{
uv_tcp_t *handle = &ep->tcp;
int ret = uv_tcp_bind(handle, addr, 0);
unsigned flags = UV_UDP_REUSEADDR;
if (addr->sa_family == AF_INET6) {
flags |= UV_UDP_IPV6ONLY;
}
int ret = uv_tcp_bind(handle, addr, flags);
if (ret != 0) {
return ret;
}
@@ -191,17 +149,46 @@ int tcp_bind(struct endpoint *ep, struct sockaddr *addr)
return ret;
}
handle->data = buf_alloc();
if (handle->data == NULL) {
tcp_unbind(ep);
return kr_error(ENOMEM);
}
handle->data = NULL;
return 0;
}
void tcp_unbind(struct endpoint *ep)
{
tcp_unbind_handle((uv_handle_t *)&ep->tcp);
uv_close((uv_handle_t *)&ep->tcp, NULL);
}
uv_handle_t *io_create(uv_loop_t *loop, int type)
{
if (type == SOCK_DGRAM) {
uv_udp_t *handle = handle_alloc(loop, sizeof(*handle));
if (handle) {
uv_udp_init(loop, handle);
}
return (uv_handle_t *)handle;
} else {
uv_tcp_t *handle = handle_alloc(loop, sizeof(*handle));
if (handle) {
uv_tcp_init(loop, handle);
}
return (uv_handle_t *)handle;
}
}
int io_start_read(uv_handle_t *handle)
{
if (handle->type == UV_UDP) {
return uv_udp_recv_start((uv_udp_t *)handle, &handle_getbuf, &udp_recv);
} else {
return uv_read_start((uv_stream_t *)handle, &handle_getbuf, &tcp_recv);
}
}
int io_stop_read(uv_handle_t *handle)
{
if (handle->type == UV_UDP) {
return uv_udp_recv_stop((uv_udp_t *)handle);
} else {
return uv_read_stop((uv_stream_t *)handle);
}
}
\ No newline at end of file
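
The @todo above is real: tcp_recv() assumes a whole length-prefixed DNS message arrives in a single read, and the worker's shared buffer is only valid for the duration of the callback. One way such fragmentation might be handled (not part of this commit) is to copy into a small per-connection buffer and only dispatch once a complete message has accumulated. A sketch, where the tcp_buf struct, its attachment to handle->data, and the callback itself are hypothetical:

struct tcp_buf {
	uint8_t wire[KNOT_WIRE_MAX_PKTSIZE + 2]; /* 2-byte length prefix + payload */
	size_t end;                              /* bytes buffered so far */
};

static void tcp_recv_buffered(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
{
	struct worker_ctx *worker = handle->loop->data;
	struct tcp_buf *tb = handle->data;        /* hypothetical: attached on accept */
	if (nread <= 0) {                         /* EOF or error */
		uv_close((uv_handle_t *)handle, handle_free);
		return;
	}
	if (tb->end + (size_t)nread > sizeof(tb->wire)) { /* oversized or desynchronised stream */
		uv_close((uv_handle_t *)handle, handle_free);
		return;
	}
	/* The shared worker buffer is valid only inside this callback, so copy out of it. */
	memcpy(tb->wire + tb->end, buf->base, nread);
	tb->end += nread;
	/* Dispatch every complete length-prefixed message buffered so far. */
	while (tb->end >= 2) {
		uint16_t len = wire_read_u16(tb->wire);
		if (tb->end < (size_t)len + 2) {
			break;                    /* wait for the rest of this message */
		}
		knot_pkt_t *query = knot_pkt_new(tb->wire + 2, len, worker->mm);
		worker_exec(worker, (uv_handle_t *)handle, query, NULL);
		knot_pkt_free(&query);
		memmove(tb->wire, tb->wire + 2 + len, tb->end - (2 + len));
		tb->end -= 2 + len;
	}
}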
@@ -17,9 +17,13 @@
#pragma once
#include <uv.h>
struct endpoint;
#include <libknot/packet/pkt.h>
struct endpoint;
int udp_bind(struct endpoint *ep, struct sockaddr *addr);
void udp_unbind(struct endpoint *ep);
int tcp_bind(struct endpoint *ep, struct sockaddr *addr);
void tcp_unbind(struct endpoint *ep);
\ No newline at end of file
void tcp_unbind(struct endpoint *ep);
uv_handle_t *io_create(uv_loop_t *loop, int type);
int io_start_read(uv_handle_t *handle);
int io_stop_read(uv_handle_t *handle);
\ No newline at end of file
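
Taken together, io_create() and io_start_read() let a caller open a socket and arm it for reading without caring whether it is a datagram or stream handle, exactly as udp_bind() and tcp_accept() above do. A minimal sketch of opening an extra UDP listener through this interface; the listen_udp() wrapper is hypothetical (and assumes <sys/socket.h> for SOCK_DGRAM), while the bind flags and cleanup mirror the commit's own udp_bind():

static int listen_udp(uv_loop_t *loop, const struct sockaddr *addr)
{
	/* io_create() returns a zeroed, initialised uv_udp_t for SOCK_DGRAM. */
	uv_udp_t *handle = (uv_udp_t *)io_create(loop, SOCK_DGRAM);
	if (!handle) {
		return kr_error(ENOMEM);
	}
	/* Same flags as udp_bind(): allow rebinding, keep IPv6 sockets v6-only. */
	unsigned flags = UV_UDP_REUSEADDR;
	if (addr->sa_family == AF_INET6) {
		flags |= UV_UDP_IPV6ONLY;
	}
	int ret = uv_udp_bind(handle, addr, flags);
	if (ret != 0) {
		uv_close((uv_handle_t *)handle, (uv_close_cb)free);
		return ret;
	}
	/* Dispatches to uv_udp_recv_start() with the shared worker buffer. */
	return io_start_read((uv_handle_t *)handle);
}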
/* Copyright (C) 2014 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <libknot/errcode.h>
#include "daemon/layer/query.h"
#include "lib/resolve.h"
static int reset(knot_layer_t *ctx)
{
return KNOT_STATE_CONSUME;
}
static int begin(knot_layer_t *ctx, void *module_param)
{
ctx->data = module_param;
return reset(ctx);
}
static int input_query(knot_layer_t *ctx, knot_pkt_t *pkt)
{
assert(pkt && ctx);
/* Check if at least header is parsed. */
if (pkt->parsed < pkt->size) {
return KNOT_STATE_FAIL;
}
/* Accept only queries. */
if (knot_wire_get_qr(pkt->wire)) {
return KNOT_STATE_NOOP; /* Ignore. */
}
/* No authoritative service. */
if (!knot_wire_get_rd(pkt->wire)) {
return KNOT_STATE_FAIL;
}
return KNOT_STATE_PRODUCE;
}
static int output_answer(knot_layer_t *ctx, knot_pkt_t *pkt)
{
assert(pkt && ctx);
/* Prepare for query processing. */
int ret = kr_resolve(ctx->data, pkt,
knot_pkt_qname(pkt),
knot_pkt_qclass(pkt),
knot_pkt_qtype(pkt));
if (ret != KNOT_EOK) {
return KNOT_STATE_FAIL;
}
return KNOT_STATE_DONE;
}
static int output_error(knot_layer_t *ctx, knot_pkt_t *pkt)
{
knot_wire_set_rcode(pkt->wire, KNOT_RCODE_SERVFAIL);
return KNOT_STATE_DONE;
}
/** Module implementation. */
static const knot_layer_api_t LAYER_QUERY_MODULE = {
&begin,
NULL,
&reset,
&input_query,
&output_answer,
&output_error
};
const knot_layer_api_t *layer_query_module(void)
{
return &LAYER_QUERY_MODULE;
}
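
LAYER_QUERY_MODULE is a plain knot_layer_api_t callback table; the daemon drives it through libknot's generic begin/consume/produce calls, as the pre-split worker_exec() in the worker.c hunk further below still shows. A condensed sketch of that driver loop; the function name and the error mapping at the end are illustrative:

static int answer_query(struct worker_ctx *worker, knot_pkt_t *answer, knot_pkt_t *query)
{
	knot_layer_t proc;
	memset(&proc, 0, sizeof(proc));
	proc.mm = worker->mm;

	/* begin() stashes the resolver context, consume() validates the query ... */
	knot_layer_begin(&proc, LAYER_QUERY, &worker->engine->resolver);
	int state = knot_layer_consume(&proc, query);

	/* ... and produce() runs kr_resolve() or falls back to output_error(). */
	if (state & (KNOT_STATE_PRODUCE|KNOT_STATE_FAIL)) {
		knot_pkt_init_response(answer, query);
		state = knot_layer_produce(&proc, answer);
	}
	knot_layer_finish(&proc);
	return state == KNOT_STATE_DONE ? kr_ok() : kr_error(EIO);
}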
@@ -35,6 +35,7 @@ static void tty_read(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
cmd[nread - 1] = '\0';
/* Execute */
engine_cmd((struct engine *)stream->data, cmd);
free(buf->base);
}
printf("> ");
......
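
The added free(buf->base) closes a leak: unlike the DNS sockets, which read into the worker's shared wire buffer, the TTY stream is read into a buffer that its alloc callback hands to libuv, so the read callback owns it and must release it. A minimal sketch of that alloc/free pairing (callback names are illustrative):

/* Hand libuv a fresh heap buffer for each read ... */
static void tty_alloc(uv_handle_t *handle, size_t suggested, uv_buf_t *buf)
{
	buf->base = malloc(suggested);
	buf->len = buf->base ? suggested : 0;
}

/* ... and release it once the line has been handled. */
static void tty_read_line(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
{
	if (nread > 0) {
		/* consume buf->base[0 .. nread) here */
	}
	free(buf->base); /* counterpart of the free() added by this commit */
}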
@@ -86,7 +86,10 @@ static int insert_endpoint(struct network *net, const char *addr, struct endpoin
array_init(*ep_array);
}
return array_push(*ep_array, ep);
if (array_push(*ep_array, ep) < 0) {
return kr_error(ENOMEM);
}
return kr_ok();
}
/** Open endpoint protocols. */
......
@@ -27,10 +27,6 @@ enum endpoint_flag {
NET_TCP = 1 << 1
};
struct endpoint_data {
uv_buf_t buf;
};
struct endpoint {
uv_udp_t udp;
uv_tcp_t tcp;
......
@@ -18,38 +18,231 @@
#include <libknot/packet/pkt.h>
#include <libknot/internal/net.h>
#include <libknot/internal/mempool.h>
#include "daemon/worker.h"
#include "daemon/engine.h"
#include "daemon/layer/query.h"
#include "daemon/io.h"
int worker_exec(struct worker_ctx *worker, knot_pkt_t *answer, knot_pkt_t *query)
/** @internal Query resolution task. */
struct qr_task
{
if (worker == NULL) {
return kr_error(EINVAL);
}
struct kr_request req;
knot_pkt_t *next_query;
uv_handle_t *next_handle;
uv_timer_t timeout;
union {
uv_write_t tcp_send;
uv_udp_send_t udp_send;
uv_connect_t connect;
} ioreq;
struct {
union {
struct sockaddr_in ip4;
struct sockaddr_in6 ip6;
} addr;
uv_handle_t *handle;
} source;
};
static int qr_task_step(struct qr_task *task, knot_pkt_t *packet);
static int parse_query(knot_pkt_t *query)
{
/* Parse query packet. */
int ret = knot_pkt_parse(query, 0);
if (ret != KNOT_EOK) {
return kr_error(EPROTO); /* Ignore malformed query. */
}
/* Process query packet. */
knot_layer_t proc;
memset(&proc, 0, sizeof(knot_layer_t));
proc.mm = worker->mm;
knot_layer_begin(&proc, LAYER_QUERY, &worker->engine->resolver);
int state = knot_layer_consume(&proc, query);
/* Check if at least header is parsed. */
if (query->parsed < query->size) {
return kr_error(EMSGSIZE);
}
return kr_ok();
}
/* Build an answer. */
if (state & (KNOT_STATE_PRODUCE|KNOT_STATE_FAIL)) {
knot_pkt_init_response(answer, query);
state = knot_layer_produce(&proc, answer);
static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *handle, const struct sockaddr *addr)
{
mm_ctx_t pool;
mm_ctx_mempool(&pool, MM_DEFAULT_BLKSIZE);
/* Create worker task */
struct engine *engine = worker->engine;
struct qr_task *task = mm_alloc(&pool, sizeof(*task));
if (!task) {
mp_delete(pool.ctx);
return NULL;
}
task->req.pool = pool;
task->source.handle = handle;
if (addr) {
memcpy(&task->source.addr, addr, sockaddr_len(addr));
}
/* Create buffers */
knot_pkt_t *next_query = knot_pkt_new(NULL, KNOT_WIRE_MIN_PKTSIZE, &task->req.pool);
knot_pkt_t *answer = knot_pkt_new(NULL, KNOT_WIRE_MAX_PKTSIZE, &task->req.pool);
if (!next_query || !answer) {
mp_delete(pool.ctx);
return NULL;
}
task->req.answer = answer;
task->next_query = next_query;
/* Start resolution */
uv_timer_init(handle->loop, &task->timeout);
task->timeout.data = task;
kr_resolve_begin(&task->req, &engine->resolver, answer);
return task;
}
/* Cleanup. */
knot_layer_finish(&proc);
static void qr_task_close(uv_handle_t *handle)
{
struct qr_task *task = handle->data;
mp_delete(task->req.pool.ctx);
}
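
qr_task_create() and qr_task_close() bracket the task's lifetime with a single memory pool: the task itself, the kr_request, and both packets all come from task->req.pool, and one mp_delete() releases everything at once. A distilled sketch of the same pattern with the libknot mempool helpers used above; the scratch struct is made up for illustration:

struct scratch {
	mm_ctx_t pool;    /* owns every allocation below */
	knot_pkt_t *pkt;
};

static struct scratch *scratch_create(void)
{
	mm_ctx_t pool;
	mm_ctx_mempool(&pool, MM_DEFAULT_BLKSIZE);
	struct scratch *s = mm_alloc(&pool, sizeof(*s));
	if (!s) {
		mp_delete(pool.ctx);      /* one call frees the whole pool */
		return NULL;
	}
	s->pool = pool;
	s->pkt = knot_pkt_new(NULL, KNOT_WIRE_MAX_PKTSIZE, &s->pool);
	if (!s->pkt) {
		mp_delete(pool.ctx);
		return NULL;
	}
	return s;
}

static void scratch_free(struct scratch *s)
{
	if (s) {
		mp_delete(s->pool.ctx);   /* no per-object frees, mirrors qr_task_close() */
	}
}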
static void qr_task_timeout(uv_timer_t *req)
{
struct qr_task *task = req->data;
if (!uv_is_closing(task->next_handle)) {
io_stop_read(task->next_handle);
uv_close(task->next_handle, (uv_close_cb) free);
qr_task_step(task, NULL);
}
}
static void qr_task_on_send(uv_req_t* req, int status)
{
struct qr_task *task = req->data;
if (task) {
/* Start reading answer */
if (task->req.overlay.state != KNOT_STATE_NOOP) {
if (status == 0 && task->next_handle) {
io_start_read(task->next_handle);
}
} else {
/* Finalize task */
uv_close((uv_handle_t *)&task->timeout, qr_task_close);
}
}
}
static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockaddr *addr, knot_pkt_t *pkt)
{
if (handle->type == UV_UDP) {
uv_buf_t buf = { (char *)pkt->wire, pkt->size };
uv_udp_send_t *req = &task->ioreq.udp_send;
req->data = task;
return uv_udp_send(req, (uv_udp_t *)handle, &buf, 1, addr, (uv_udp_send_cb)qr_task_on_send);
} else {
uint16_t pkt_size = htons(pkt->size);
uv_buf_t buf[2] = {
{ (char *)&pkt_size, sizeof(pkt_size) },
{ (char *)pkt->wire, pkt->size }
};
uv_write_t *req = &task->ioreq.tcp_send;
req->data = task;
return uv_write(req, (uv_stream_t *)handle, buf, 2, (uv_write_cb)qr_task_on_send);
}
}
static void qr_task_on_connect(uv_connect_t *connect, int status)
{
uv_stream_t *handle = connect->handle;
struct qr_task *task = connect->data;
if (status != 0) { /* Failed to connect */
qr_task_step(task, NULL);
} else {
qr_task_send(task, (uv_handle_t *)handle, NULL, task->next_query);
}
}
static int qr_task_finalize(struct qr_task *task, int state)
{
kr_resolve_finish(&task->req, state);
uv_timer_stop(&task->timeout);
qr_task_send(task, task->source.handle, (struct sockaddr *)&task->source.addr, task->req.answer);
return state == KNOT_STATE_DONE ? 0 : kr_error(EIO);
}
static int qr_task_step(struct qr_task *task, knot_pkt_t *packet)
{
/* Cancel timeout if active */
uv_timer_stop(&task->timeout);
task->next_handle = NULL;
/* Consume input and produce next query */
int sock_type = -1;
struct sockaddr *addr = NULL;
knot_pkt_t *next_query = task->next_query;
int state = kr_resolve_consume(&task->req, packet);
while (state == KNOT_STATE_PRODUCE) {