diff --git a/daemon/engine.h b/daemon/engine.h
index 2af6bdbe22ff7814cfdf069887b2735105775e4e..0b32dfd80ca6a2325080c131663f092452364e2e 100644
--- a/daemon/engine.h
+++ b/daemon/engine.h
@@ -18,13 +18,13 @@
 
 /* Magic defaults */
 #ifndef LRU_RTT_SIZE
-#define LRU_RTT_SIZE 4096 /**< NS RTT cache size */
+#define LRU_RTT_SIZE 65536 /**< NS RTT cache size */
 #endif
 #ifndef LRU_REP_SIZE
-#define LRU_REP_SIZE (LRU_RTT_SIZE / 2) /**< NS reputation cache size */
+#define LRU_REP_SIZE (LRU_RTT_SIZE / 4) /**< NS reputation cache size */
 #endif
 #ifndef MP_FREELIST_SIZE
-#define MP_FREELIST_SIZE 32 /**< Maximum length of the worker mempool freelist */
+#define MP_FREELIST_SIZE 64 /**< Maximum length of the worker mempool freelist */
 #endif
 #ifndef RECVMMSG_BATCH
 #define RECVMMSG_BATCH 4
diff --git a/daemon/worker.c b/daemon/worker.c
index 85b1919876dc0a1104d88d5fde7aab4dad0f5887..8872a0be3b330b8b7a25a4cf72d419a9e12ad37b 100644
--- a/daemon/worker.c
+++ b/daemon/worker.c
@@ -56,6 +56,7 @@ struct qr_task
 	struct kr_request req;
 	struct worker_ctx *worker;
 	knot_pkt_t *pktbuf;
+	array_t(struct qr_task *) waiting;
 	uv_handle_t *pending[MAX_PENDING];
 	uint16_t pending_count;
 	uint16_t addrlist_count;
@@ -74,7 +75,8 @@ struct qr_task
 	uint16_t iter_count;
 	uint16_t refs;
 	uint16_t bytes_remaining;
-	uint16_t finished;
+	bool finished;
+	bool leading;
 };
 
 /* Convenience macros */
@@ -246,12 +248,14 @@ static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *ha
 	}
 	task->req.answer = answer;
 	task->pktbuf = pktbuf;
+	array_init(task->waiting);
 	task->addrlist = NULL;
 	task->pending_count = 0;
 	task->bytes_remaining = 0;
 	task->iter_count = 0;
 	task->refs = 1;
 	task->finished = false;
+	task->leading = false;
 	task->worker = worker;
 	task->source.handle = handle;
 	uv_timer_init(worker->loop, &task->retry);
@@ -310,6 +314,8 @@ static void qr_task_complete(uv_handle_t *handle)
 	struct worker_ctx *worker = task->worker;
 	/* Kill pending I/O requests */
 	ioreq_killall(task);
+	assert(task->waiting.len == 0);
+	assert(task->leading == false);
 	/* Run the completion callback. */
 	if (task->on_complete) {
 		task->on_complete(worker, &task->req, task->baton);
@@ -475,16 +481,17 @@ static void on_retransmit(uv_timer_t *req)
 	}
 }
 
-static int qr_task_finalize(struct qr_task *task, int state)
+/** @internal Get key from current outstanding subrequest. */
+static int subreq_key(char *dst, struct qr_task *task)
 {
-	kr_resolve_finish(&task->req, state);
-	task->finished = true;
-	/* Send back answer */
-	(void) qr_task_send(task, task->source.handle, (struct sockaddr *)&task->source.addr, task->req.answer);
-	return state == KNOT_STATE_DONE ? 0 : kr_error(EIO);
+	assert(task);
+	knot_pkt_t *pkt = task->pktbuf;
+	assert(knot_wire_get_qr(pkt->wire) == false);
+	return kr_rrmap_key(dst, knot_pkt_qname(pkt), knot_pkt_qtype(pkt), knot_pkt_qclass(pkt));
 }
 
-static void cancel_subrequests(struct qr_task *task)
+/** @internal Close the task's pending I/O; a leading task is also unregistered and its waiting followers are stepped with 'pkt'. */
+static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *pkt)
 {
 	/* Close pending I/O requests */
 	if (uv_is_active((uv_handle_t *)&task->retry))
@@ -492,6 +499,73 @@ static void cancel_subrequests(struct qr_task *task)
 	if (uv_is_active((uv_handle_t *)&task->timeout))
 		uv_timer_stop(&task->timeout);
 	ioreq_killall(task);
+	/* Clear from outstanding table. */
+	if (!task->leading)
+		return;
+	char key[RRMAP_KEYSIZE];
+	int ret = subreq_key(key, task);
+	if (ret > 0) {
+		assert(map_get(&task->worker->outstanding, key) == task);
+		map_del(&task->worker->outstanding, key);
+	}
+	/* Notify waiting tasks. */
+	struct kr_query *leader_qry = TAIL(task->req.rplan.pending);
+	for (size_t i = task->waiting.len; i --> 0;) {
+		struct qr_task *follower = task->waiting.at[i];
+		struct kr_query *qry = TAIL(follower->req.rplan.pending);
+		/* Reuse MSGID and 0x20 secret, only when both leader and follower have a pending query */
+		if (qry && leader_qry) {
+			qry->id = leader_qry->id;
+			qry->secret = leader_qry->secret;
+			leader_qry->secret = 0; /* Next will be already decoded */
+		}
+		qr_task_step(follower, packet_source, pkt);
+		qr_task_unref(follower);
+	}
+	task->waiting.len = 0;
+	task->leading = false;
+}
+
+/** @internal Register the task in the worker's outstanding table under its subrequest key and mark it as leading. */
+static void subreq_lead(struct qr_task *task)
+{
+	assert(task);
+	char key[RRMAP_KEYSIZE];
+	if (subreq_key(key, task) > 0) {
+		assert(map_contains(&task->worker->outstanding, key) == false);
+		map_set(&task->worker->outstanding, key, task);
+		task->leading = true;
+	}
+}
+
+/** @internal Append the task (taking a reference) to the waiting list of an outstanding leader with the same key; returns true if queued. */
+static bool subreq_enqueue(struct qr_task *task)
+{
+	assert(task);
+	char key[RRMAP_KEYSIZE];
+	if (subreq_key(key, task) > 0) {
+		struct qr_task *leader = map_get(&task->worker->outstanding, key);
+		if (leader) {
+			/* Enqueue itself to leader for this subrequest. */
+			int ret = array_reserve_mm(leader->waiting, leader->waiting.len + 1, mm_reserve, &leader->req.pool);
+			if (ret == 0) {
+				array_push(leader->waiting, task);
+				qr_task_ref(task);
+				return true;
+			}
+		}
+	}
+	return false;
+}
+
+static int qr_task_finalize(struct qr_task *task, int state)
+{
+	assert(task && task->leading == false);
+	kr_resolve_finish(&task->req, state);
+	task->finished = true;
+	/* Send back answer */
+	(void) qr_task_send(task, task->source.handle, (struct sockaddr *)&task->source.addr, task->req.answer);
+	return state == KNOT_STATE_DONE ? 0 : kr_error(EIO);
 }
 
 static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *packet)
@@ -501,7 +575,7 @@ static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_sour
 		return kr_error(ESTALE);
 	}
 	/* Close pending I/O requests */
-	cancel_subrequests(task);
+	subreq_finalize(task, packet_source, packet);
 	/* Consume input and produce next query */
 	int sock_type = -1;
 	task->addrlist = NULL;
@@ -532,11 +606,20 @@ static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_sour
 
 	/* Start fast retransmit with UDP, otherwise connect. */
 	if (sock_type == SOCK_DGRAM) {
+		/* If such subrequest is outstanding, enqueue to it. */
+		if (subreq_enqueue(task)) {
+			return kr_ok(); /* Will be notified when outstanding subrequest finishes. */
+		}
+		/* Start transmitting */
 		if (retransmit(task)) {
 			uv_timer_start(&task->retry, on_retransmit, KR_CONN_RETRY, KR_CONN_RETRY);
 		} else {
 			return qr_task_step(task, NULL, NULL);
 		}
+		/* Announce and start subrequest.
+		 * @note Only UDP can lead I/O as it doesn't touch 'task->pktbuf' for reassembly.
+		 */
+		subreq_lead(task);
 	} else {
 		struct ioreq *conn = ioreq_take(task->worker);
 		if (!conn) {
@@ -549,7 +632,6 @@ static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_sour
 		}
 		conn->as.connect.data = task;
 		if (uv_tcp_connect(&conn->as.connect, (uv_tcp_t *)client, task->addrlist, on_connect) != 0) {
-			DEBUG_MSG("task conn_start %p => failed\n", task);
 			ioreq_release(task->worker, conn);
 			return qr_task_step(task, NULL, NULL);
 		}
@@ -559,9 +641,12 @@ static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_sour
 
 	/* Start next step with timeout, fatal if can't start a timer. */
 	int ret = uv_timer_start(&task->timeout, on_timeout, KR_CONN_RTT_MAX, 0);
-	if (ret != 0)
+	if (ret != 0) {
+		subreq_finalize(task, packet_source, packet);
 		return qr_task_finalize(task, KNOT_STATE_FAIL);
-	return kr_ok();
+	}
+
+	return ret;
 }
 
 static int parse_packet(knot_pkt_t *query)
@@ -695,6 +780,7 @@ int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen)
 	memset(&worker->pkt_pool, 0, sizeof(worker->pkt_pool));
 	worker->pkt_pool.ctx = mp_new (4 * sizeof(knot_pkt_t));
 	worker->pkt_pool.alloc = (mm_alloc_t) mp_alloc;
+	worker->outstanding = map_make();
 	return kr_ok();
 }
 
@@ -712,6 +798,7 @@ void worker_reclaim(struct worker_ctx *worker)
 	reclaim_freelist(worker->ioreqs, struct ioreq, free);
 	mp_delete(worker->pkt_pool.ctx);
 	worker->pkt_pool.ctx = NULL;
+	map_clear(&worker->outstanding);
 }
 
 #undef DEBUG_MSG
diff --git a/daemon/worker.h b/daemon/worker.h
index fa2824dc5fcd052ba95fd842ae94554254017b52..92a1c2f3b72d74a311f1232f74cca8cab8a58565 100644
--- a/daemon/worker.h
+++ b/daemon/worker.h
@@ -20,6 +20,7 @@
 
 #include "daemon/engine.h"
 #include "lib/generic/array.h"
+#include "lib/generic/map.h"
 
 /* @cond internal Freelist of available mempools. */
 typedef array_t(void *) mp_freelist_t;
@@ -46,6 +47,7 @@ struct worker_ctx {
 		size_t dropped;
 		size_t timeout;
 	} stats;
+	map_t outstanding;
 	mp_freelist_t pools;
 	mp_freelist_t ioreqs;
 	mm_ctx_t pkt_pool;
diff --git a/lib/utils.c b/lib/utils.c
index 77e07bf966cb076427625d70d17423e0af162beb..75cd81a080613a2a79df45115936da4dc991ff55 100644
--- a/lib/utils.c
+++ b/lib/utils.c
@@ -304,6 +304,25 @@ int kr_bitcmp(const char *a, const char *b, int bits)
 	return ret;
 }
 
+int kr_rrmap_key(char *key, const knot_dname_t *owner, uint16_t type, uint8_t rank)
+{
+	if (!key || !owner) {
+		return kr_error(EINVAL);
+	}
+	key[0] = (rank << 2) | 0x01; /* Must be non-zero */
+	uint8_t *key_buf = (uint8_t *)key + 1;
+	int ret = knot_dname_to_wire(key_buf, owner, KNOT_DNAME_MAXLEN);
+	if (ret <= 0) {
+		return ret;
+	}
+	knot_dname_to_lower(key_buf);
+	key_buf += ret - 1;
+	/* Must convert to string, as the key must not contain 0x00 */
+	ret = u16tostr(key_buf, type);
+	key_buf[ret] = '\0';
+	return (char *)&key_buf[ret] - key;
+}
+
 int kr_rrmap_add(map_t *stash, const knot_rrset_t *rr, uint8_t rank, mm_ctx_t *pool)
 {
 	if (!stash || !rr) {
@@ -311,27 +330,20 @@ int kr_rrmap_add(map_t *stash, const knot_rrset_t *rr, uint8_t rank, mm_ctx_t *p
 	}
 
 	/* Stash key = {[1] flags, [1-255] owner, [5] type, [1] \x00 } */
-	char key[9 + KNOT_DNAME_MAXLEN];
+	char key[RRMAP_KEYSIZE];
+	uint8_t extra_flags = 0;
 	uint16_t rrtype = rr->type;
-	key[0] = (rank << 2) | 0x01; /* Must be non-zero */
-
 	/* Stash RRSIGs in a special cache, flag them and set type to its covering RR.
 	 * This way it the stash won't merge RRSIGs together. */
 	if (rr->type == KNOT_RRTYPE_RRSIG) {
 		rrtype = knot_rrsig_type_covered(&rr->rrs, 0);
-		key[0] |= KEY_FLAG_RRSIG;
+		extra_flags |= KEY_FLAG_RRSIG;
 	}
-
-	uint8_t *key_buf = (uint8_t *)key + 1;
-	int ret = knot_dname_to_wire(key_buf, rr->owner, KNOT_DNAME_MAXLEN);
+	int ret = kr_rrmap_key(key, rr->owner, rrtype, rank);
 	if (ret <= 0) {
-		return ret;
+		return kr_error(EILSEQ);
 	}
-	knot_dname_to_lower(key_buf);
-	key_buf += ret - 1;
-	/* Must convert to string, as the key must not contain 0x00 */
-	ret = u16tostr(key_buf, rrtype);
-	key_buf[ret] = '\0';
+	key[0] |= extra_flags;
 
 	/* Check if already exists */
 	knot_rrset_t *stashed = map_get(stash, key);
diff --git a/lib/utils.h b/lib/utils.h
index 667ae66bb9cd9aaf298f48b73f2108b01bb880b0..cfa789cedc6a44b01bf8bd4fee38113909a71857 100644
--- a/lib/utils.h
+++ b/lib/utils.h
@@ -117,6 +117,14 @@ int kr_bitcmp(const char *a, const char *b, int bits);
 #define KEY_FLAG_RRSIG 0x02
 #define KEY_FLAG_RANK(key) (key[0] >> 2)
 #define KEY_COVERING_RRSIG(key) (key[0] & KEY_FLAG_RRSIG)
+/* Stash key = {[1] flags, [1-255] owner, [5] type, [1] \x00 } */
+#define RRMAP_KEYSIZE (9 + KNOT_DNAME_MAXLEN)
+
+/** @internal Create unique string key for RR.
+ * @param key destination buffer (callers in this change size it as RRMAP_KEYSIZE)
+ * @return length of the key string on success; callers treat any value <= 0 as failure
+ */
+int kr_rrmap_key(char *key, const knot_dname_t *owner, uint16_t type, uint8_t rank);
 
 /** @internal Merges RRSets with matching owner name and type together.
  * @note RRSIG RRSets are merged according the type covered fields.