Skip to content
Snippets Groups Projects
Commit 8b4bb443 authored by Marek Vavruša's avatar Marek Vavruša
Browse files

daemon/worker: asynchronous I/O requests

this can coalesce sends/writes in future versions of libuv
parent b3f528f7
Branches
Tags
No related merge requests found
......@@ -26,6 +26,37 @@
#include "daemon/engine.h"
#include "daemon/io.h"
/* @internal IO request entry. */
struct ioreq
{
union {
uv_udp_send_t send;
uv_write_t write;
uv_connect_t connect;
} as;
};
static inline struct ioreq *ioreq_take(struct worker_ctx *worker)
{
struct ioreq *req = NULL;
if (worker->ioreqs.len > 0) {
req = array_tail(worker->ioreqs);
array_pop(worker->ioreqs);
} else {
req = malloc(sizeof(*req));
}
return req;
}
static inline void ioreq_release(struct worker_ctx *worker, struct ioreq *req)
{
if (!req || worker->ioreqs.len < MP_FREELIST_SIZE) {
array_push(worker->ioreqs, req);
} else {
free(req);
}
}
/** @internal Query resolution task. */
struct qr_task
{
......@@ -178,15 +209,31 @@ static int qr_task_on_send(struct qr_task *task, int status)
return status;
}
static void on_send(uv_udp_send_t *req, int status)
{
struct qr_task *task = req->data;
qr_task_on_send(task, status);
ioreq_release(task->worker, (struct ioreq *)req);
}
static void on_write(uv_write_t *req, int status)
{
struct qr_task *task = req->data;
qr_task_on_send(task, status);
ioreq_release(task->worker, (struct ioreq *)req);
}
static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockaddr *addr, knot_pkt_t *pkt)
{
int ret = 0;
if (!handle) {
struct ioreq *req = ioreq_take(task->worker);
if (!handle || !req) {
return qr_task_on_send(task, kr_error(EIO));
}
if (handle->type == UV_UDP) {
uv_buf_t buf = { (char *)pkt->wire, pkt->size };
ret = uv_udp_try_send((uv_udp_t *)handle, &buf, 1, addr);
req->as.send.data = task;
ret = uv_udp_send(&req->as.send, (uv_udp_t *)handle, &buf, 1, addr, &on_send);
if (handle != task->source.handle)
task->worker->stats.udp += 1;
} else {
......@@ -195,20 +242,21 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockad
{ (char *)&pkt_size, sizeof(pkt_size) },
{ (char *)pkt->wire, pkt->size }
};
ret = uv_try_write((uv_stream_t *)handle, buf, 2);
req->as.write.data = task;
ret = uv_write(&req->as.write, (uv_stream_t *)handle, buf, 2, &on_write);
if (handle != task->source.handle)
task->worker->stats.tcp += 1;
}
return qr_task_on_send(task, (ret >= 0) ? 0 : -1);
return ret;
}
static void qr_task_on_connect(uv_connect_t *connect, int status)
static void on_connect(uv_connect_t *req, int status)
{
struct qr_task *task = req->data;
if (status == 0) {
struct qr_task *task = connect->data;
qr_task_send(task, (uv_handle_t *)connect->handle, NULL, task->next_query);
qr_task_send(task, (uv_handle_t *)req->handle, NULL, task->next_query);
}
free(connect);
ioreq_release(task->worker, (struct ioreq *)req);
}
static int qr_task_finalize(struct qr_task *task, int state)
......@@ -257,14 +305,12 @@ static int qr_task_step(struct qr_task *task, knot_pkt_t *packet)
/* Connect or issue query datagram */
task->next_handle->data = task;
if (sock_type == SOCK_STREAM) {
/* connect handle must be persistent even if the task mempool drops,
* as it is referenced internally in the libuv event loop */
uv_connect_t *connect = malloc(sizeof(*connect));
if (!connect || uv_tcp_connect(connect, (uv_tcp_t *)task->next_handle, addr, qr_task_on_connect) != 0) {
free(connect);
struct ioreq *req = ioreq_take(task->worker);
if (!req || uv_tcp_connect(&req->as.connect, (uv_tcp_t *)task->next_handle, addr, on_connect) != 0) {
ioreq_release(task->worker, req);
return qr_task_step(task, NULL);
}
connect->data = task;
req->as.connect.data = task;
} else {
if (qr_task_send(task, task->next_handle, addr, next_query) != 0) {
return qr_task_step(task, NULL);
......@@ -323,11 +369,14 @@ int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen)
return array_reserve(worker->pools, ring_maxlen);
}
#define reclaim_freelist(list, cb) \
for (unsigned i = 0; i < list.len; ++i) { \
cb(list.at[i]); \
} \
array_clear(list)
void worker_reclaim(struct worker_ctx *worker)
{
mp_freelist_t *pools = &worker->pools;
for (unsigned i = 0; i < pools->len; ++i) {
mp_delete(pools->at[i]);
}
array_clear(*pools);
reclaim_freelist(worker->pools, mp_delete);
reclaim_freelist(worker->ioreqs, free);
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment