Skip to content
Snippets Groups Projects
Verified Commit 82967074 authored by Grigorii Demidov's avatar Grigorii Demidov Committed by Vladimír Čunát
Browse files

daemon/session: migrate from array_t to trie_t & queue_t; daemon/worker: some simplifications

parent 3559f6df
No related branches found
No related tags found
1 merge request!675daemon: attempt of refactoring
......@@ -8,9 +8,7 @@
#include "daemon/tls.h"
#include "daemon/worker.h"
#include "daemon/io.h"
/** List of tasks. */
typedef array_t(struct qr_task *) session_tasklist_t;
#include "lib/generic/queue.h"
/* Per-session (TCP or UDP) persistent structure,
* that exists between remote counterpart and a local socket.
......@@ -24,8 +22,8 @@ struct session {
struct tls_ctx_t *tls_ctx; /**< server side tls-related data. */
struct tls_client_ctx_t *tls_client_ctx; /**< client side tls-related data. */
session_tasklist_t tasks; /**< list of tasks which assotiated with given session. */
session_tasklist_t waiting; /**< list of tasks been waiting for IO (subset of taska). */
trie_t *tasks; /**< list of tasks assotiated with given session. */
queue_t(struct qr_task *) waiting; /**< list of tasks waiting for sending to upstream. */
uint8_t *wire_buf; /**< Buffer for DNS message. */
ssize_t wire_buf_size; /**< Buffer size. */
......@@ -54,7 +52,7 @@ static void on_session_timer_close(uv_handle_t *timer)
void session_free(struct session *session)
{
if (session) {
assert(session->tasks.len == 0 && session->waiting.len == 0);
assert(session_is_empty(session));
session_clear(session);
free(session);
}
......@@ -62,12 +60,13 @@ void session_free(struct session *session)
void session_clear(struct session *session)
{
assert(session->tasks.len == 0 && session->waiting.len == 0);
assert(session_is_empty(session));
if (session->handle && session->handle->type == UV_TCP) {
free(session->wire_buf);
}
array_clear(session->tasks);
array_clear(session->waiting);
trie_clear(session->tasks);
trie_free(session->tasks);
queue_deinit(session->waiting);
tls_free(session->tls_ctx);
tls_client_ctx_free(session->tls_client_ctx);
memset(session, 0, sizeof(*session));
......@@ -75,8 +74,7 @@ void session_clear(struct session *session)
void session_close(struct session *session)
{
assert(session->tasks.len == 0 && session->waiting.len == 0);
assert(session_is_empty(session));
if (session->sflags.closing) {
return;
}
......@@ -84,8 +82,7 @@ void session_close(struct session *session)
uv_handle_t *handle = session->handle;
io_stop_read(handle);
session->sflags.closing = true;
if (session->sflags.outgoing &&
session->peer.ip.sa_family != AF_UNSPEC) {
if (session->peer.ip.sa_family != AF_UNSPEC) {
struct worker_ctx *worker = handle->loop->data;
struct sockaddr *peer = &session->peer.ip;
worker_del_tcp_connected(worker, peer);
......@@ -111,98 +108,124 @@ int session_start_read(struct session *session)
return io_start_read(session->handle);
}
int session_waitinglist_add(struct session *session, struct qr_task *task)
int session_waitinglist_push(struct session *session, struct qr_task *task)
{
for (int i = 0; i < session->waiting.len; ++i) {
if (session->waiting.at[i] == task) {
return i;
}
}
int ret = array_push(session->waiting, task);
if (ret >= 0) {
worker_task_ref(task);
}
return ret;
queue_push(session->waiting, task);
worker_task_ref(task);
return kr_ok();
}
int session_waitinglist_del(struct session *session, struct qr_task *task)
struct qr_task *session_waitinglist_get(const struct session *session)
{
int ret = kr_error(ENOENT);
for (int i = 0; i < session->waiting.len; ++i) {
if (session->waiting.at[i] == task) {
array_del(session->waiting, i);
worker_task_unref(task);
ret = kr_ok();
break;
}
}
return ret;
return queue_head(session->waiting);
}
int session_waitinglist_del_index(struct session *session, int index)
struct qr_task *session_waitinglist_pop(struct session *session, bool deref)
{
int ret = kr_error(ENOENT);
if (index < session->waiting.len) {
struct qr_task *task = session->waiting.at[index];
array_del(session->waiting, index);
worker_task_unref(task);
ret = kr_ok();
struct qr_task *t = session_waitinglist_get(session);
queue_pop(session->waiting);
if (deref) {
worker_task_unref(t);
}
return ret;
return t;
}
int session_tasklist_add(struct session *session, struct qr_task *task)
{
for (int i = 0; i < session->tasks.len; ++i) {
if (session->tasks.at[i] == task) {
return i;
}
trie_t *t = session->tasks;
uint16_t task_msg_id = 0;
const char *key = NULL;
size_t key_len = 0;
if (session->sflags.outgoing) {
knot_pkt_t *pktbuf = worker_task_get_pktbuf(task);
task_msg_id = knot_wire_get_id(pktbuf->wire);
key = (const char *)&task_msg_id;
key_len = sizeof(task_msg_id);
} else {
key = (const char *)task;
key_len = sizeof(task);
}
trie_val_t *v = trie_get_ins(t, key, key_len);
if (unlikely(!v)) {
assert(false);
return kr_error(ENOMEM);
}
int ret = array_push(session->tasks, task);
if (ret >= 0) {
if (*v == NULL) {
*v = task;
worker_task_ref(task);
} else if (*v != task) {
assert(false);
return kr_error(ENOMEM);
}
return ret;
return kr_ok();
}
int session_tasklist_del(struct session *session, struct qr_task *task)
{
int ret = kr_error(ENOENT);
for (int i = 0; i < session->tasks.len; ++i) {
if (session->tasks.at[i] == task) {
array_del(session->tasks, i);
worker_task_unref(task);
ret = kr_ok();
break;
}
trie_t *t = session->tasks;
uint16_t task_msg_id = 0;
const char *key = NULL;
size_t key_len = 0;
trie_val_t val;
if (session->sflags.outgoing) {
knot_pkt_t *pktbuf = worker_task_get_pktbuf(task);
task_msg_id = knot_wire_get_id(pktbuf->wire);
key = (const char *)&task_msg_id;
key_len = sizeof(task_msg_id);
} else {
key = (const char *)task;
key_len = sizeof(task);
}
int ret = trie_del(t, key, key_len, &val);
if (ret == kr_ok()) {
assert(val == task);
worker_task_unref(val);
}
return ret;
}
int session_tasklist_del_index(struct session *session, int index)
struct qr_task *session_tasklist_get_first(struct session *session)
{
int ret = kr_error(ENOENT);
if (index < session->tasks.len) {
struct qr_task *task = session->tasks.at[index];
array_del(session->tasks, index);
worker_task_unref(task);
ret = kr_ok();
trie_val_t *val = trie_get_first(session->tasks, NULL, NULL);
return val ? (struct qr_task *) *val : NULL;
}
struct qr_task *session_tasklist_del_first(struct session *session, bool deref)
{
trie_val_t val = NULL;
int res = trie_del_first(session->tasks, NULL, NULL, &val);
if (res != kr_ok()) {
val = NULL;
} else if (deref) {
worker_task_unref(val);
}
return (struct qr_task *)val;
}
struct qr_task* session_tasklist_del_msgid(const struct session *session, uint16_t msg_id)
{
trie_t *t = session->tasks;
assert(session->sflags.outgoing);
struct qr_task *ret = NULL;
const char *key = (const char *)&msg_id;
size_t key_len = sizeof(msg_id);
trie_val_t val;
int res = trie_del(t, key, key_len, &val);
if (res == kr_ok()) {
ret = val;
assert(worker_task_numrefs(ret) > 1);
worker_task_unref(ret);
}
return ret;
}
struct qr_task* session_tasklist_find(const struct session *session, uint16_t msg_id)
struct qr_task* session_tasklist_find_msgid(const struct session *session, uint16_t msg_id)
{
trie_t *t = session->tasks;
assert(session->sflags.outgoing);
struct qr_task *ret = NULL;
const session_tasklist_t *tasklist = &session->tasks;
for (size_t i = 0; i < tasklist->len; ++i) {
struct qr_task *task = tasklist->at[i];
knot_pkt_t *pktbuf = worker_task_get_pktbuf(task);
uint16_t task_msg_id = knot_wire_get_id(pktbuf->wire);
if (task_msg_id == msg_id) {
ret = task;
break;
}
trie_val_t *val = trie_get_try(t, (char *)&msg_id, sizeof(msg_id));
if (val) {
ret = *val;
}
return ret;
}
......@@ -249,6 +272,11 @@ uv_handle_t *session_get_handle(struct session *session)
return session->handle;
}
struct session *session_get(uv_handle_t *h)
{
return h->data;
}
struct session *session_new(uv_handle_t *handle)
{
if (!handle) {
......@@ -259,6 +287,8 @@ struct session *session_new(uv_handle_t *handle)
return NULL;
}
queue_init(session->waiting);
session->tasks = trie_create(NULL);
if (handle->type == UV_TCP) {
uint8_t *wire_buf = malloc(KNOT_WIRE_MAX_PKTSIZE);
if (!wire_buf) {
......@@ -293,12 +323,12 @@ struct session *session_new(uv_handle_t *handle)
size_t session_tasklist_get_len(const struct session *session)
{
return session->tasks.len;
return trie_weight(session->tasks);
}
size_t session_waitinglist_get_len(const struct session *session)
{
return session->waiting.len;
return queue_len(session->waiting);
}
bool session_tasklist_is_empty(const struct session *session)
......@@ -327,30 +357,10 @@ void session_set_has_tls(struct session *session, bool has_tls)
session->sflags.has_tls = has_tls;
}
struct qr_task *session_waitinglist_get_first(const struct session *session)
{
struct qr_task *t = NULL;
if (session->waiting.len > 0) {
t = session->waiting.at[0];
}
return t;
}
struct qr_task *session_tasklist_get_first(const struct session *session)
{
struct qr_task *t = NULL;
if (session->tasks.len > 0) {
t = session->tasks.at[0];
}
return t;
}
void session_waitinglist_retry(struct session *session, bool increase_timeout_cnt)
{
while (session->waiting.len > 0) {
struct qr_task *task = session->waiting.at[0];
session_tasklist_del(session, task);
array_del(session->waiting, 0);
while (!session_waitinglist_is_empty(session)) {
struct qr_task *task = session_waitinglist_pop(session, false);
assert(worker_task_numrefs(task) > 1);
if (increase_timeout_cnt) {
worker_task_timeout_inc(task);
......@@ -362,10 +372,8 @@ void session_waitinglist_retry(struct session *session, bool increase_timeout_cn
void session_waitinglist_finalize(struct session *session, int status)
{
while (session->waiting.len > 0) {
struct qr_task *t = session->waiting.at[0];
array_del(session->waiting, 0);
session_tasklist_del(session, t);
while (!session_waitinglist_is_empty(session)) {
struct qr_task *t = session_waitinglist_pop(session, false);
if (session->sflags.outgoing) {
worker_task_finalize(t, status);
} else {
......@@ -379,9 +387,9 @@ void session_waitinglist_finalize(struct session *session, int status)
void session_tasklist_finalize(struct session *session, int status)
{
while (session->tasks.len > 0) {
struct qr_task *t = session->tasks.at[0];
array_del(session->tasks, 0);
while (session_tasklist_get_len(session) > 0) {
struct qr_task *t = session_tasklist_del_first(session, false);
assert(worker_task_numrefs(t) > 0);
if (session->sflags.outgoing) {
worker_task_finalize(t, status);
} else {
......@@ -690,8 +698,6 @@ void session_kill_ioreq(struct session *s, struct qr_task *task)
}
/* TCP-specific code now. */
if (s->handle->type != UV_TCP) abort();
session_waitinglist_del(s, task);
session_tasklist_del(s, task);
int res = 0;
......@@ -714,4 +720,3 @@ void session_kill_ioreq(struct session *s, struct qr_task *task)
session_close(s);
}
}
......@@ -18,7 +18,6 @@
#include <stdbool.h>
#include <uv.h>
#include "lib/generic/array.h"
struct qr_task;
struct worker_ctx;
......@@ -47,16 +46,14 @@ int session_start_read(struct session *session);
/** List of tasks been waiting for IO. */
/** Check if list is empty. */
bool session_waitinglist_is_empty(const struct session *session);
/** Add task to the end of the list. */
int session_waitinglist_push(struct session *session, struct qr_task *task);
/** Get the first element. */
struct qr_task *session_waitinglist_get_first(const struct session *session);
struct qr_task *session_waitinglist_get(const struct session *session);
/** Get the first element and remove it from the list. */
struct qr_task *session_waitinglist_pop(struct session *session, bool deref);
/** Get the list length. */
size_t session_waitinglist_get_len(const struct session *session);
/** Add task to the list. */
int session_waitinglist_add(struct session *session, struct qr_task *task);
/** Remove task from the list. */
int session_waitinglist_del(struct session *session, struct qr_task *task);
/** Remove task from the list by index. */
int session_waitinglist_del_index(struct session *session, int index);
/** Retry resolution for each task in the list. */
void session_waitinglist_retry(struct session *session, bool increase_timeout_cnt);
/** Finalize all tasks in the list. */
......@@ -66,17 +63,19 @@ void session_waitinglist_finalize(struct session *session, int status);
/** Check if list is empty. */
bool session_tasklist_is_empty(const struct session *session);
/** Get the first element. */
struct qr_task *session_tasklist_get_first(const struct session *session);
struct qr_task *session_tasklist_get_first(struct session *session);
/** Get the first element and remove it from the list. */
struct qr_task *session_tasklist_del_first(struct session *session, bool deref);
/** Get the list length. */
size_t session_tasklist_get_len(const struct session *session);
/** Add task to the list. */
int session_tasklist_add(struct session *session, struct qr_task *task);
/** Remove task from the list. */
int session_tasklist_del(struct session *session, struct qr_task *task);
/** Remove task from the list by index. */
int session_tasklist_del_index(struct session *session, int index);
/** Remove task with given msg_id, session_flags(session)->outgoing must be true. */
struct qr_task* session_tasklist_del_msgid(const struct session *session, uint16_t msg_id);
/** Find task with given msg_id */
struct qr_task* session_tasklist_find(const struct session *session, uint16_t msg_id);
struct qr_task* session_tasklist_find_msgid(const struct session *session, uint16_t msg_id);
/** Finalize all tasks in the list. */
void session_tasklist_finalize(struct session *session, int status);
......@@ -103,6 +102,7 @@ struct tls_common_ctx *session_tls_get_common_ctx(const struct session *session)
/** Get pointer to underlying libuv handle for IO operations. */
uv_handle_t *session_get_handle(struct session *session);
struct session *session_get(uv_handle_t *h);
/** Start session timer. */
int session_timer_start(struct session *session, uv_timer_cb cb,
......@@ -139,4 +139,3 @@ knot_pkt_t *session_produce_packet(struct session *session, knot_mm_t *mm);
int session_discard_packet(struct session *session, const knot_pkt_t *pkt);
void session_kill_ioreq(struct session *s, struct qr_task *task);
This diff is collapsed.
......@@ -100,6 +100,9 @@ struct session *worker_request_get_source_session(struct request_ctx *);
void worker_request_set_source_session(struct request_ctx *, struct session *session);
uint16_t worker_task_pkt_get_msgid(struct qr_task *task);
void worker_task_pkt_set_msgid(struct qr_task *task, uint16_t msgid);
/** @cond internal */
/** Number of request within timeout window. */
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment