From 3a9212523e293643df6503b3a610aa4d5f80066f Mon Sep 17 00:00:00 2001 From: Marek Vavrusa <marek.vavrusa@nic.cz> Date: Wed, 10 Jul 2013 18:56:28 +0200 Subject: [PATCH] Updated to the new fdset API and implemented precise pending xfer counter. refs #71, #65 --- src/common/fdset.c | 17 +- src/common/fdset.h | 1 + src/knot/main.c | 2 +- src/knot/server/server.c | 9 +- src/knot/server/server.h | 3 +- src/knot/server/socket.h | 4 +- src/knot/server/tcp-handler.c | 199 +++++++-------- src/knot/server/xfr-handler.c | 359 ++++++++++++++------------- src/knot/server/xfr-handler.h | 7 +- src/knot/server/zones.c | 24 +- src/knot/server/zones.h | 8 +- src/libknot/nameserver/name-server.h | 2 +- src/tests/common/fdset_tests.c | 59 ++--- 13 files changed, 341 insertions(+), 353 deletions(-) diff --git a/src/common/fdset.c b/src/common/fdset.c index 0604b6b08..00e57048a 100644 --- a/src/common/fdset.c +++ b/src/common/fdset.c @@ -17,6 +17,7 @@ #include <config.h> #include <stdlib.h> #include <string.h> +#include <time.h> #include "common/fdset.h" #include "common.h" @@ -50,12 +51,20 @@ static int fdset_resize(fdset_t *set, unsigned size) int fdset_init(fdset_t *set, unsigned size) { + if (set == NULL) { + return KNOT_EINVAL; + } + memset(set, 0, sizeof(fdset_t)); return fdset_resize(set, size); } int fdset_clear(fdset_t* set) { + if (set == NULL) { + return KNOT_EINVAL; + } + free(set->ctx); free(set->pfd); free(set->tmout); @@ -79,8 +88,10 @@ int fdset_add(fdset_t *set, int fd, unsigned events, void *ctx) set->pfd[i].events = events; set->pfd[i].revents = 0; set->ctx[i] = ctx; + set->tmout[i] = 0; - return KNOT_EOK; + /* Return index to this descriptor. */ + return i; } int fdset_remove(fdset_t *set, unsigned i) @@ -94,8 +105,8 @@ int fdset_remove(fdset_t *set, unsigned i) /* Nothing else if it is the last one. * Move last -> i if some remain. 
*/ - if (set->n > 0) { - unsigned last = set->n; /* Already decremented */ + unsigned last = set->n; /* Already decremented */ + if (i < last) { set->pfd[i] = set->pfd[last]; set->tmout[i] = set->tmout[last]; set->ctx[i] = set->ctx[last]; diff --git a/src/common/fdset.h b/src/common/fdset.h index ed5d85813..d74765ad2 100644 --- a/src/common/fdset.h +++ b/src/common/fdset.h @@ -31,6 +31,7 @@ #include <stddef.h> #include <poll.h> #include <sys/time.h> +#include <signal.h> #define FDSET_INIT_SIZE 256 /* Resize step. */ diff --git a/src/knot/main.c b/src/knot/main.c index 685d5f247..610b21f2a 100644 --- a/src/knot/main.c +++ b/src/knot/main.c @@ -327,7 +327,7 @@ int main(int argc, char **argv) pthread_sigmask(SIG_BLOCK, &sa.sa_mask, NULL); /* Bind to control interface. */ - uint8_t buf[65535]; /*! \todo #2035 should be on heap */ + uint8_t buf[SOCKET_MTU_SZ]; size_t buflen = sizeof(buf); int remote = -1; if (conf()->ctl.iface != NULL) { diff --git a/src/knot/server/server.c b/src/knot/server/server.c index cef38012d..7244d3c01 100644 --- a/src/knot/server/server.c +++ b/src/knot/server/server.c @@ -629,18 +629,15 @@ int server_conf_hook(const struct conf_t *conf, void *data) return ret; } -ref_t *server_set_ifaces(server_t *s, fdset_t **fds, int *count, int type) +ref_t *server_set_ifaces(server_t *s, fdset_t *fds, int type) { iface_t *i = NULL; - *count = 0; rcu_read_lock(); - fdset_destroy(*fds); - *fds = fdset_new(); + fdset_clear(fds); if (s->ifaces) { WALK_LIST(i, s->ifaces->l) { - fdset_add(*fds, i->fd[type], OS_EV_READ); - *count += 1; + fdset_add(fds, i->fd[type], POLLIN, NULL); } } diff --git a/src/knot/server/server.h b/src/knot/server/server.h index 84a32e3f5..a766b0b14 100644 --- a/src/knot/server/server.h +++ b/src/knot/server/server.h @@ -236,11 +236,10 @@ int server_conf_hook(const struct conf_t *conf, void *data); * \brief Update fdsets from current interfaces list. * \param s Server. * \param fds Filedescriptor set. 
- * \param count Number of ifaces (will be set to N). * \param type I/O type (UDP/TCP). * \return new interface list */ -ref_t *server_set_ifaces(server_t *s, fdset_t **fds, int *count, int type); +ref_t *server_set_ifaces(server_t *s, fdset_t *fds, int type); #endif // _KNOTD_SERVER_H_ diff --git a/src/knot/server/socket.h b/src/knot/server/socket.h index 1849f1d9d..371977a3e 100644 --- a/src/knot/server/socket.h +++ b/src/knot/server/socket.h @@ -39,9 +39,7 @@ #include "common/sockaddr.h" /*! \brief Socket-related constants. */ -typedef enum { - SOCKET_MTU_SZ = 65535, /*!< Maximum MTU size. */ -} socket_const_t; +#define SOCKET_MTU_SZ 65535 /*!< Maximum MTU size. */ /*! * \brief Create socket. diff --git a/src/knot/server/tcp-handler.c b/src/knot/server/tcp-handler.c index 8d52fd347..58cc1bfeb 100644 --- a/src/knot/server/tcp-handler.c +++ b/src/knot/server/tcp-handler.c @@ -39,13 +39,10 @@ #include "libknot/nameserver/name-server.h" #include "libknot/util/wire.h" -/* Defines */ -#define TCP_BUFFER_SIZE 65535 /*! Do not change, as it is used for maximum DNS/TCP packet size. */ - /*! \brief TCP worker data. */ typedef struct tcp_worker_t { iohandler_t *ioh; /*!< Shortcut to I/O handler. */ - fdset_t *fdset; /*!< File descriptor set. */ + fdset_t set; /*!< File descriptor set. */ int pipe[2]; /*!< Master-worker signalization pipes. */ } tcp_worker_t; @@ -83,17 +80,19 @@ static int tcp_reply(int fd, uint8_t *qbuf, size_t resp_len) } /*! \brief Sweep TCP connection. 
*/ -static void tcp_sweep(fdset_t *set, int fd, void* data) +static enum fdset_sweep_state tcp_sweep(fdset_t *set, int i, void *data) { UNUSED(data); + assert(set && i < set->n && i >= 0); + int fd = set->pfd[i].fd; char r_addr[SOCKADDR_STRLEN] = { '\0' }; int r_port = 0; struct sockaddr_storage addr; socklen_t len = sizeof(addr); if (getpeername(fd, (struct sockaddr*)&addr, &len) < 0) { dbg_net("tcp: sweep getpeername() on invalid socket=%d\n", fd); - return; + return FDSET_SWEEP; } /* Translate */ @@ -111,8 +110,8 @@ static void tcp_sweep(fdset_t *set, int fd, void* data) log_server_notice("Connection with '%s@%d' was terminated due to " "inactivity.\n", r_addr, r_port); - fdset_remove(set, fd); close(fd); + return FDSET_SWEEP; } /*! @@ -327,30 +326,28 @@ int tcp_accept(int fd) tcp_worker_t* tcp_worker_create() { tcp_worker_t *w = malloc(sizeof(tcp_worker_t)); - if (w == NULL) { - dbg_net("tcp: out of memory when creating worker\n"); - return NULL; - } + if (w == NULL) + goto cleanup; /* Create signal pipes. */ memset(w, 0, sizeof(tcp_worker_t)); - if (pipe(w->pipe) < 0) { - free(w); - return NULL; - } + if (pipe(w->pipe) < 0) + goto cleanup; /* Create fdset. */ - w->fdset = fdset_new(); - if (!w->fdset) { + if (fdset_init(&w->set, FDSET_INIT_SIZE) != KNOT_EOK) { close(w->pipe[0]); close(w->pipe[1]); - free(w); - return NULL; + goto cleanup; } - fdset_add(w->fdset, w->pipe[0], OS_EV_READ); - + fdset_add(&w->set, w->pipe[0], POLLIN, NULL); return w; + + /* Cleanup */ +cleanup: + free(w); + return NULL; } void tcp_worker_free(tcp_worker_t* w) @@ -359,8 +356,8 @@ void tcp_worker_free(tcp_worker_t* w) return; } - /* Destroy fdset. */ - fdset_destroy(w->fdset); + /* Clear fdset. */ + fdset_clear(&w->set); /* Close pipe write end and worker. */ close(w->pipe[0]); @@ -493,23 +490,22 @@ int tcp_loop_master(dthread_t *thread) tcp_worker_t **workers = h->data; /* Prepare structures for bound sockets. 
*/ - fdset_it_t it; - fdset_t *fds = NULL; ref_t *ref = NULL; - int if_cnt = 0; + fdset_t set; + fdset_init(&set, conf()->ifaces_count); /* Accept connections. */ - int id = 0; - dbg_net("tcp: created 1 master with %d workers, backend is '%s' \n", - unit->size - 1, fdset_method()); + int id = 0, ret = 0; + dbg_net("tcp: created 1 master with %d workers\n", unit->size - 1); for(;;) { /* Check handler state. */ if (knot_unlikely(st->s & ServerReload)) { st->s &= ~ServerReload; ref_release(ref); - ref = server_set_ifaces(h->server, &fds, &if_cnt, IO_TCP); - if (if_cnt == 0) break; + ref = server_set_ifaces(h->server, &set, IO_TCP); + if (set.n == 0) /* Terminate on zero interfaces. */ + break; } /* Check for cancellation. */ @@ -518,35 +514,34 @@ int tcp_loop_master(dthread_t *thread) } /* Wait for events. */ - int nfds = fdset_wait(fds, OS_EV_FOREVER); + int nfds = poll(set.pfd, set.n, -1); if (nfds <= 0) { - if (errno == EINTR) continue; + if (errno == EINTR) + continue; break; } - fdset_begin(fds, &it); - while(nfds > 0) { - /* Accept client. */ - int client = tcp_accept(it.fd); - if (client > -1) { - /* Add to worker in RR fashion. */ - if (write(workers[id]->pipe[1], &client, sizeof(int)) < 0) { - dbg_net("tcp: failed to register fd=%d to worker=%d\n", - client, id); - close(client); - continue; - } - id = get_next_rr(id, unit->size - 1); - } + for (unsigned i = 0; nfds > 0 && i < set.n; ++i) { + /* Skip inactive. */ + if (!(set.pfd[i].revents & POLLIN)) + continue; - if (fdset_next(fds, &it) != 0) { - break; - } + /* Accept client. */ + --nfds; /* One less active event. */ + int client = tcp_accept(set.pfd[i].fd); + if (client < 0) + continue; + + /* Add to worker in RR fashion. 
*/ + id = get_next_rr(id, unit->size - 1); + ret = write(workers[id]->pipe[1], &client, sizeof(int)); + if (ret < 0) + close(client); } } dbg_net("tcp: master thread finished\n"); - fdset_destroy(fds); + fdset_clear(&set); ref_release(ref); return KNOT_EOK; @@ -554,18 +549,6 @@ int tcp_loop_master(dthread_t *thread) int tcp_loop_worker(dthread_t *thread) { - tcp_worker_t *w = thread->data; - if (!w) { - return KNOT_EINVAL; - } - - /* Allocate buffer for requests. */ - uint8_t *qbuf = malloc(TCP_BUFFER_SIZE); - if (qbuf == NULL) { - dbg_net("tcp: failed to allocate buffers for TCP worker\n"); - return KNOT_EINVAL; - } - /* Drop all capabilities on workers. */ #ifdef HAVE_CAP_NG_H if (capng_have_capability(CAPNG_EFFECTIVE, CAP_SETPCAP)) { @@ -574,25 +557,29 @@ int tcp_loop_worker(dthread_t *thread) } #endif /* HAVE_CAP_NG_H */ - /* Next sweep time. */ + uint8_t *qbuf = malloc(SOCKET_MTU_SZ); + tcp_worker_t *w = thread->data; + if (w == NULL || qbuf == NULL) { + free(qbuf); + return KNOT_EINVAL; + } + + /* Accept clients. */ + dbg_net("tcp: worker %p started\n", w); + fdset_t *set = &w->set; timev_t next_sweep; time_now(&next_sweep); next_sweep.tv_sec += TCP_SWEEP_INTERVAL; - - /* Accept clients. */ - dbg_net_verb("tcp: worker %p started\n", w); for (;;) { /* Cancellation point. */ - if (dt_is_cancelled(thread)) { + if (dt_is_cancelled(thread)) break; - } /* Wait for events. */ - int nfds = fdset_wait(w->fdset, (TCP_SWEEP_INTERVAL * 1000)/2); - if (nfds < 0) { + int nfds = poll(set->pfd, set->n, TCP_SWEEP_INTERVAL * 1000); + if (nfds < 0) continue; - } /* Establish timeouts. */ rcu_read_lock(); @@ -601,57 +588,49 @@ int tcp_loop_worker(dthread_t *thread) rcu_read_unlock(); /* Process incoming events. */ - dbg_net_verb("tcp: worker %p registered %d events\n", - w, nfds); - fdset_it_t it; - fdset_begin(w->fdset, &it); - while(nfds > 0) { - - /* Handle incoming clients. 
*/ - if (it.fd == w->pipe[0]) { - int client = 0; - if (read(it.fd, &client, sizeof(int)) < 0) { - continue; - } + unsigned i = 0; + while (nfds > 0 && i < set->n) { + + if (!(set->pfd[i].revents & set->pfd[i].events)) { + /* Skip inactive. */ + ++i; + continue; + } else { + /* One less active event. */ + --nfds; + } - dbg_net_verb("tcp: worker %p registered " - "client %d\n", - w, client); - fdset_add(w->fdset, client, OS_EV_READ); - fdset_set_watchdog(w->fdset, client, max_hs); - dbg_net("tcp: watchdog for fd=%d set to %ds\n", - client, max_hs); + int fd = set->pfd[i].fd; + if (fd == w->pipe[0]) { + /* Register incoming TCP connection. */ + int client, next_id; + if (read(fd, &client, sizeof(int)) == sizeof(int)) { + next_id = fdset_add(set, client, POLLIN, NULL); + fdset_set_tmout(set, next_id, max_hs); + } } else { - /* Handle other events. */ - int ret = tcp_handle(w, it.fd, qbuf, - TCP_BUFFER_SIZE); + /* Process query over TCP. */ + int ret = tcp_handle(w, fd, qbuf, SOCKET_MTU_SZ); if (ret == KNOT_EOK) { - fdset_set_watchdog(w->fdset, it.fd, - max_idle); - dbg_net("tcp: watchdog for fd=%d " - "set to %ds\n", - it.fd, max_idle); + /* Update socket activity timer. */ + fdset_set_tmout(set, i, max_idle); } - /*! \todo Refactor to allow erase on iterator.*/ if (ret == KNOT_ECONNREFUSED) { - fdset_remove(w->fdset, it.fd); - close(it.fd); - break; + fdset_remove(set, i); + close(fd); + continue; /* Stay on the same index. */ } - } - /* Check if next exists. */ - if (fdset_next(w->fdset, &it) != 0) { - break; - } + /* Next active. */ + ++i; } /* Sweep inactive. */ timev_t now; if (time_now(&now) == 0) { if (now.tv_sec >= next_sweep.tv_sec) { - fdset_sweep(w->fdset, &tcp_sweep, NULL); + fdset_sweep(set, &tcp_sweep, NULL); memcpy(&next_sweep, &now, sizeof(next_sweep)); next_sweep.tv_sec += TCP_SWEEP_INTERVAL; } @@ -660,7 +639,7 @@ int tcp_loop_worker(dthread_t *thread) /* Stop whole unit. 
*/ free(qbuf); - dbg_net_verb("tcp: worker %p finished\n", w); + dbg_net("tcp: worker %p finished\n", w); return KNOT_EOK; } diff --git a/src/knot/server/xfr-handler.c b/src/knot/server/xfr-handler.c index b82358796..26f2f2c4f 100644 --- a/src/knot/server/xfr-handler.c +++ b/src/knot/server/xfr-handler.c @@ -44,9 +44,9 @@ #include "libknot/rrset.h" /* Constants */ -#define XFR_CHUNKLEN 3 /*! Number of requests assigned in a single pass. */ +#define XFR_MAX_TASKS 1024 /*! Maximum pending tasks. */ +#define XFR_CHUNKLEN 16 /*! Number of requests assigned in a single pass. */ #define XFR_SWEEP_INTERVAL 2 /*! [seconds] between sweeps. */ -#define XFR_BUFFER_SIZE 65535 /*! Do not change this - maximum value for UDP packet length. */ #define XFR_MSG_DLTTR 9 /*! Index of letter differentiating IXFR/AXFR in log msg. */ /* Messages */ @@ -70,6 +70,27 @@ static knot_lookup_table_t xfr_result_table[] = { { XFR_TYPE_AIN, NULL } }; +/* Limits. */ +static bool xfr_pending_incr(xfrhandler_t *xfr) +{ + bool ret = false; + pthread_mutex_lock(&xfr->pending_mx); + if (xfr->pending < conf()->xfers) { + ++xfr->pending; + ret = true; + } + pthread_mutex_unlock(&xfr->pending_mx); + + return ret; +} + +static void xfr_pending_decr(xfrhandler_t *xfr) +{ + pthread_mutex_lock(&xfr->pending_mx); + --xfr->pending; + pthread_mutex_unlock(&xfr->pending_mx); +} + /* I/O wrappers */ static int xfr_send_tcp(int fd, sockaddr_t *addr, uint8_t *msg, size_t msglen) @@ -84,33 +105,6 @@ static int xfr_recv_tcp(int fd, sockaddr_t *addr, uint8_t *buf, size_t buflen) static int xfr_recv_udp(int fd, sockaddr_t *addr, uint8_t *buf, size_t buflen) { return recvfrom(fd, buf, buflen, 0, (struct sockaddr *)addr, &addr->len); } -/* Context fetching. 
*/ - -static knot_ns_xfr_t* xfr_task_get(xfrworker_t *w, int fd) -{ - value_t *val = ahtable_tryget(w->pool.t, (const char*)&fd, sizeof(int)); - if (!val) return NULL; - return *val; -} - -static void xfr_task_set(xfrworker_t *w, int fd, knot_ns_xfr_t *rq) -{ - int events = OS_EV_READ; - /* Asynchronous connect, watch for writability. */ - if (rq->flags & XFR_FLAG_CONNECTING) - events = OS_EV_WRITE; - - fdset_add(w->pool.fds, fd, events); - value_t *val = ahtable_get(w->pool.t, (const char*)&fd, sizeof(int)); - *val = rq; -} - -static void xfr_task_clear(xfrworker_t *w, int fd) -{ - ahtable_del(w->pool.t, (const char*)&fd, sizeof(int)); - fdset_remove(w->pool.fds, fd); -} - /*! \brief Wrapper function for answering AXFR/OUT. */ static int xfr_answer_axfr(knot_nameserver_t *ns, knot_ns_xfr_t *xfr) { @@ -340,27 +334,8 @@ static int xfr_task_close(knot_ns_xfr_t *rq) return KNOT_EOK; } -/*! \brief Close and free task. */ -static int xfr_task_remove(xfrworker_t *w, int fd) -{ - knot_ns_xfr_t *rq = xfr_task_get(w, fd); - if (!rq) - return KNOT_ENOENT; - - /* End the task properly. */ - int ret = xfr_task_close(rq); - - /* Remove from set and close. */ - xfr_task_clear(w, fd); - if (fd > 0) - socket_close(fd); - --w->pending; - - return ret; -} - /*! \brief Timeout handler. */ -static int xfr_task_expire(fdset_t *fds, knot_ns_xfr_t *rq) +static int xfr_task_expire(fdset_t *set, int i, knot_ns_xfr_t *rq) { /* Fetch related zone (refcounted, no RCU). 
*/ knot_zone_t *zone = (knot_zone_t *)rq->zone; @@ -372,7 +347,7 @@ static int xfr_task_expire(fdset_t *fds, knot_ns_xfr_t *rq) case XFR_TYPE_NOTIFY: if ((long)--rq->data > 0) { /* Retries */ notify_create_request(contents, rq->wire, &rq->wire_size); - fdset_set_watchdog(fds, rq->session, NOTIFY_TIMEOUT); + fdset_set_tmout(set, i, NOTIFY_TIMEOUT); rq->send(rq->session, &rq->addr, rq->wire, rq->wire_size); log_zone_info("%s Query issued (serial %u).\n", rq->msg, knot_zone_serial(contents)); @@ -466,7 +441,26 @@ static int xfr_task_start(knot_ns_xfr_t *rq) return KNOT_EOK; } -static int xfr_task_process_begin(xfrworker_t *w, knot_ns_xfr_t* rq) +static int xfr_task_is_transfer(knot_ns_xfr_t *rq) +{ + return rq->type <= XFR_TYPE_IIN; +} + +static void xfr_async_setbuf(knot_ns_xfr_t* rq, uint8_t *buf, size_t buflen) +{ + /* Update request. */ + rq->wire = buf; + rq->wire_size = buflen; + rq->wire_maxlen = buflen; + rq->send = &xfr_send_udp; + rq->recv = &xfr_recv_udp; + if (rq->flags & XFR_FLAG_TCP) { + rq->send = &xfr_send_tcp; + rq->recv = &xfr_recv_tcp; + } +} + +static int xfr_async_start(fdset_t *set, knot_ns_xfr_t* rq) { /* Update XFR message prefix. */ int ret = KNOT_EOK; @@ -476,49 +470,52 @@ static int xfr_task_process_begin(xfrworker_t *w, knot_ns_xfr_t* rq) if (rq->session <= 0) ret = xfr_task_connect(rq); - /* Watch socket if valid. */ - if (ret == KNOT_EOK) - xfr_task_set(w, rq->session, rq); + /* Add to set. */ + if (ret == KNOT_EOK) { + unsigned flags = POLLIN; + if (rq->flags & XFR_FLAG_CONNECTING) + flags = POLLOUT; + int next_id = fdset_add(set, rq->session, flags, rq); + if (next_id >= 0) { + /* Set default connection timeout. */ + rcu_read_lock(); + fdset_set_tmout(set, next_id, conf()->max_conn_reply); + rcu_read_unlock(); + } else { + /* Or refuse if failed. 
*/ + ret = KNOT_ECONNREFUSED; + } + } return ret; } -static int xfr_task_process(xfrworker_t *w, knot_ns_xfr_t* rq, uint8_t *buf, size_t buflen) +static int xfr_async_state(knot_ns_xfr_t* rq) { - /* Connected, drop the extra flags. */ - if (rq->flags & XFR_FLAG_CONNECTING) { - int err = EINVAL; - socklen_t len = sizeof(int); - if (getsockopt(rq->session, SOL_SOCKET, SO_ERROR, &err, &len) < 0) - ; - /* Connection failure. */ - if (err != 0) - return KNOT_ECONNREFUSED; + /* Check socket status. */ + int err = EINVAL; + socklen_t len = sizeof(int); + if (getsockopt(rq->session, SOL_SOCKET, SO_ERROR, &err, &len) < 0) + return KNOT_ERROR; + if (err != 0) + return knot_map_errno(err); + return KNOT_EOK; +} - /* Watch for incoming data from now on. */ +static int xfr_async_finish(fdset_t *set, unsigned id) +{ + /* Drop back to synchronous mode. */ + int ret = KNOT_EOK; + knot_ns_xfr_t *rq = (knot_ns_xfr_t *)set->ctx[id]; + if ((ret = xfr_async_state(rq)) == KNOT_EOK) { rq->flags &= ~XFR_FLAG_CONNECTING; - fdset_set_events(w->pool.fds, rq->session, OS_EV_READ); - - /* Drop asynchronous mode. */ - if (fcntl(rq->session, F_SETFL, 0) < 0) + set->pfd[id].events = POLLIN; + if (fcntl(set->pfd[id].fd, F_SETFL, 0) < 0) ; } - /* Update request. */ - rq->wire = buf; - rq->wire_size = buflen; - rq->wire_maxlen = buflen; - rq->send = &xfr_send_udp; - rq->recv = &xfr_recv_udp; - if (rq->flags & XFR_FLAG_TCP) { - rq->send = &xfr_send_tcp; - rq->recv = &xfr_recv_tcp; - } - - /* Check if not already processing, zone is refcounted. */ - zonedata_t *zd = (zonedata_t *)knot_zone_data(rq->zone); - /* Check if the zone is not discarded. */ + zonedata_t *zd = (zonedata_t *)knot_zone_data(rq->zone); if (!zd || knot_zone_flags(rq->zone) & KNOT_ZONE_DISCARDED) { dbg_xfr_verb("xfr: request on a discarded zone, ignoring\n"); return KNOT_EINVAL; @@ -526,7 +523,7 @@ static int xfr_task_process(xfrworker_t *w, knot_ns_xfr_t* rq, uint8_t *buf, siz /* Handle request. 
*/ dbg_xfr("%s processing request type '%d'\n", rq->msg, rq->type); - int ret = xfr_task_start(rq); + ret = xfr_task_start(rq); const char *msg = knot_strerror(ret); knot_lookup_table_t *xd = knot_lookup_by_id(xfr_result_table, rq->type); if (xd && ret == KNOT_EOK) { @@ -544,12 +541,11 @@ static int xfr_task_process(xfrworker_t *w, knot_ns_xfr_t* rq, uint8_t *buf, siz } break; case XFR_TYPE_NOTIFY: /* Send on first timeout <0,5>s. */ - fdset_set_watchdog(w->pool.fds, rq->session, (int)(tls_rand() * 5)); - xfr_task_set(w, rq->session, rq); + fdset_set_tmout(set, id, (int)(tls_rand() * 5)); return KNOT_EOK; case XFR_TYPE_SOA: case XFR_TYPE_FORWARD: - fdset_set_watchdog(w->pool.fds, rq->session, conf()->max_conn_reply); + fdset_set_tmout(set, id, conf()->max_conn_reply); break; default: break; @@ -803,7 +799,7 @@ static int xfr_task_xfer(xfrworker_t *w, knot_ns_xfr_t *rq) } /* Update REFRESH/RETRY */ - zones_schedule_refresh(rq->zone, 0); + zones_schedule_refresh(rq->zone, REFRESH_DEFAULT); ret = KNOT_ECONNREFUSED; /* Disconnect */ } @@ -841,36 +837,36 @@ static int xfr_process_event(xfrworker_t *w, knot_ns_xfr_t *rq) } } -/*! \brief Sweep non-replied connection. */ -static void xfr_sweep(fdset_t *set, int fd, void *data) +/*! \brief Sweep inactive connection. */ +static enum fdset_sweep_state xfr_sweep(fdset_t *set, int i, void *data) { - dbg_xfr("xfr: sweeping fd=%d\n", fd); - if (!set || !data) { - dbg_xfr("xfr: invalid sweep operation on NULL worker or set\n"); - return; - } - xfrworker_t *w = (xfrworker_t *)data; - knot_ns_xfr_t *rq = xfr_task_get(w, fd); - if (!rq) { - dbg_xfr("xfr: NULL data to sweep\n"); - return; - } + assert(set && i < set->n && i >= 0); + + knot_ns_xfr_t *rq = set->ctx[i]; + xfrhandler_t *xfr = (xfrhandler_t *)data; - /* Skip non-sweepable types. */ + /* Expire only UDP requests. 
*/ int ret = KNOT_ECONNREFUSED; switch(rq->type) { case XFR_TYPE_SOA: case XFR_TYPE_NOTIFY: case XFR_TYPE_FORWARD: - ret = xfr_task_expire(set, rq); + ret = xfr_task_expire(set, i, rq); break; default: break; } + /* Close if not valid anymore. */ if (ret != KNOT_EOK) { - xfr_task_remove(w, fd); + if (xfr_task_is_transfer(rq)) + xfr_pending_decr(xfr); + xfr_task_close(rq); + socket_close(set->pfd[i].fd); + return FDSET_SWEEP; } + + return FDSET_KEEP; } /*! \brief Check TSIG if exists. */ @@ -1008,7 +1004,7 @@ int xfr_worker(dthread_t *thread) xfrhandler_t *xfr = w->master; /* Buffer for answering. */ - size_t buflen = XFR_BUFFER_SIZE; + size_t buflen = SOCKET_MTU_SZ; uint8_t* buf = malloc(buflen); if (buf == NULL) { dbg_xfr("xfr: failed to allocate buffer for XFR worker\n"); @@ -1020,54 +1016,69 @@ int xfr_worker(dthread_t *thread) time_now(&next_sweep); next_sweep.tv_sec += XFR_SWEEP_INTERVAL; - int limit = XFR_CHUNKLEN * 3; - if (conf() && conf()->xfers > 0) { - limit = conf()->xfers; + /* Capacity limits. */ + rcu_read_lock(); + unsigned threads = w->master->unit->size; + unsigned thread_capacity = XFR_MAX_TASKS / threads; + rcu_read_unlock(); + + /* Set of connections. */ + fdset_t set; + int ret = fdset_init(&set, thread_capacity); + if (ret != KNOT_EOK) { + free(buf); + return ret; } - unsigned thread_capacity = limit / w->master->unit->size; - if (thread_capacity < 1) - thread_capacity = 1; - w->pool.fds = fdset_new(); - w->pool.t = ahtable_create(); - w->pending = 0; /* Accept requests. */ - int ret = 0; dbg_xfr_verb("xfr: worker=%p starting\n", w); for (;;) { /* Populate pool with new requests. */ - if (w->pending <= thread_capacity) { + unsigned newconns = 0; + for (;;) { + /* Do not exceed thread capacity. */ + if (set.n >= thread_capacity || newconns > XFR_CHUNKLEN) + break; + + /* Take first request. 
*/ pthread_mutex_lock(&xfr->mx); - unsigned was_pending = w->pending; - while (!EMPTY_LIST(xfr->queue)) { - knot_ns_xfr_t *rq = HEAD(xfr->queue); - rem_node(&rq->n); - - /* Start asynchronous connect. */ - ret = xfr_task_process_begin(w, rq); - if (ret == KNOT_EOK) - ++w->pending; - else - xfr_task_close(rq); - - /* Balance pending tasks among threads. */ - if (w->pending - was_pending > XFR_CHUNKLEN) - break; - /* Do not exceed thread capacity. */ - if (w->pending >= thread_capacity) - break; + if (EMPTY_LIST(xfr->queue)) { + pthread_mutex_unlock(&xfr->mx); + break; } + + + /* Limit number of transfers. */ + knot_ns_xfr_t *rq = HEAD(xfr->queue); + unsigned is_transfer = xfr_task_is_transfer(rq); + if (is_transfer && !xfr_pending_incr(xfr)) { + pthread_mutex_unlock(&xfr->mx); + break; + } + + rem_node(&rq->n); pthread_mutex_unlock(&xfr->mx); + + /* Start asynchronous connect. */ + xfr_async_setbuf(rq, buf, buflen); + if (xfr_async_start(&set, rq) != KNOT_EOK) { + if (is_transfer) + xfr_pending_decr(xfr); + xfr_task_close(rq); + break; + } + + ++newconns; } /* Check pending threads. */ - if (dt_is_cancelled(thread) || w->pending == 0) { + if (dt_is_cancelled(thread) || set.n == 0) { break; } /* Poll fdset. */ - int nfds = fdset_wait(w->pool.fds, (XFR_SWEEP_INTERVAL/2) * 1000); + int nfds = poll(set.pfd, set.n, XFR_SWEEP_INTERVAL * 1000); if (nfds < 0) { if (errno == EINTR) continue; @@ -1075,36 +1086,45 @@ int xfr_worker(dthread_t *thread) } /* Iterate fdset. */ - fdset_it_t it; - fdset_begin(w->pool.fds, &it); - while(nfds > 0) { - knot_ns_xfr_t *rq = xfr_task_get(w, it.fd); - if (rq) { - if (rq->flags & XFR_FLAG_CONNECTING) { - ret = xfr_task_process(w, rq, buf, buflen); - } else { - ret = xfr_process_event(w, rq); - } - - /* Check task state. 
*/ - if (ret != KNOT_EOK) { - xfr_task_remove(w, it.fd); - --it.pos; /* Reset iterator */ - } + unsigned i = 0; + while (nfds > 0 && i < set.n && !dt_is_cancelled(thread)) { + + if (!(set.pfd[i].revents & set.pfd[i].events)) { + /* Skip inactive. */ + ++i; + continue; + } else { + /* One less active event. */ + --nfds; } - /* Check for cancellation or next active fd. */ - if (dt_is_cancelled(thread)) - break; - if (fdset_next(w->pool.fds, &it) < 0) - break; + /* Process pending tasks. */ + knot_ns_xfr_t *rq = (knot_ns_xfr_t *)set.ctx[i]; + if (rq->flags & XFR_FLAG_CONNECTING) { + ret = xfr_async_finish(&set, i); + } else { + ret = xfr_process_event(w, rq); + } + + /* Check task state. */ + if (ret != KNOT_EOK) { + if (xfr_task_is_transfer(rq)) + xfr_pending_decr(xfr); + xfr_task_close(rq); + socket_close(set.pfd[i].fd); + fdset_remove(&set, i); + continue; /* Stay on the same index. */ + } + + /* Next active. */ + ++i; } /* Sweep inactive. */ timev_t now; if (time_now(&now) == 0) { if (now.tv_sec >= next_sweep.tv_sec) { - fdset_sweep(w->pool.fds, &xfr_sweep, w); + fdset_sweep(&set, &xfr_sweep, xfr); memcpy(&next_sweep, &now, sizeof(next_sweep)); next_sweep.tv_sec += XFR_SWEEP_INTERVAL; } @@ -1112,24 +1132,17 @@ int xfr_worker(dthread_t *thread) } /* Cancel existing connections. */ - ahtable_iter_t i; - ahtable_iter_begin(w->pool.t, &i, false); - while (!ahtable_iter_finished(&i)) { - /* Free all tasks, do not retry cancelled. */ - knot_ns_xfr_t *rq = (knot_ns_xfr_t *)(*ahtable_iter_val(&i)); - if (rq->session > 0) - socket_close(rq->session); + for (unsigned i = 0; i < set.n; ++i) { + knot_ns_xfr_t *rq = (knot_ns_xfr_t *)set.ctx[i]; + socket_close(set.pfd[i].fd); + if (xfr_task_is_transfer(rq)) + xfr_pending_decr(xfr); xfr_task_free(rq); - ahtable_iter_next(&i); } - ahtable_iter_free(&i); - - /* Destroy data structures. 
*/ - fdset_destroy(w->pool.fds); - ahtable_free(w->pool.t); - free(buf); dbg_xfr_verb("xfr: worker=%p finished.\n", w); + fdset_clear(&set); + free(buf); return KNOT_EOK; } @@ -1163,6 +1176,7 @@ xfrhandler_t *xfr_create(size_t thrcount, knot_nameserver_t *ns) /* Create tasks structure and mutex. */ pthread_mutex_init(&xfr->mx, 0); + pthread_mutex_init(&xfr->pending_mx, 0); init_list(&xfr->queue); /* Assign worker threads. */ @@ -1181,6 +1195,7 @@ int xfr_free(xfrhandler_t *xfr) } /* Free RR mutex. */ + pthread_mutex_destroy(&xfr->pending_mx); pthread_mutex_destroy(&xfr->mx); /* Free pending queue. */ diff --git a/src/knot/server/xfr-handler.h b/src/knot/server/xfr-handler.h index 204260541..f0b048757 100644 --- a/src/knot/server/xfr-handler.h +++ b/src/knot/server/xfr-handler.h @@ -48,11 +48,6 @@ enum xfrstate_t { */ typedef struct xfrworker_t { - struct { - ahtable_t *t; - fdset_t *fds; - } pool; - unsigned pending; struct xfrhandler_t *master; /*! \brief Worker master. */ } xfrworker_t; @@ -62,6 +57,8 @@ typedef struct xfrworker_t typedef struct xfrhandler_t { list queue; + unsigned pending; /*!< \brief Pending transfers. */ + pthread_mutex_t pending_mx; pthread_mutex_t mx; /*!< \brief Tasks synchronisation. */ knot_nameserver_t *ns; dt_unit_t *unit; /*!< \brief Threading unit. */ diff --git a/src/knot/server/zones.c b/src/knot/server/zones.c index d1a3bf694..17b0c7ceb 100644 --- a/src/knot/server/zones.c +++ b/src/knot/server/zones.c @@ -2495,7 +2495,7 @@ int zones_process_response(knot_nameserver_t *nameserver, /* No updates available. */ if (ret == 0) { - zones_schedule_refresh(zone, 0); + zones_schedule_refresh(zone, REFRESH_DEFAULT); rcu_read_unlock(); return KNOT_EUPTODATE; } @@ -2698,8 +2698,7 @@ int zones_ns_conf_hook(const struct conf_t *conf, void *data) /* REFRESH zones. */ for (unsigned i = 0; i < knot_zonedb_zone_count(ns->zone_db); ++i) { - /* Refresh new slave zones (almost) immediately. 
*/ - zones_schedule_refresh(zones[i], tls_rand() * 500 + i/2); + zones_schedule_refresh(zones[i], 0); /* Now. */ zones_schedule_notify(zones[i]); } @@ -3177,7 +3176,7 @@ int zones_schedule_notify(knot_zone_t *zone) return KNOT_EOK; } -int zones_schedule_refresh(knot_zone_t *zone, unsigned time) +int zones_schedule_refresh(knot_zone_t *zone, int time) { if (!zone || !zone->data) { return KNOT_EINVAL; @@ -3206,18 +3205,17 @@ int zones_schedule_refresh(knot_zone_t *zone, unsigned time) if (zd->xfr_in.has_master) { /* Schedule REFRESH timer. */ - uint32_t refresh_tmr = time; - if (refresh_tmr == 0) { - if (knot_zone_contents(zone)) { - refresh_tmr = zones_jitter(zones_soa_refresh(zone)); - } else { - refresh_tmr = zd->xfr_in.bootstrap_retry; - } + if (time < 0) { + if (knot_zone_contents(zone)) + time = zones_jitter(zones_soa_refresh(zone)); + else + time = zd->xfr_in.bootstrap_retry; } + zd->xfr_in.timer = evsched_schedule_cb(sch, zones_refresh_ev, - zone, refresh_tmr); + zone, time); dbg_zones("zone: REFRESH '%s' set to %u\n", - zd->conf->name, refresh_tmr); + zd->conf->name, time); zd->xfr_in.state = XFR_SCHED; } rcu_read_unlock(); diff --git a/src/knot/server/zones.h b/src/knot/server/zones.h index 5045d50bd..64086c423 100644 --- a/src/knot/server/zones.h +++ b/src/knot/server/zones.h @@ -46,6 +46,10 @@ #define IXFR_DBSYNC_TIMEOUT (60*1000) /*!< Database sync timeout = 60s. */ #define AXFR_BOOTSTRAP_RETRY (30*1000) /*!< Interval between AXFR BS retries. */ +enum { + REFRESH_DEFAULT = -1 /* Use time value from zone structure. */ +}; + /*! * \brief Zone-related data. */ @@ -302,13 +306,13 @@ int zones_store_and_apply_chgsets(knot_changesets_t *chs, * REFRESH/RETRY/EXPIRE timers are updated according to SOA. * * \param zone Related zone. - * \param time Specific time or 0 for default. + * \param time Specific time or REFRESH_DEFAULT for default. 
* * \retval KNOT_EOK * \retval KNOT_EINVAL * \retval KNOT_ERROR */ -int zones_schedule_refresh(knot_zone_t *zone, unsigned time); +int zones_schedule_refresh(knot_zone_t *zone, int time); /*! * \brief Schedule NOTIFY after zone update. diff --git a/src/libknot/nameserver/name-server.h b/src/libknot/nameserver/name-server.h index 5f058acb9..6574539bb 100644 --- a/src/libknot/nameserver/name-server.h +++ b/src/libknot/nameserver/name-server.h @@ -174,8 +174,8 @@ typedef enum knot_ns_transport { typedef enum knot_ns_xfr_type_t { /* DNS events. */ XFR_TYPE_AIN = 0, /*!< AXFR-IN request (start transfer). */ - XFR_TYPE_AOUT, /*!< AXFR-OUT request (incoming transfer). */ XFR_TYPE_IIN, /*!< IXFR-IN request (start transfer). */ + XFR_TYPE_AOUT, /*!< AXFR-OUT request (incoming transfer). */ XFR_TYPE_IOUT, /*!< IXFR-OUT request (incoming transfer). */ XFR_TYPE_SOA, /*!< Pending SOA request. */ XFR_TYPE_NOTIFY, /*!< Pending NOTIFY query. */ diff --git a/src/tests/common/fdset_tests.c b/src/tests/common/fdset_tests.c index d7b29fa52..e8b9d73ee 100644 --- a/src/tests/common/fdset_tests.c +++ b/src/tests/common/fdset_tests.c @@ -98,22 +98,21 @@ static int fdset_tests_count(int argc, char *argv[]) static int fdset_tests_run(int argc, char *argv[]) { - diag("fdset: implements '%s'", fdset_method()); - /* 1. Create fdset. */ - fdset_t *set = fdset_new(); - ok(set != 0, "fdset: new"); + fdset_t set; + int ret = fdset_init(&set, 32); + ok(ret == 0, "fdset: init"); /* 2. Create pipe. */ int fds[2], tmpfds[2]; - int ret = pipe(fds); + ret = pipe(fds); ok(ret >= 0, "fdset: pipe() works"); ret = pipe(tmpfds); /* 3. Add fd to set. */ - ret = fdset_add(set, fds[0], OS_EV_READ); + ret = fdset_add(&set, fds[0], POLLIN, NULL); ok(ret == 0, "fdset: add to set works"); - fdset_add(set, tmpfds[0], OS_EV_READ); + fdset_add(&set, tmpfds[0], POLLIN, NULL); /* Schedule write. 
*/ struct timeval ts, te; @@ -122,54 +121,44 @@ static int fdset_tests_run(int argc, char *argv[]) pthread_create(&t, 0, thr_action, &fds[1]); /* 4. Watch fdset. */ - ret = fdset_wait(set, OS_EV_FOREVER); + int nfds = poll(set.pfd, set.n, 2000); gettimeofday(&te, 0); size_t diff = timeval_diff(&ts, &te); - ok(ret > 0 && diff > 99 && diff < 10000, - "fdset: poll returned events in %zu ms", diff); + ok(nfds > 0 && diff > 99 && diff < 10000, + "fdset: poll returned %d events in %zu ms", nfds, diff); /* 5. Prepare event set. */ - fdset_it_t it; - ret = fdset_begin(set, &it); - ok(ret == 0 && it.fd == fds[0], "fdset: begin is valid, ret=%d", ret); + ok(set.pfd[0].revents & POLLIN, "fdset: pipe is active"); /* 6. Receive data. */ char buf = 0x00; - ret = read(it.fd, &buf, WRITE_PATTERN_LEN); - ok(ret >= 0 && buf == WRITE_PATTERN, "fdset: contains valid data, fd=%d", it.fd); - - /* 7. Iterate event set. */ - ret = fdset_next(set, &it); - ok(ret < 0, "fdset: boundary check works"); + ret = read(set.pfd[0].fd, &buf, WRITE_PATTERN_LEN); + ok(ret >= 0 && buf == WRITE_PATTERN, "fdset: contains valid data"); - /* 8. Remove from event set. */ - ret = fdset_remove(set, fds[0]); + /* 7-9. Remove from event set. */ + ret = fdset_remove(&set, 0); ok(ret == 0, "fdset: remove from fdset works"); close(fds[0]); close(fds[1]); - ret = fdset_remove(set, tmpfds[0]); + ret = fdset_remove(&set, 0); close(tmpfds[1]); close(tmpfds[1]); - - /* 9. Poll empty fdset. */ - ret = fdset_wait(set, OS_EV_FOREVER); - ok(ret <= 0, "fdset: polling empty fdset returns -1 (ret=%d)", ret); + ok(ret == 0, "fdset: remove from fdset works (2)"); + ret = fdset_remove(&set, 0); + ok(ret != 0, "fdset: removing nonexistent item"); /* 10. Crash test. 
*/ lives_ok({ - fdset_destroy(0); - fdset_add(0, -1, 0); - fdset_remove(0, -1); - fdset_wait(0, OS_EV_NOWAIT); - fdset_begin(0, 0); - fdset_end(0, 0); - fdset_next(0, 0); - fdset_method(); + fdset_init(0, 0); + fdset_add(0, 1, 1, 0); + fdset_add(0, 0, 1, 0); + fdset_remove(0, 1); + fdset_remove(0, 0); }, "fdset: crash test successful"); /* 11. Destroy fdset. */ - ret = fdset_destroy(set); + ret = fdset_clear(&set); ok(ret == 0, "fdset: destroyed"); /* Cleanup. */ -- GitLab