Skip to content
Snippets Groups Projects
Commit 9c1c7ca4 authored by Marek Vavruša's avatar Marek Vavruša
Browse files

daemon/worker: timeouts for queries/connections

fixes #22
parent 79105855
Branches
Tags
No related merge requests found
......@@ -54,17 +54,17 @@ void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
uv_loop_t *loop = handle->loop;
struct worker_ctx *worker = loop->data;
/* UDP requests are oneshot, always close afterwards */
if (handle->data && !uv_is_closing((uv_handle_t *)handle)) { /* Do not free master socket */
uv_close((uv_handle_t *)handle, handle_free);
}
/* Check the incoming wire length. */
if (nread > KNOT_WIRE_HEADER_SIZE) {
knot_pkt_t *query = knot_pkt_new(buf->base, nread, worker->mm);
worker_exec(worker, (uv_handle_t *)handle, query, addr);
knot_pkt_free(&query);
}
/* UDP requests are oneshot, always close afterwards */
if (handle->data) { /* Do not free master socket */
uv_close((uv_handle_t *)handle, handle_free);
}
}
int udp_bind(struct endpoint *ep, struct sockaddr *addr)
......@@ -82,15 +82,9 @@ int udp_bind(struct endpoint *ep, struct sockaddr *addr)
void udp_unbind(struct endpoint *ep)
{
uv_udp_t *handle = &ep->udp;
uv_udp_recv_stop(handle);
uv_close((uv_handle_t *)handle, NULL);
}
static void tcp_unbind_handle(uv_handle_t *handle)
{
uv_read_stop((uv_stream_t *)handle);
}
static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
{
uv_loop_t *loop = handle->loop;
......@@ -130,7 +124,7 @@ static void tcp_accept(uv_stream_t *master, int status)
return;
}
uv_read_start(client, handle_getbuf, tcp_recv);
io_start_read((uv_handle_t *)client);
}
int tcp_bind(struct endpoint *ep, struct sockaddr *addr)
......@@ -153,7 +147,6 @@ int tcp_bind(struct endpoint *ep, struct sockaddr *addr)
void tcp_unbind(struct endpoint *ep)
{
tcp_unbind_handle((uv_handle_t *)&ep->tcp);
uv_close((uv_handle_t *)&ep->tcp, NULL);
}
......@@ -163,7 +156,6 @@ uv_handle_t *io_create(uv_loop_t *loop, int type)
uv_udp_t *handle = handle_alloc(loop, sizeof(*handle));
if (handle) {
uv_udp_init(loop, handle);
uv_udp_recv_start(handle, &handle_getbuf, &udp_recv);
}
return (uv_handle_t *)handle;
} else {
......@@ -174,3 +166,21 @@ uv_handle_t *io_create(uv_loop_t *loop, int type)
return (uv_handle_t *)handle;
}
}
int io_start_read(uv_handle_t *handle)
{
if (handle->type == UV_UDP) {
return uv_udp_recv_start((uv_udp_t *)handle, &handle_getbuf, &udp_recv);
} else {
return uv_read_start((uv_stream_t *)handle, &handle_getbuf, &tcp_recv);
}
}
int io_stop_read(uv_handle_t *handle)
{
if (handle->type == UV_UDP) {
return uv_udp_recv_stop((uv_udp_t *)handle);
} else {
return uv_read_stop((uv_stream_t *)handle);
}
}
\ No newline at end of file
......@@ -25,3 +25,5 @@ void udp_unbind(struct endpoint *ep);
int tcp_bind(struct endpoint *ep, struct sockaddr *addr);
void tcp_unbind(struct endpoint *ep);
uv_handle_t *io_create(uv_loop_t *loop, int type);
int io_start_read(uv_handle_t *handle);
int io_stop_read(uv_handle_t *handle);
\ No newline at end of file
......@@ -29,6 +29,8 @@ struct qr_task
{
struct kr_request req;
knot_pkt_t *next_query;
uv_handle_t *next_handle;
uv_timer_t timeout;
union {
uv_write_t tcp_send;
uv_udp_send_t udp_send;
......@@ -90,20 +92,40 @@ static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *ha
task->next_query = next_query;
/* Start resolution */
uv_timer_init(handle->loop, &task->timeout);
task->timeout.data = task;
kr_resolve_begin(&task->req, &engine->resolver, answer);
return task;
}
static void qr_task_close(uv_handle_t *handle)
{
struct qr_task *task = handle->data;
mp_delete(task->req.pool.ctx);
}
static void qr_task_timeout(uv_timer_t *req)
{
struct qr_task *task = req->data;
if (!uv_is_closing(task->next_handle)) {
io_stop_read(task->next_handle);
uv_close(task->next_handle, (uv_close_cb) free);
qr_task_step(task, NULL);
}
}
static void qr_task_on_send(uv_req_t* req, int status)
{
struct qr_task *task = req->data;
if (task) {
/* Failed to send, invalidate */
if (status != 0) {
qr_task_step(task, NULL);
}
if (task->req.overlay.state == KNOT_STATE_NOOP) {
mp_delete(task->req.pool.ctx);
/* Start reading answer */
if (task->req.overlay.state != KNOT_STATE_NOOP) {
if (status == 0 && task->next_handle) {
io_start_read(task->next_handle);
}
} else {
/* Finalize task */
uv_close((uv_handle_t *)&task->timeout, qr_task_close);
}
}
}
......@@ -141,14 +163,18 @@ static void qr_task_on_connect(uv_connect_t *connect, int status)
static int qr_task_finalize(struct qr_task *task, int state)
{
kr_resolve_finish(&task->req, state);
uv_timer_stop(&task->timeout);
qr_task_send(task, task->source.handle, (struct sockaddr *)&task->source.addr, task->req.answer);
return state == KNOT_STATE_DONE ? 0 : kr_error(EIO);
}
static int qr_task_step(struct qr_task *task, knot_pkt_t *packet)
{
/* Cancel timeout if active */
uv_timer_stop(&task->timeout);
task->next_handle = NULL;
/* Consume input and produce next query */
assert(task);
int sock_type = -1;
struct sockaddr *addr = NULL;
knot_pkt_t *next_query = task->next_query;
......@@ -164,27 +190,30 @@ static int qr_task_step(struct qr_task *task, knot_pkt_t *packet)
/* Create connection for iterative query */
uv_handle_t *source_handle = task->source.handle;
uv_handle_t *next_handle = io_create(source_handle->loop, sock_type);
if (next_handle == NULL) {
task->next_handle = io_create(source_handle->loop, sock_type);
if (task->next_handle == NULL) {
return qr_task_finalize(task, KNOT_STATE_FAIL);
}
/* Connect or issue query datagram */
next_handle->data = task;
task->next_handle->data = task;
if (sock_type == SOCK_STREAM) {
uv_connect_t *connect = &task->ioreq.connect;
if (uv_tcp_connect(connect, (uv_tcp_t *)next_handle, addr, qr_task_on_connect) != 0) {
uv_close(next_handle, (uv_close_cb) free);
if (uv_tcp_connect(connect, (uv_tcp_t *)task->next_handle, addr, qr_task_on_connect) != 0) {
uv_close(task->next_handle, (uv_close_cb) free);
return qr_task_step(task, NULL);
}
connect->data = task;
} else {
if (qr_task_send(task, next_handle, addr, next_query) != 0) {
uv_close(next_handle, (uv_close_cb) free);
if (qr_task_send(task, task->next_handle, addr, next_query) != 0) {
uv_close(task->next_handle, (uv_close_cb) free);
return qr_task_step(task, NULL);
}
}
/* Start next timeout */
uv_timer_start(&task->timeout, qr_task_timeout, KR_CONN_RTT_MAX, 0);
return kr_ok();
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment