diff --git a/daemon/engine.h b/daemon/engine.h
index e82474cd9054552c1afcf84cb2353a2912ad7e37..13dc097b13455287bffe4f3883924edff6181ce2 100644
--- a/daemon/engine.h
+++ b/daemon/engine.h
@@ -26,6 +26,9 @@
 #ifndef MP_FREELIST_SIZE
 #define MP_FREELIST_SIZE 32 /**< Maximum length of the worker mempool freelist */
 #endif
+#ifndef RECVMMSG_BATCH
+#define RECVMMSG_BATCH 8
+#endif
 
 /*
  * @internal These are forward decls to allow building modules with engine but without Lua.
diff --git a/daemon/worker.c b/daemon/worker.c
index 63f4dad13b045c2e0c9d427a45970f2950ee4d7c..58ab23e1948bd08b161dddbbc311378cf67b3ff4 100644
--- a/daemon/worker.c
+++ b/daemon/worker.c
@@ -26,6 +26,37 @@
 #include "daemon/engine.h"
 #include "daemon/io.h"
 
+/* @internal IO request entry. */
+struct ioreq
+{
+	union {
+		uv_udp_send_t send;
+		uv_write_t write;
+		uv_connect_t connect;
+	} as;
+};
+
+static inline struct ioreq *ioreq_take(struct worker_ctx *worker)
+{
+	struct ioreq *req = NULL;
+	if (worker->ioreqs.len > 0) {
+		req = array_tail(worker->ioreqs);
+		array_pop(worker->ioreqs);
+	} else {
+		req = malloc(sizeof(*req));
+	}
+	return req;
+}
+
+static inline void ioreq_release(struct worker_ctx *worker, struct ioreq *req)
+{
+	if (req && worker->ioreqs.len < MP_FREELIST_SIZE) { /* never recycle NULL */
+		array_push(worker->ioreqs, req);
+	} else {
+		free(req);
+	}
+}
+
 /** @internal Query resolution task. */
 struct qr_task
 {
@@ -75,7 +106,7 @@ static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *ha
 		pool.ctx = array_tail(worker->pools);
 		array_pop(worker->pools);
 	} else { /* No mempool on the freelist, create new one */
-		pool.ctx = mp_new (16 * CPU_PAGE_SIZE);
+		pool.ctx = mp_new (20 * CPU_PAGE_SIZE);
 	}
 
 	/* Create worker task */
@@ -178,15 +209,34 @@ static int qr_task_on_send(struct qr_task *task, int status)
 	return status;
 }
 
+static void on_send(uv_udp_send_t *req, int status)
+{
+	struct qr_task *task = req->data;
+	qr_task_on_send(task, status);
+	ioreq_release(task->worker, (struct ioreq *)req);
+}
+
+static void on_write(uv_write_t *req, int status)
+{
+	struct qr_task *task = req->data;
+	qr_task_on_send(task, status);
+	ioreq_release(task->worker, (struct ioreq *)req);
+}
+
 static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockaddr *addr, knot_pkt_t *pkt)
 {
 	int ret = 0;
 	if (!handle) {
 		return qr_task_on_send(task, kr_error(EIO));
 	}
+	struct ioreq *req = ioreq_take(task->worker);
+	if (!req) {
+		return qr_task_on_send(task, kr_error(ENOMEM));
+	}
 	if (handle->type == UV_UDP) {
 		uv_buf_t buf = { (char *)pkt->wire, pkt->size };
-		ret = uv_udp_try_send((uv_udp_t *)handle, &buf, 1, addr);
+		req->as.send.data = task;
+		ret = uv_udp_send(&req->as.send, (uv_udp_t *)handle, &buf, 1, addr, &on_send);
 		if (handle != task->source.handle)
 			task->worker->stats.udp += 1;
 	} else {
@@ -195,20 +245,26 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockad
 			{ (char *)&pkt_size, sizeof(pkt_size) },
 			{ (char *)pkt->wire, pkt->size }
 		};
-		ret = uv_try_write((uv_stream_t *)handle, buf, 2);
+		req->as.write.data = task;
+		ret = uv_write(&req->as.write, (uv_stream_t *)handle, buf, 2, &on_write);
 		if (handle != task->source.handle)
 			task->worker->stats.tcp += 1;
 	}
-	return qr_task_on_send(task, (ret >= 0) ? 0 : -1);
+	if (ret != 0) {
+		ioreq_release(task->worker, req);
+	}
+	return ret;
 }
 
-static void qr_task_on_connect(uv_connect_t *connect, int status)
+static void on_connect(uv_connect_t *req, int status)
 {
+	struct qr_task *task = req->data;
 	if (status == 0) {
-		struct qr_task *task = connect->data;
-		qr_task_send(task, (uv_handle_t *)connect->handle, NULL, task->next_query);
+		qr_task_send(task, (uv_handle_t *)req->handle, NULL, task->next_query);
+		ioreq_release(task->worker, (struct ioreq *)req);
+	} else { /* Must not recycle, as 'task' may be freed. */
+		free(req);
 	}
-	free(connect);
 }
 
 static int qr_task_finalize(struct qr_task *task, int state)
@@ -257,14 +313,12 @@ static int qr_task_step(struct qr_task *task, knot_pkt_t *packet)
 	/* Connect or issue query datagram */
 	task->next_handle->data = task;
 	if (sock_type == SOCK_STREAM) {
-		/* connect handle must be persistent even if the task mempool drops,
-		 * as it is referenced internally in the libuv event loop */
-		uv_connect_t *connect = malloc(sizeof(*connect));
-		if (!connect || uv_tcp_connect(connect, (uv_tcp_t *)task->next_handle, addr, qr_task_on_connect) != 0) {
-			free(connect);
+		struct ioreq *req = ioreq_take(task->worker);
+		if (!req || uv_tcp_connect(&req->as.connect, (uv_tcp_t *)task->next_handle, addr, on_connect) != 0) {
+			ioreq_release(task->worker, req);
 			return qr_task_step(task, NULL);
 		}
-		connect->data = task;
+		req->as.connect.data = task;
 	} else {
 		if (qr_task_send(task, task->next_handle, addr, next_query) != 0) {
 			return qr_task_step(task, NULL);
@@ -323,11 +377,14 @@ int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen)
 	return array_reserve(worker->pools, ring_maxlen);
 }
 
+#define reclaim_freelist(list, cb) \
+	for (unsigned i = 0; i < (list).len; ++i) { \
+		cb((list).at[i]); \
+	} \
+	array_clear(list)
+
 void worker_reclaim(struct worker_ctx *worker)
 {
-	mp_freelist_t *pools = &worker->pools;
-	for (unsigned i = 0; i < pools->len; ++i) {
-		mp_delete(pools->at[i]);
-	}
-	array_clear(*pools);
+	reclaim_freelist(worker->pools, mp_delete);
+	reclaim_freelist(worker->ioreqs, free);
 }
diff --git a/daemon/worker.h b/daemon/worker.h
index ca627d5f446f64b927ec2df0437142830c2db3a0..233c865b70f51df20715d489f3d62a64f89e9e1f 100644
--- a/daemon/worker.h
+++ b/daemon/worker.h
@@ -32,13 +32,18 @@ struct worker_ctx {
 	struct engine *engine;
 	uv_loop_t *loop;
 	mm_ctx_t *mm;
-	uint8_t wire_buf[4 * KNOT_WIRE_MAX_PKTSIZE];
+#if __linux__
+	uint8_t wire_buf[RECVMMSG_BATCH * KNOT_WIRE_MAX_PKTSIZE];
+#else
+	uint8_t wire_buf[KNOT_WIRE_MAX_PKTSIZE];
+#endif
 	struct {
 		size_t concurrent;
 		size_t udp;
 		size_t tcp;
 	} stats;
 	mp_freelist_t pools;
+	mp_freelist_t ioreqs;
 };
 
 /**