Skip to content
Snippets Groups Projects
Commit aa95fde8 authored by Marek Vavrusa's avatar Marek Vavrusa
Browse files

Implemented watchdog timers for TCP connections.

TODO:
* watchdog timers for XFR connections
* prevent sweeping connections transferred to XFR
  such a connection should be removed from TCP fdset and enqueued in XFR fdset

refs #1540
parent b623c798
No related branches found
No related tags found
No related merge requests found
......@@ -66,6 +66,33 @@ static int xfr_send_cb(int session, sockaddr_t *addr, uint8_t *msg, size_t msgle
return tcp_send(session, msg, msglen);
}
/*! \brief Sweep TCP connection. */
static void tcp_sweep(fdset_t *set, int fd) {
char r_addr[SOCKADDR_STRLEN] = { '\0' };
int r_port = 0;
struct sockaddr_storage addr;
socklen_t len = sizeof(addr);
getpeername(fd, (struct sockaddr*)&addr, &len);
/* Translate */
if (addr.ss_family == AF_INET) {
struct sockaddr_in *s = (struct sockaddr_in *)&addr;
r_port = ntohs(s->sin_port);
inet_ntop(AF_INET, &s->sin_addr, r_addr, sizeof(r_addr));
} else {
#ifndef DISABLE_IPV6
struct sockaddr_in6 *s = (struct sockaddr_in6 *)&addr;
r_port = ntohs(s->sin6_port);
inet_ntop(AF_INET6, &s->sin6_addr, r_addr, sizeof(r_addr));
#endif
}
log_server_notice("Connection with %s:%d was terminated due to "
"inactivity.\n", r_addr, r_port);
fdset_remove(set, fd);
close(fd);
}
/*!
* \brief TCP event handler function.
*
......@@ -74,11 +101,11 @@ static int xfr_send_cb(int session, sockaddr_t *addr, uint8_t *msg, size_t msgle
* \param w Associated I/O event.
* \param revents Returned events.
*/
static void tcp_handle(tcp_worker_t *w, int fd, uint8_t *qbuf, size_t qbuf_maxlen)
static int tcp_handle(tcp_worker_t *w, int fd, uint8_t *qbuf, size_t qbuf_maxlen)
{
if (fd < 0 || !w || !w->ioh) {
dbg_net("tcp: tcp_handle(%p, %d) - invalid parameters\n", w, fd);
return;
return KNOTD_EINVAL;
}
dbg_net("tcp: handling TCP event on fd=%d in thread %p.\n",
......@@ -93,7 +120,7 @@ static void tcp_handle(tcp_worker_t *w, int fd, uint8_t *qbuf, size_t qbuf_maxle
log_server_error("Socket type %d is not supported, "
"IPv6 support is probably disabled.\n",
w->ioh->type);
return;
return KNOTD_ECONNREFUSED;
}
/* Receive data. */
......@@ -102,7 +129,7 @@ static void tcp_handle(tcp_worker_t *w, int fd, uint8_t *qbuf, size_t qbuf_maxle
dbg_net("tcp: client on fd=%d disconnected\n", fd);
fdset_remove(w->fdset, fd);
close(fd);
return;
return KNOTD_ECONNREFUSED;
}
/* Parse query. */
......@@ -118,7 +145,7 @@ static void tcp_handle(tcp_worker_t *w, int fd, uint8_t *qbuf, size_t qbuf_maxle
uint16_t pkt_id = knot_wire_get_id(qbuf);
knot_ns_error_response(ns, pkt_id, KNOT_RCODE_SERVFAIL,
qbuf, &resp_len);
return;
return KNOTD_EOK;
}
int res = knot_ns_parse_packet(qbuf, n, packet, &qtype);
......@@ -133,7 +160,7 @@ static void tcp_handle(tcp_worker_t *w, int fd, uint8_t *qbuf, size_t qbuf_maxle
// knot_response_free(&resp);
knot_packet_free(&packet);
return;
return KNOTD_EOK;
}
/* Handle query. */
......@@ -160,7 +187,7 @@ static void tcp_handle(tcp_worker_t *w, int fd, uint8_t *qbuf, size_t qbuf_maxle
memcpy(&xfr.addr, &addr, sizeof(sockaddr_t));
xfr_request(xfr_h, &xfr);
dbg_net("tcp: enqueued IXFR query on fd=%d\n", fd);
return;
return KNOTD_EOK;
case KNOT_QUERY_AXFR:
res = xfr_request_init(&xfr, XFR_TYPE_AOUT, XFR_FLAG_TCP, packet);
if (res != KNOTD_EOK) {
......@@ -175,7 +202,7 @@ static void tcp_handle(tcp_worker_t *w, int fd, uint8_t *qbuf, size_t qbuf_maxle
memcpy(&xfr.addr, &addr, sizeof(sockaddr_t));
xfr_request(xfr_h, &xfr);
dbg_net("tcp: enqueued AXFR query on fd=%d\n", fd);
return;
return KNOTD_EOK;
/*! \todo Implement query notify/update. */
case KNOT_QUERY_UPDATE:
......@@ -228,7 +255,7 @@ static void tcp_handle(tcp_worker_t *w, int fd, uint8_t *qbuf, size_t qbuf_maxle
qtype, fd, knotd_strerror(res));;
}
return;
return KNOTD_EOK;
}
static int tcp_accept(int fd)
......@@ -239,6 +266,7 @@ static int tcp_accept(int fd)
/* Evaluate connection. */
if (incoming < 0) {
int en = errno;
/*! \todo Better solution so it doesn't block current connections. */
if (en != EINTR) {
log_server_error("Cannot accept connection "
"(%d).\n", errno);
......@@ -438,6 +466,11 @@ int tcp_loop_worker(dthread_t *thread)
dbg_net("tcp: failed to allocate buffers for TCP worker\n");
return KNOTD_EINVAL;
}
/* Next sweep time. */
struct timespec next_sweep;
clock_gettime(CLOCK_MONOTONIC, &next_sweep);
next_sweep.tv_sec += TCP_SWEEP_INTERVAL;
/* Accept clients. */
dbg_net_verb("tcp: worker %p started\n", w);
......@@ -449,8 +482,8 @@ int tcp_loop_worker(dthread_t *thread)
}
/* Wait for events. */
int nfds = fdset_wait(w->fdset, OS_EV_FOREVER);
if (nfds <= 0) {
int nfds = fdset_wait(w->fdset, (TCP_SWEEP_INTERVAL * 1000)/2);
if (nfds < 0) {
continue;
}
......@@ -459,7 +492,7 @@ int tcp_loop_worker(dthread_t *thread)
w, nfds);
fdset_it_t it;
fdset_begin(w->fdset, &it);
while(1) {
while(nfds > 0) {
/* Handle incoming clients. */
if (it.fd == w->pipe[0]) {
......@@ -472,9 +505,22 @@ int tcp_loop_worker(dthread_t *thread)
"client %d\n",
w, client);
fdset_add(w->fdset, client, OS_EV_READ);
fdset_set_watchdog(w->fdset, client,
TCP_HANDSHAKE_WD);
dbg_net("tcp: watchdog for fd=%d set to %ds\n",
client, TCP_HANDSHAKE_WD);
} else {
/* Handle other events. */
tcp_handle(w, it.fd, qbuf, TCP_BUFFER_SIZE);
int ret = tcp_handle(w, it.fd, qbuf,
TCP_BUFFER_SIZE);
if (ret == KNOTD_EOK) {
fdset_set_watchdog(w->fdset, it.fd,
TCP_ACTIVITY_WD);
dbg_net("tcp: watchdog for fd=%d "
"set to %ds\n",
it.fd, TCP_ACTIVITY_WD);
}
}
/* Check if next exists. */
......@@ -483,6 +529,15 @@ int tcp_loop_worker(dthread_t *thread)
}
}
/* Sweep inactive. */
struct timespec now;
if (clock_gettime(CLOCK_MONOTONIC, &now) == 0) {
if (now.tv_sec >= next_sweep.tv_sec) {
fdset_sweep(w->fdset, &tcp_sweep);
memcpy(&next_sweep, &now, sizeof(next_sweep));
next_sweep.tv_sec += TCP_SWEEP_INTERVAL;
}
}
}
/* Stop whole unit. */
......
......@@ -39,6 +39,11 @@
#include "knot/server/server.h"
#include "knot/server/dthreads.h"
/* Constants */
#define TCP_HANDSHAKE_WD 10 /* [secs] for connection to make a request.*/
#define TCP_ACTIVITY_WD 60 /* [secs] of allowed inactivity between requests */
#define TCP_SWEEP_INTERVAL 2 /* [secs] granularity of connection sweeping */
/*!
* \brief Send TCP message.
*
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment