Commit 561ef072 authored by Marek Vavruša

Merge branch 'subreq-deduping'

parents 8ea9b7df 9a1caee6
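This merge brings subrequest deduplication into the worker: the hunks below (which appear to come from daemon/worker.c, daemon/worker.h, lib/utils.c and lib/utils.h) key every in-flight UDP subrequest by its query name and type in a worker-wide 'outstanding' map, let the first task that sends the query act as the leader, and park later identical tasks on the leader's 'waiting' array until the leader's answer arrives. The standalone C sketch below illustrates only that leader/follower pattern under simplified assumptions; the names (task_t, table_t, the fixed-size table) are illustrative and are not the daemon's actual data structures or API.

/* Minimal sketch of outstanding-subrequest deduplication.
 * Illustrative only: these are not kresd's structures or functions. */
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#define MAX_TASKS   8
#define MAX_WAITING 8

typedef struct task {
	const char *key;                  /* e.g. "example.com/A" */
	struct task *waiting[MAX_WAITING]; /* followers parked on this leader */
	size_t waiting_len;
	bool leading;
} task_t;

typedef struct {
	task_t *outstanding[MAX_TASKS];   /* keyed table of current leaders */
	size_t len;
} table_t;

static task_t *table_get(table_t *t, const char *key)
{
	for (size_t i = 0; i < t->len; ++i)
		if (strcmp(t->outstanding[i]->key, key) == 0)
			return t->outstanding[i];
	return NULL;
}

/* First task for a key announces itself as leader
 * (the real code asserts the key is not yet present). */
static void subreq_lead(table_t *t, task_t *task)
{
	if (t->len < MAX_TASKS) {
		t->outstanding[t->len++] = task;
		task->leading = true;
	}
}

/* Later identical tasks enqueue on the leader instead of re-sending. */
static bool subreq_enqueue(table_t *t, task_t *task)
{
	task_t *leader = table_get(t, task->key);
	if (leader && leader->waiting_len < MAX_WAITING) {
		leader->waiting[leader->waiting_len++] = task;
		return true;
	}
	return false;
}

/* When the leader's answer arrives, drop it from the table and wake followers. */
static void subreq_finalize(table_t *t, task_t *leader, const char *answer)
{
	for (size_t i = 0; i < t->len; ++i) {
		if (t->outstanding[i] == leader) {
			t->outstanding[i] = t->outstanding[--t->len];
			break;
		}
	}
	for (size_t i = 0; i < leader->waiting_len; ++i)
		printf("follower %zu resumes with answer: %s\n", i, answer);
	leader->waiting_len = 0;
	leader->leading = false;
}

int main(void)
{
	table_t table = { .len = 0 };
	task_t a = { .key = "example.com/A" };
	task_t b = { .key = "example.com/A" };

	subreq_lead(&table, &a);            /* a sends the query upstream */
	if (subreq_enqueue(&table, &b))     /* b piggybacks on a's query */
		printf("task b deduplicated onto leader\n");
	subreq_finalize(&table, &a, "192.0.2.1");
	return 0;
}

In the diff itself the same three steps appear as subreq_lead(), subreq_enqueue() and subreq_finalize(), with qr_task_ref()/qr_task_unref() keeping followers alive while they wait and kr_rrmap_key() producing the map key.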
@@ -18,13 +18,13 @@
 /* Magic defaults */
 #ifndef LRU_RTT_SIZE
-#define LRU_RTT_SIZE 4096 /**< NS RTT cache size */
+#define LRU_RTT_SIZE 65536 /**< NS RTT cache size */
 #endif
 #ifndef LRU_REP_SIZE
-#define LRU_REP_SIZE (LRU_RTT_SIZE / 2) /**< NS reputation cache size */
+#define LRU_REP_SIZE (LRU_RTT_SIZE / 4) /**< NS reputation cache size */
 #endif
 #ifndef MP_FREELIST_SIZE
-#define MP_FREELIST_SIZE 32 /**< Maximum length of the worker mempool freelist */
+#define MP_FREELIST_SIZE 64 /**< Maximum length of the worker mempool freelist */
 #endif
 #ifndef RECVMMSG_BATCH
 #define RECVMMSG_BATCH 4
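For reference, the derived defaults change as well: LRU_REP_SIZE is now 65536 / 4 = 16384 entries (previously 4096 / 2 = 2048), and the worker keeps up to 64 cached mempools on its freelist instead of 32.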
@@ -56,6 +56,7 @@ struct qr_task
 	struct kr_request req;
 	struct worker_ctx *worker;
 	knot_pkt_t *pktbuf;
+	array_t(struct qr_task *) waiting;
 	uv_handle_t *pending[MAX_PENDING];
 	uint16_t pending_count;
 	uint16_t addrlist_count;
@@ -74,7 +75,8 @@ struct qr_task
 	uint16_t iter_count;
 	uint16_t refs;
 	uint16_t bytes_remaining;
-	uint16_t finished;
+	bool finished;
+	bool leading;
 };
 
 /* Convenience macros */
@@ -246,12 +248,14 @@ static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *ha
 	}
 	task->req.answer = answer;
 	task->pktbuf = pktbuf;
+	array_init(task->waiting);
 	task->addrlist = NULL;
 	task->pending_count = 0;
 	task->bytes_remaining = 0;
 	task->iter_count = 0;
 	task->refs = 1;
 	task->finished = false;
+	task->leading = false;
 	task->worker = worker;
 	task->source.handle = handle;
 	uv_timer_init(worker->loop, &task->retry);
@@ -310,6 +314,8 @@ static void qr_task_complete(uv_handle_t *handle)
 	struct worker_ctx *worker = task->worker;
 	/* Kill pending I/O requests */
 	ioreq_killall(task);
+	assert(task->waiting.len == 0);
+	assert(task->leading == false);
 	/* Run the completion callback. */
 	if (task->on_complete) {
 		task->on_complete(worker, &task->req, task->baton);
@@ -475,16 +481,16 @@ static void on_retransmit(uv_timer_t *req)
 	}
 }
 
-static int qr_task_finalize(struct qr_task *task, int state)
+/** @internal Get key from current outstanding subrequest. */
+static int subreq_key(char *dst, struct qr_task *task)
 {
-	kr_resolve_finish(&task->req, state);
-	task->finished = true;
-	/* Send back answer */
-	(void) qr_task_send(task, task->source.handle, (struct sockaddr *)&task->source.addr, task->req.answer);
-	return state == KNOT_STATE_DONE ? 0 : kr_error(EIO);
+	assert(task);
+	knot_pkt_t *pkt = task->pktbuf;
+	assert(knot_wire_get_qr(pkt->wire) == false);
+	return kr_rrmap_key(dst, knot_pkt_qname(pkt), knot_pkt_qtype(pkt), knot_pkt_qclass(pkt));
 }
 
-static void cancel_subrequests(struct qr_task *task)
+static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *pkt)
 {
 	/* Close pending I/O requests */
 	if (uv_is_active((uv_handle_t *)&task->retry))
@@ -492,6 +498,71 @@ static void cancel_subrequests(struct qr_task *task)
 	if (uv_is_active((uv_handle_t *)&task->timeout))
 		uv_timer_stop(&task->timeout);
 	ioreq_killall(task);
+	/* Clear from outstanding table. */
+	if (!task->leading)
+		return;
+	char key[RRMAP_KEYSIZE];
+	int ret = subreq_key(key, task);
+	if (ret > 0) {
+		assert(map_get(&task->worker->outstanding, key) == task);
+		map_del(&task->worker->outstanding, key);
+	}
+	/* Notify waiting tasks. */
+	struct kr_query *leader_qry = TAIL(task->req.rplan.pending);
+	for (size_t i = task->waiting.len; i --> 0;) {
+		struct qr_task *follower = task->waiting.at[i];
+		struct kr_query *qry = TAIL(follower->req.rplan.pending);
+		/* Reuse MSGID and 0x20 secret */
+		if (qry) {
+			qry->id = leader_qry->id;
+			qry->secret = leader_qry->secret;
+			leader_qry->secret = 0; /* Next will be already decoded */
+		}
+		qr_task_step(follower, packet_source, pkt);
+		qr_task_unref(follower);
+	}
+	task->waiting.len = 0;
+	task->leading = false;
 }
+
+static void subreq_lead(struct qr_task *task)
+{
+	assert(task);
+	char key[RRMAP_KEYSIZE];
+	if (subreq_key(key, task) > 0) {
+		assert(map_contains(&task->worker->outstanding, key) == false);
+		map_set(&task->worker->outstanding, key, task);
+		task->leading = true;
+	}
+}
+
+static bool subreq_enqueue(struct qr_task *task)
+{
+	assert(task);
+	char key[RRMAP_KEYSIZE];
+	if (subreq_key(key, task) > 0) {
+		struct qr_task *leader = map_get(&task->worker->outstanding, key);
+		if (leader) {
+			/* Enqueue itself to leader for this subrequest. */
+			int ret = array_reserve_mm(leader->waiting, leader->waiting.len + 1, mm_reserve, &leader->req.pool);
+			if (ret == 0) {
+				array_push(leader->waiting, task);
+				qr_task_ref(task);
+				return true;
+			}
+		}
+	}
+	return false;
+}
+
+static int qr_task_finalize(struct qr_task *task, int state)
+{
+	assert(task && task->leading == false);
+	kr_resolve_finish(&task->req, state);
+	task->finished = true;
+	/* Send back answer */
+	(void) qr_task_send(task, task->source.handle, (struct sockaddr *)&task->source.addr, task->req.answer);
+	return state == KNOT_STATE_DONE ? 0 : kr_error(EIO);
+}
 
 static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *packet)
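In subreq_finalize() above, each waiting follower copies the leader's message ID and 0x20 secret before being stepped with the shared answer, and the leader's secret is then zeroed so later followers, which already see the re-decoded QNAME, do not try to undo the case randomization again. The secret matters because of DNS 0x20 case randomization: the case pattern in the upstream answer corresponds to the secret that encoded the query, so a follower checking against its own secret would mismatch. The snippet below is only an illustration of the idea with an ad-hoc bit-per-letter scheme on a presentation-format name; it is not how kresd derives or applies the pattern.

/* Toy 0x20 encoder: flip the case of each letter according to one bit of
 * `secret` per character position. Purely illustrative; kresd derives the
 * pattern differently and applies it to wire-format QNAMEs. */
#include <ctype.h>
#include <stdint.h>
#include <stdio.h>

static void dns0x20_apply(char *name, uint32_t secret)
{
	for (size_t i = 0; name[i] != '\0'; ++i) {
		if (!isalpha((unsigned char)name[i]))
			continue;
		if ((secret >> (i % 32)) & 1)
			name[i] = (char)toupper((unsigned char)name[i]);
		else
			name[i] = (char)tolower((unsigned char)name[i]);
	}
}

int main(void)
{
	char qname[] = "example.com";
	dns0x20_apply(qname, 0xA5A5A5A5u);
	printf("%s\n", qname); /* prints "ExAmpLe.CoM" under this toy scheme */
	return 0;
}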
@@ -501,7 +572,7 @@ static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *packet)
 		return kr_error(ESTALE);
 	}
 	/* Close pending I/O requests */
-	cancel_subrequests(task);
+	subreq_finalize(task, packet_source, packet);
 	/* Consume input and produce next query */
 	int sock_type = -1;
 	task->addrlist = NULL;
@@ -532,11 +603,20 @@ static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *packet)
 	/* Start fast retransmit with UDP, otherwise connect. */
 	if (sock_type == SOCK_DGRAM) {
+		/* If such subrequest is outstanding, enqueue to it. */
+		if (subreq_enqueue(task)) {
+			return kr_ok(); /* Will be notified when outstanding subrequest finishes. */
+		}
+		/* Start transmitting */
 		if (retransmit(task)) {
 			uv_timer_start(&task->retry, on_retransmit, KR_CONN_RETRY, KR_CONN_RETRY);
 		} else {
 			return qr_task_step(task, NULL, NULL);
 		}
+		/* Announce and start subrequest.
+		 * @note Only UDP can lead I/O as it doesn't touch 'task->pktbuf' for reassembly.
+		 */
+		subreq_lead(task);
 	} else {
 		struct ioreq *conn = ioreq_take(task->worker);
 		if (!conn) {
@@ -549,7 +629,6 @@ static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *packet)
 		}
 		conn->as.connect.data = task;
 		if (uv_tcp_connect(&conn->as.connect, (uv_tcp_t *)client, task->addrlist, on_connect) != 0) {
-			DEBUG_MSG("task conn_start %p => failed\n", task);
 			ioreq_release(task->worker, conn);
 			return qr_task_step(task, NULL, NULL);
 		}
@@ -559,9 +638,12 @@ static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *packet)
 	/* Start next step with timeout, fatal if can't start a timer. */
 	int ret = uv_timer_start(&task->timeout, on_timeout, KR_CONN_RTT_MAX, 0);
-	if (ret != 0)
+	if (ret != 0) {
+		subreq_finalize(task, packet_source, packet);
 		return qr_task_finalize(task, KNOT_STATE_FAIL);
-	return kr_ok();
+	}
+	return ret;
 }
 
 static int parse_packet(knot_pkt_t *query)
@@ -695,6 +777,7 @@ int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen)
 	memset(&worker->pkt_pool, 0, sizeof(worker->pkt_pool));
 	worker->pkt_pool.ctx = mp_new (4 * sizeof(knot_pkt_t));
 	worker->pkt_pool.alloc = (mm_alloc_t) mp_alloc;
+	worker->outstanding = map_make();
 	return kr_ok();
 }
@@ -712,6 +795,7 @@ void worker_reclaim(struct worker_ctx *worker)
 	reclaim_freelist(worker->ioreqs, struct ioreq, free);
 	mp_delete(worker->pkt_pool.ctx);
 	worker->pkt_pool.ctx = NULL;
+	map_clear(&worker->outstanding);
 }
 
 #undef DEBUG_MSG
@@ -20,6 +20,7 @@
 #include "daemon/engine.h"
 #include "lib/generic/array.h"
+#include "lib/generic/map.h"
 
 /* @cond internal Freelist of available mempools. */
 typedef array_t(void *) mp_freelist_t;
@@ -46,6 +47,7 @@ struct worker_ctx {
 		size_t dropped;
 		size_t timeout;
 	} stats;
+	map_t outstanding;
 	mp_freelist_t pools;
 	mp_freelist_t ioreqs;
 	mm_ctx_t pkt_pool;
@@ -304,6 +304,25 @@ int kr_bitcmp(const char *a, const char *b, int bits)
 	return ret;
 }
 
+int kr_rrmap_key(char *key, const knot_dname_t *owner, uint16_t type, uint8_t rank)
+{
+	if (!key || !owner) {
+		return kr_error(EINVAL);
+	}
+	key[0] = (rank << 2) | 0x01; /* Must be non-zero */
+	uint8_t *key_buf = (uint8_t *)key + 1;
+	int ret = knot_dname_to_wire(key_buf, owner, KNOT_DNAME_MAXLEN);
+	if (ret <= 0) {
+		return ret;
+	}
+	knot_dname_to_lower(key_buf);
+	key_buf += ret - 1;
+	/* Must convert to string, as the key must not contain 0x00 */
+	ret = u16tostr(key_buf, type);
+	key_buf[ret] = '\0';
+	return (char *)&key_buf[ret] - key;
+}
+
 int kr_rrmap_add(map_t *stash, const knot_rrset_t *rr, uint8_t rank, mm_ctx_t *pool)
 {
 	if (!stash || !rr) {
@@ -311,27 +330,20 @@ int kr_rrmap_add(map_t *stash, const knot_rrset_t *rr, uint8_t rank, mm_ctx_t *pool)
 	}
-	/* Stash key = {[1] flags, [1-255] owner, [5] type, [1] \x00 } */
-	char key[9 + KNOT_DNAME_MAXLEN];
+	char key[RRMAP_KEYSIZE];
+	uint8_t extra_flags = 0;
 	uint16_t rrtype = rr->type;
-	key[0] = (rank << 2) | 0x01; /* Must be non-zero */
 	/* Stash RRSIGs in a special cache, flag them and set type to its covering RR.
 	 * This way it the stash won't merge RRSIGs together. */
 	if (rr->type == KNOT_RRTYPE_RRSIG) {
 		rrtype = knot_rrsig_type_covered(&rr->rrs, 0);
-		key[0] |= KEY_FLAG_RRSIG;
+		extra_flags |= KEY_FLAG_RRSIG;
 	}
-	uint8_t *key_buf = (uint8_t *)key + 1;
-	int ret = knot_dname_to_wire(key_buf, rr->owner, KNOT_DNAME_MAXLEN);
+	int ret = kr_rrmap_key(key, rr->owner, rrtype, rank);
 	if (ret <= 0) {
-		return ret;
+		return kr_error(EILSEQ);
 	}
-	knot_dname_to_lower(key_buf);
-	key_buf += ret - 1;
-	/* Must convert to string, as the key must not contain 0x00 */
-	ret = u16tostr(key_buf, rrtype);
-	key_buf[ret] = '\0';
+	key[0] |= extra_flags;
 	/* Check if already exists */
 	knot_rrset_t *stashed = map_get(stash, key);
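The new kr_rrmap_key() above factors the stash-key construction out of kr_rrmap_add() so the worker can reuse it for the outstanding-subrequest table. The layout matches the comment moved into the header: one flags byte that is never zero, the lowercased owner name, the type rendered as text, and a terminating '\0', which is why a key always fits in RRMAP_KEYSIZE and can be handed to the string-keyed map. The standalone analogue below only mirrors that shape with a dotted name and snprintf instead of knot's wire-format and u16tostr helpers, so the exact bytes differ from a real key.

/* Illustrative, self-contained analogue of the stash key layout
 * {[1] flags, [1-255] owner, [5] type, [1] '\0'}. Not the library code. */
#include <ctype.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define DEMO_KEYSIZE (9 + 255)

static int demo_rrmap_key(char *key, const char *owner, uint16_t type, uint8_t rank)
{
	size_t len = strlen(owner);
	if (len == 0 || len > 255)
		return -1;
	key[0] = (char)((rank << 2) | 0x01);      /* flags byte, never zero */
	for (size_t i = 0; i < len; ++i)          /* lowercase the owner name */
		key[1 + i] = (char)tolower((unsigned char)owner[i]);
	/* Append the type as decimal text so the key stays NUL-free inside. */
	int n = snprintf(key + 1 + len, 7, "%u", (unsigned)type);
	return (int)(1 + len + n);                /* length excluding the final '\0' */
}

int main(void)
{
	char key[DEMO_KEYSIZE];
	int len = demo_rrmap_key(key, "Example.COM.", 28 /* AAAA */, 0);
	printf("key length %d, owner+type part: %s\n", len, key + 1);
	return 0;
}

The essential property is that no interior byte is 0x00: in the real key the label-length bytes of the wire-format name are 1-63 and the type is stored as decimal text, so map_get()/map_set() can treat the key as an ordinary C string.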
@@ -117,6 +117,11 @@ int kr_bitcmp(const char *a, const char *b, int bits);
 #define KEY_FLAG_RRSIG 0x02
 #define KEY_FLAG_RANK(key) (key[0] >> 2)
 #define KEY_COVERING_RRSIG(key) (key[0] & KEY_FLAG_RRSIG)
+
+/* Stash key = {[1] flags, [1-255] owner, [5] type, [1] \x00 } */
+#define RRMAP_KEYSIZE (9 + KNOT_DNAME_MAXLEN)
+/** @internal Create unique string key for RR. */
+int kr_rrmap_key(char *key, const knot_dname_t *owner, uint16_t type, uint8_t rank);
 
 /** @internal Merges RRSets with matching owner name and type together.
  * @note RRSIG RRSets are merged according the type covered fields.