Commit e638f9fb authored by Marek Vavrusa

daemon/worker: deduplicate inbound queries

Many clients retransmit the same query frequently to compensate
for network losses and to get faster service, but then fail to
work properly when the resolver answers some of the retransmissions
with SERVFAIL (because of the time limit) and others with NOERROR.

Deduplicating inbound queries also avoids wasting time tracking
pending tasks that resolve the same thing.
parent 4775ec71
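The idea can be sketched independently of the kresd internals: two inbound packets are treated as duplicates when they carry the same question (qname, qtype, qclass), the same DNS message ID, and come from the same origin. The helper below is a minimal illustration with hypothetical names (struct inbound_query, is_duplicate); it is not part of this commit.

    #include <stdbool.h>
    #include <stdint.h>
    #include <string.h>

    /* Hypothetical, simplified view of what identifies a duplicate query. */
    struct inbound_query {
        char     qname[256];  /* query name in canonical text form */
        uint16_t qtype;
        uint16_t qclass;
        uint16_t msgid;       /* DNS header ID */
        char     origin[64];  /* source address and port, e.g. "192.0.2.1#53535" */
    };

    static bool is_duplicate(const struct inbound_query *a,
                             const struct inbound_query *b)
    {
        return a->qtype == b->qtype && a->qclass == b->qclass &&
               a->msgid == b->msgid &&
               strcmp(a->qname, b->qname) == 0 &&
               strcmp(a->origin, b->origin) == 0;
    }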
daemon/worker.c

@@ -200,6 +200,13 @@ static inline void pool_release(struct worker_ctx *worker, struct mempool *mp)
 	}
 }
 
+/** @internal Get key from current outgoing subrequest. */
+static int subreq_key(char *dst, knot_pkt_t *pkt)
+{
+	assert(pkt);
+	return kr_rrkey(dst, knot_pkt_qname(pkt), knot_pkt_qtype(pkt), knot_pkt_qclass(pkt));
+}
+
 static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *handle, const struct sockaddr *addr)
 {
 	/* How much can client handle? */
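subreq_key() now derives the map key directly from a packet, so the same helper serves both outgoing subrequests and inbound queries. A rough sketch of such a key derivation, assuming a textual encoding (the real kr_rrkey() uses its own binary layout and the KR_RRKEY_LEN buffer size):

    #include <stdint.h>
    #include <stdio.h>

    /* Illustrative stand-in for kr_rrkey(): pack class, type and name into
     * one NUL-terminated string usable as a map key. Returns the key length,
     * or -1 if the buffer is too small. */
    static int toy_rrkey(char *dst, size_t maxlen, uint16_t qclass,
                         uint16_t qtype, const char *qname)
    {
        int n = snprintf(dst, maxlen, "%u %u %s",
                         (unsigned)qclass, (unsigned)qtype, qname);
        return (n > 0 && (size_t)n < maxlen) ? n : -1;
    }

Because the key encodes only the question section, every retransmission of the same question maps to the same slot.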
@@ -274,7 +281,7 @@ static void qr_task_free(struct qr_task *task)
 		}
 	}
 	/* Start reading again if the session is throttled and
-	 * the number of outstanding requests is below watermark. */
+	 * the number of outgoing requests is below watermark. */
 	uv_handle_t *handle = task->source.handle;
 	if (handle && session->tasks.len < task->worker->tcp_pipeline_max/2) {
 		if (!uv_is_closing(handle) && session->throttled) {
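The watermark at tcp_pipeline_max/2 gives the throttling its hysteresis: reading stops when the pipeline fills and resumes only after half of it has drained, so the session does not flap between the two states. A self-contained sketch of that decision (should_read() is a hypothetical helper, not kresd API):

    #include <stdbool.h>
    #include <stddef.h>

    static bool should_read(size_t tasks_len, size_t pipeline_max, bool throttled)
    {
        if (tasks_len >= pipeline_max)
            return false;               /* pipeline full: stop reading */
        if (throttled && tasks_len >= pipeline_max / 2)
            return false;               /* still draining below the watermark */
        return true;                    /* below watermark: (resume) reading */
    }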
@@ -330,6 +337,11 @@ static int qr_task_start(struct qr_task *task, knot_pkt_t *query)
 	if (worker->stats.concurrent < QUERY_RATE_THRESHOLD) {
 		task->req.options |= QUERY_NO_THROTTLE;
 	}
+	/* Track outstanding inbound queries as well for deduplication. */
+	char key[KR_RRKEY_LEN];
+	if (subreq_key(key, query) > 0) {
+		map_set(&task->worker->outstanding, key, task);
+	}
 	return 0;
 }
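The inbound half of the deduplication now has a three-step lifecycle: register the key when a task starts (above), look it up when another packet arrives in worker_submit(), and delete it when the task completes. A runnable sketch, with a toy fixed-size table standing in for kresd's map_set()/map_get()/map_del() and keys shaped like the toy_rrkey() output above:

    #include <stdio.h>
    #include <string.h>

    #define SLOTS 8
    static struct { char key[64]; void *val; } table[SLOTS];

    static void toy_set(const char *k, void *v)
    {
        for (int i = 0; i < SLOTS; ++i)
            if (table[i].key[0] == '\0') {
                snprintf(table[i].key, sizeof(table[i].key), "%s", k);
                table[i].val = v;
                return;
            }
    }

    static void *toy_get(const char *k)
    {
        for (int i = 0; i < SLOTS; ++i)
            if (table[i].key[0] != '\0' && strcmp(table[i].key, k) == 0)
                return table[i].val;
        return NULL;
    }

    static void toy_del(const char *k)
    {
        for (int i = 0; i < SLOTS; ++i)
            if (strcmp(table[i].key, k) == 0)
                table[i].key[0] = '\0';
    }

    int main(void)
    {
        int task = 42;
        toy_set("1 1 example.com.", &task);   /* qr_task_start: register */
        printf("retransmit seen: %s\n",       /* worker_submit: duplicate? */
               toy_get("1 1 example.com.") ? "yes" : "no");
        toy_del("1 1 example.com.");          /* qr_task_complete: clear */
        printf("retransmit seen: %s\n",
               toy_get("1 1 example.com.") ? "yes" : "no");
        return 0;
    }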
@@ -368,6 +380,12 @@ static void qr_task_complete(struct qr_task *task)
 	if (task->on_complete) {
 		task->on_complete(worker, &task->req, task->baton);
 	}
+	char key[KR_RRKEY_LEN];
+	/* Clear outstanding query. */
+	int ret = subreq_key(key, task->req.answer);
+	if (ret > 0) {
+		map_del(&task->worker->outstanding, key);
+	}
 	/* Release primary reference to task. */
 	qr_task_unref(task);
 }
@@ -556,15 +574,6 @@ static int timer_start(struct qr_task *task, uv_timer_cb cb, uint64_t timeout, u
 	return 0;
 }
 
-/** @internal Get key from current outstanding subrequest. */
-static int subreq_key(char *dst, struct qr_task *task)
-{
-	assert(task);
-	knot_pkt_t *pkt = task->pktbuf;
-	assert(knot_wire_get_qr(pkt->wire) == false);
-	return kr_rrkey(dst, knot_pkt_qname(pkt), knot_pkt_qtype(pkt), knot_pkt_qclass(pkt));
-}
-
 static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *pkt)
 {
 	/* Close pending timer */

@@ -576,14 +585,14 @@ static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_
 		task->timeout = NULL;
 	}
 	ioreq_killall(task);
-	/* Clear from outstanding table. */
+	/* Clear from outgoing table. */
 	if (!task->leading)
 		return;
 	char key[KR_RRKEY_LEN];
-	int ret = subreq_key(key, task);
+	int ret = subreq_key(key, task->pktbuf);
 	if (ret > 0) {
-		assert(map_get(&task->worker->outstanding, key) == task);
-		map_del(&task->worker->outstanding, key);
+		assert(map_get(&task->worker->outgoing, key) == task);
+		map_del(&task->worker->outgoing, key);
 	}
 	/* Notify waiting tasks. */
 	struct kr_query *leader_qry = array_tail(task->req.rplan.pending);
@@ -607,9 +616,9 @@ static void subreq_lead(struct qr_task *task)
 {
 	assert(task);
 	char key[KR_RRKEY_LEN];
-	if (subreq_key(key, task) > 0) {
-		assert(map_contains(&task->worker->outstanding, key) == false);
-		map_set(&task->worker->outstanding, key, task);
+	if (subreq_key(key, task->pktbuf) > 0) {
+		assert(map_contains(&task->worker->outgoing, key) == false);
+		map_set(&task->worker->outgoing, key, task);
 		task->leading = true;
 	}
 }

@@ -618,8 +627,8 @@ static bool subreq_enqueue(struct qr_task *task)
 {
 	assert(task);
 	char key[KR_RRKEY_LEN];
-	if (subreq_key(key, task) > 0) {
-		struct qr_task *leader = map_get(&task->worker->outstanding, key);
+	if (subreq_key(key, task->pktbuf) > 0) {
+		struct qr_task *leader = map_get(&task->worker->outgoing, key);
 		if (leader) {
 			/* Enqueue itself to leader for this subrequest. */
 			int ret = array_reserve_mm(leader->waiting, leader->waiting.len + 1, kr_memreserve, &leader->req.pool);
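subreq_lead() and subreq_enqueue() implement the leader/follower pattern for outgoing subrequests: the first task asking an upstream question registers itself in worker->outgoing and becomes the leader; identical later subrequests find the leader under the same key and park themselves on its waiting list instead of sending their own packets, and subreq_finalize() wakes them when the leader's answer arrives. In miniature (hypothetical types, fixed-size waiting list):

    #define MAX_WAITING 16

    struct toy_task {
        int              leading;
        struct toy_task *waiting[MAX_WAITING];
        int              waiting_len;
    };

    /* Returns 1 if 'task' was parked behind an existing leader. */
    static int toy_enqueue(struct toy_task *leader, struct toy_task *task)
    {
        if (!leader || leader == task || leader->waiting_len >= MAX_WAITING)
            return 0;       /* no leader (or no room): send the query itself */
        leader->waiting[leader->waiting_len++] = task;
        return 1;           /* woken later, when the leader finishes */
    }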
@@ -681,9 +690,9 @@ static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_sour
 	/* Start fast retransmit with UDP, otherwise connect. */
 	int ret = 0;
 	if (sock_type == SOCK_DGRAM) {
-		/* If such subrequest is outstanding, enqueue to it. */
+		/* If there is already an outgoing query, enqueue to it. */
 		if (subreq_enqueue(task)) {
-			return kr_ok(); /* Will be notified when outstanding subrequest finishes. */
+			return kr_ok(); /* Will be notified when the outgoing query finishes. */
 		}
 		/* Start transmitting */
 		if (retransmit(task)) {

@@ -762,6 +771,28 @@ int worker_submit(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *ms
 		if (msg) worker->stats.dropped += 1;
 		return kr_error(EINVAL); /* Ignore. */
 	}
+	/* Deduplicate inbound requests.
+	 * Many clients retransmit a query frequently to compensate for
+	 * network losses and to get faster service, but then fail to work
+	 * properly when the resolver answers some of the retransmissions
+	 * with SERVFAIL (because of the time limit) and others with
+	 * NOERROR. Deduplicating also avoids wasting time tracking
+	 * pending tasks that resolve the same thing. */
+	char key[KR_RRKEY_LEN];
+	if (subreq_key(key, msg) > 0) {
+		struct qr_task *task = map_get(&worker->outstanding, key);
+		if (task && task->source.handle == handle && task->req.qsource.addr &&
+		    addr->sa_family == task->source.addr.ip4.sin_family &&
+		    knot_wire_get_id(msg->wire) == knot_wire_get_id(task->req.answer->wire)) {
+			/* Query probably matches, check if it comes from the same origin. */
+			size_t addr_len = sizeof(struct sockaddr_in);
+			if (addr->sa_family == AF_INET6)
+				addr_len = sizeof(struct sockaddr_in6);
+			if (memcmp(&task->source.addr, addr, addr_len) == 0) {
+				return kr_error(EEXIST); /* Ignore the duplicate query. */
+			}
+		}
+	}
 	task = qr_task_create(worker, handle, addr);
 	if (!task) {
 		return kr_error(ENOMEM);
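The origin check above compares the raw socket addresses with a single memcmp(): for both sockaddr_in and sockaddr_in6 the family, port, and address fields are covered by comparing sizeof() of the respective struct. The same check, pulled out into a standalone helper for clarity (same_origin() is illustrative, not part of the commit):

    #include <stdbool.h>
    #include <string.h>
    #include <sys/socket.h>
    #include <netinet/in.h>

    static bool same_origin(const struct sockaddr *a, const struct sockaddr *b)
    {
        if (a->sa_family != b->sa_family)
            return false;
        size_t len = (a->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6)
                                                : sizeof(struct sockaddr_in);
        return memcmp(a, b, len) == 0;   /* family + port + address */
    }

Matching the message ID first keeps the common case cheap; the address comparison only runs for packets that already look like retransmissions.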
@@ -962,6 +993,7 @@ int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen)
 	memset(&worker->pkt_pool, 0, sizeof(worker->pkt_pool));
 	worker->pkt_pool.ctx = mp_new (4 * sizeof(knot_pkt_t));
 	worker->pkt_pool.alloc = (knot_mm_alloc_t) mp_alloc;
+	worker->outgoing = map_make();
 	worker->outstanding = map_make();
 	worker->tcp_pipeline_max = MAX_PIPELINED;
 	return kr_ok();

@@ -982,6 +1014,7 @@ void worker_reclaim(struct worker_ctx *worker)
 	reclaim_freelist(worker->pool_sessions, struct session, session_free);
 	mp_delete(worker->pkt_pool.ctx);
 	worker->pkt_pool.ctx = NULL;
+	map_clear(&worker->outgoing);
 	map_clear(&worker->outstanding);
 }

daemon/worker.h

@@ -50,6 +50,7 @@ struct worker_ctx {
 		size_t dropped;
 		size_t timeout;
 	} stats;
+	map_t outgoing;
 	map_t outstanding;
 	mp_freelist_t pool_mp;
 	mp_freelist_t pool_ioreq;