Skip to content
Snippets Groups Projects
Commit 9719c8c2 authored by Marek Vavruša's avatar Marek Vavruša
Browse files

daemon/io: reassemble DNS/TCP message fragments

parent 480d94bc
No related branches found
No related tags found
No related merge requests found
......@@ -86,25 +86,15 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
uv_loop_t *loop = handle->loop;
struct worker_ctx *worker = loop->data;
/* Check for originator connection close / not enough bytes */
if (nread < 2) {
if (!handle->data) {
/* @todo Notify the endpoint if master socket */
/* Check for originator connection close. */
if (nread <= 0) {
if (handle->data) {
worker_exec(worker, (uv_handle_t *)handle, NULL, NULL);
}
worker_exec(worker, (uv_handle_t *)handle, NULL, NULL);
return;
}
/** @todo This is not going to work if the packet is fragmented in the stream ! */
uint16_t nbytes = wire_read_u16((const uint8_t *)buf->base);
if (nbytes + 2 < nread) {
worker_exec(worker, (uv_handle_t *)handle, NULL, NULL);
return;
}
knot_pkt_t *query = knot_pkt_new(buf->base + 2, nbytes, &worker->pkt_pool);
query->max_size = sizeof(worker->wire_buf);
int ret = worker_exec(worker, (uv_handle_t *)handle, query, NULL);
int ret = worker_process_tcp(worker, (uv_handle_t *)handle, (const uint8_t *)buf->base, nread);
if (ret == 0) {
/* Push - pull, stop reading from this handle until
* the task is finished. Since the handle has no track of the
......
......@@ -79,6 +79,7 @@ struct qr_task
} source;
uint16_t iter_count;
uint16_t refs;
uint16_t bytes_remaining;
};
/* Convenience macros */
......@@ -143,6 +144,7 @@ static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *ha
task->req.answer = answer;
task->pktbuf = pktbuf;
task->iohandle = NULL;
task->bytes_remaining = 0;
task->iter_count = 0;
task->refs = 1;
task->worker = worker;
......@@ -435,6 +437,67 @@ int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *quer
return qr_task_step(task, addr, query);
}
/* Return DNS/TCP message size. */
static int msg_size(const uint8_t *msg, size_t len)
{
if (len < 2) {
return kr_error(EMSGSIZE);
}
uint16_t nbytes = wire_read_u16(msg);
if (nbytes > len - 2) {
return kr_error(EMSGSIZE);
}
return nbytes;
}
int worker_process_tcp(struct worker_ctx *worker, uv_handle_t *handle, const uint8_t *msg, size_t len)
{
if (!worker || !handle || !msg) {
return kr_error(EINVAL);
}
int nbytes = msg_size(msg, len);
struct qr_task *task = handle->data;
const bool start_assembly = (task && task->bytes_remaining == 0);
/* Message is a query (we have no context to buffer it) or complete. */
if (!task || (start_assembly && nbytes == len - 2)) {
if (nbytes <= 0) {
return worker_exec(worker, (uv_handle_t *)handle, NULL, NULL);
}
knot_pkt_t *pkt_nocopy = knot_pkt_new((void *)(msg + 2), nbytes, &worker->pkt_pool);
return worker_exec(worker, handle, pkt_nocopy, NULL);
}
/* Starting a new message assembly */
knot_pkt_t *pkt_buf = task->pktbuf;
if (start_assembly) {
if (nbytes <= 0) {
return worker_exec(worker, (uv_handle_t *)handle, NULL, NULL);
}
knot_pkt_clear(pkt_buf);
pkt_buf->size = 0;
/* Cut off message length */
task->bytes_remaining = nbytes;
len -= 2;
msg += 2;
}
/* Message is too long, can't process it. */
if (len > pkt_buf->max_size - pkt_buf->size) {
task->bytes_remaining = 0;
return worker_exec(worker, handle, NULL, NULL);
}
/* Buffer message and check if it's complete */
memcpy(pkt_buf->wire + pkt_buf->size, msg, len);
pkt_buf->size += len;
if (len >= task->bytes_remaining) {
task->bytes_remaining = 0;
return worker_exec(worker, handle, pkt_buf, NULL);
}
/* Return number of bytes remaining to receive. */
task->bytes_remaining -= len;
return task->bytes_remaining;
}
int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query, unsigned options, worker_cb_t on_complete, void *baton)
{
if (!worker || !query) {
......
......@@ -57,6 +57,14 @@ typedef void (*worker_cb_t)(struct worker_ctx *worker, struct kr_request *req, v
*/
int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr* addr);
/**
* Process incoming DNS/TCP message fragment.
* If the fragment contains only a partial message, it is buffered.
* If the fragment contains a complete query or completes current fragment, execute it.
* @return 0, number of bytes remaining to assemble, or an error code
*/
int worker_process_tcp(struct worker_ctx *worker, uv_handle_t *handle, const uint8_t *msg, size_t len);
/**
* Schedule query for resolution.
* @return 0 or an error code
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment