diff --git a/daemon/io.c b/daemon/io.c index 494df3cfd3d80f0a75ecebde9fc62e0f5d0c776b..5597c5a59138551a41d7085ceaba1553b6ad4c57 100644 --- a/daemon/io.c +++ b/daemon/io.c @@ -54,17 +54,17 @@ void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, uv_loop_t *loop = handle->loop; struct worker_ctx *worker = loop->data; + /* UDP requests are oneshot, always close afterwards */ + if (handle->data && !uv_is_closing((uv_handle_t *)handle)) { /* Do not free master socket */ + uv_close((uv_handle_t *)handle, handle_free); + } + /* Check the incoming wire length. */ if (nread > KNOT_WIRE_HEADER_SIZE) { knot_pkt_t *query = knot_pkt_new(buf->base, nread, worker->mm); worker_exec(worker, (uv_handle_t *)handle, query, addr); knot_pkt_free(&query); } - - /* UDP requests are oneshot, always close afterwards */ - if (handle->data) { /* Do not free master socket */ - uv_close((uv_handle_t *)handle, handle_free); - } } int udp_bind(struct endpoint *ep, struct sockaddr *addr) @@ -82,15 +82,9 @@ int udp_bind(struct endpoint *ep, struct sockaddr *addr) void udp_unbind(struct endpoint *ep) { uv_udp_t *handle = &ep->udp; - uv_udp_recv_stop(handle); uv_close((uv_handle_t *)handle, NULL); } -static void tcp_unbind_handle(uv_handle_t *handle) -{ - uv_read_stop((uv_stream_t *)handle); -} - static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) { uv_loop_t *loop = handle->loop; @@ -130,7 +124,7 @@ static void tcp_accept(uv_stream_t *master, int status) return; } - uv_read_start(client, handle_getbuf, tcp_recv); + io_start_read((uv_handle_t *)client); } int tcp_bind(struct endpoint *ep, struct sockaddr *addr) @@ -153,7 +147,6 @@ int tcp_bind(struct endpoint *ep, struct sockaddr *addr) void tcp_unbind(struct endpoint *ep) { - tcp_unbind_handle((uv_handle_t *)&ep->tcp); uv_close((uv_handle_t *)&ep->tcp, NULL); } @@ -163,7 +156,6 @@ uv_handle_t *io_create(uv_loop_t *loop, int type) uv_udp_t *handle = handle_alloc(loop, sizeof(*handle)); if (handle) { uv_udp_init(loop, handle); - uv_udp_recv_start(handle, &handle_getbuf, &udp_recv); } return (uv_handle_t *)handle; } else { @@ -174,3 +166,21 @@ uv_handle_t *io_create(uv_loop_t *loop, int type) return (uv_handle_t *)handle; } } + +int io_start_read(uv_handle_t *handle) +{ + if (handle->type == UV_UDP) { + return uv_udp_recv_start((uv_udp_t *)handle, &handle_getbuf, &udp_recv); + } else { + return uv_read_start((uv_stream_t *)handle, &handle_getbuf, &tcp_recv); + } +} + +int io_stop_read(uv_handle_t *handle) +{ + if (handle->type == UV_UDP) { + return uv_udp_recv_stop((uv_udp_t *)handle); + } else { + return uv_read_stop((uv_stream_t *)handle); + } +} \ No newline at end of file diff --git a/daemon/io.h b/daemon/io.h index e8579cea17956e3919803ce83059066b876fc4a5..815387f0a7c348c19243a12a3920877f0e14fbe0 100644 --- a/daemon/io.h +++ b/daemon/io.h @@ -25,3 +25,5 @@ void udp_unbind(struct endpoint *ep); int tcp_bind(struct endpoint *ep, struct sockaddr *addr); void tcp_unbind(struct endpoint *ep); uv_handle_t *io_create(uv_loop_t *loop, int type); +int io_start_read(uv_handle_t *handle); +int io_stop_read(uv_handle_t *handle); \ No newline at end of file diff --git a/daemon/worker.c b/daemon/worker.c index 06c2d9e8858c3a9def7eec87a5c8e16e4d95eb8f..732b309774861e779b39cfb5ce7977ebb22495e3 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -29,6 +29,8 @@ struct qr_task { struct kr_request req; knot_pkt_t *next_query; + uv_handle_t *next_handle; + uv_timer_t timeout; union { uv_write_t tcp_send; uv_udp_send_t udp_send; @@ -90,20 +92,40 @@ static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *ha task->next_query = next_query; /* Start resolution */ + uv_timer_init(handle->loop, &task->timeout); + task->timeout.data = task; kr_resolve_begin(&task->req, &engine->resolver, answer); return task; } +static void qr_task_close(uv_handle_t *handle) +{ + struct qr_task *task = handle->data; + mp_delete(task->req.pool.ctx); +} + +static void qr_task_timeout(uv_timer_t *req) +{ + struct qr_task *task = req->data; + if (!uv_is_closing(task->next_handle)) { + io_stop_read(task->next_handle); + uv_close(task->next_handle, (uv_close_cb) free); + qr_task_step(task, NULL); + } +} + static void qr_task_on_send(uv_req_t* req, int status) { struct qr_task *task = req->data; if (task) { - /* Failed to send, invalidate */ - if (status != 0) { - qr_task_step(task, NULL); - } - if (task->req.overlay.state == KNOT_STATE_NOOP) { - mp_delete(task->req.pool.ctx); + /* Start reading answer */ + if (task->req.overlay.state != KNOT_STATE_NOOP) { + if (status == 0 && task->next_handle) { + io_start_read(task->next_handle); + } + } else { + /* Finalize task */ + uv_close((uv_handle_t *)&task->timeout, qr_task_close); } } } @@ -141,14 +163,18 @@ static void qr_task_on_connect(uv_connect_t *connect, int status) static int qr_task_finalize(struct qr_task *task, int state) { kr_resolve_finish(&task->req, state); + uv_timer_stop(&task->timeout); qr_task_send(task, task->source.handle, (struct sockaddr *)&task->source.addr, task->req.answer); return state == KNOT_STATE_DONE ? 0 : kr_error(EIO); } static int qr_task_step(struct qr_task *task, knot_pkt_t *packet) { + /* Cancel timeout if active */ + uv_timer_stop(&task->timeout); + task->next_handle = NULL; + /* Consume input and produce next query */ - assert(task); int sock_type = -1; struct sockaddr *addr = NULL; knot_pkt_t *next_query = task->next_query; @@ -164,27 +190,30 @@ static int qr_task_step(struct qr_task *task, knot_pkt_t *packet) /* Create connection for iterative query */ uv_handle_t *source_handle = task->source.handle; - uv_handle_t *next_handle = io_create(source_handle->loop, sock_type); - if (next_handle == NULL) { + task->next_handle = io_create(source_handle->loop, sock_type); + if (task->next_handle == NULL) { return qr_task_finalize(task, KNOT_STATE_FAIL); } /* Connect or issue query datagram */ - next_handle->data = task; + task->next_handle->data = task; if (sock_type == SOCK_STREAM) { uv_connect_t *connect = &task->ioreq.connect; - if (uv_tcp_connect(connect, (uv_tcp_t *)next_handle, addr, qr_task_on_connect) != 0) { - uv_close(next_handle, (uv_close_cb) free); + if (uv_tcp_connect(connect, (uv_tcp_t *)task->next_handle, addr, qr_task_on_connect) != 0) { + uv_close(task->next_handle, (uv_close_cb) free); return qr_task_step(task, NULL); } connect->data = task; } else { - if (qr_task_send(task, next_handle, addr, next_query) != 0) { - uv_close(next_handle, (uv_close_cb) free); + if (qr_task_send(task, task->next_handle, addr, next_query) != 0) { + uv_close(task->next_handle, (uv_close_cb) free); return qr_task_step(task, NULL); } } + /* Start next timeout */ + uv_timer_start(&task->timeout, qr_task_timeout, KR_CONN_RTT_MAX, 0); + return kr_ok(); }