Skip to content
Snippets Groups Projects
Commit a31a439c authored by Lubos Slovak's avatar Lubos Slovak
Browse files

Merge remote branch 'origin/axfr-in' into debug

Conflicts:
	src/knot/server/tcp-handler.c
parents ffec918d 583a2241
No related branches found
No related tags found
No related merge requests found
Showing
with 1411 additions and 777 deletions
......@@ -107,6 +107,12 @@ src/common/general-tree.h
src/common/general-tree.c
src/common/WELL1024a.c
src/common/WELL1024a.h
src/common/fdset.h
src/common/fdset.c
src/common/fdset_poll.h
src/common/fdset_poll.c
src/common/fdset_epoll.h
src/common/fdset_epoll.c
src/zcompile/parser-descriptor.h
src/zcompile/parser-descriptor.c
src/zcompile/parser-util.h
......@@ -178,6 +184,8 @@ src/tests/common/skiplist_tests.c
src/tests/common/skiplist_tests.h
src/tests/common/slab_tests.c
src/tests/common/slab_tests.h
src/tests/common/fdset_tests.c
src/tests/common/fdset_tests.h
src/tests/knot/dthreads_tests.c
src/tests/knot/dthreads_tests.h
src/tests/knot/conf_tests.c
......
......@@ -93,7 +93,7 @@ AC_TYPE_UINT8_T
# Checks for library functions.
AC_FUNC_FORK
AC_FUNC_MMAP
AC_CHECK_FUNCS([gethostbyname gettimeofday memmove memset munmap regcomp select socket sqrt strcasecmp strchr strdup strerror strncasecmp strtol strtoul])
AC_CHECK_FUNCS([gethostbyname gettimeofday memmove memset munmap regcomp select socket sqrt strcasecmp strchr strdup strerror strncasecmp strtol strtoul poll epoll_wait kqueue])
AC_CONFIG_FILES([Makefile
libknot/Makefile
......
......@@ -810,7 +810,7 @@ void ck_destroy_table(ck_hash_table_t **table, void (*dtor_value)(void *value),
// disconnect the item
(*table)->stash = item->next;
/*! \todo Investigate this. */
// assert(item->item != NULL);
assert(item->item != NULL);
if (dtor_value) {
dtor_value(item->item->value);
......
......@@ -3053,6 +3053,7 @@ int knot_ns_switch_zone(knot_nameserver_t *nameserver,
free(name);
} else {
xfr->zone = z;
zone->zone = z;
}
knot_zone_contents_t *old = rcu_xchg_pointer(&z->contents, zone);
......
......@@ -84,6 +84,7 @@ typedef struct knot_ns_xfr {
size_t wire_size;
void *data;
knot_zone_t *zone;
void *owner;
} knot_ns_xfr_t;
/*----------------------------------------------------------------------------*/
......
......@@ -58,6 +58,8 @@ unittests_SOURCES = \
tests/common/skiplist_tests.h \
tests/common/slab_tests.c \
tests/common/slab_tests.h \
tests/common/fdset_tests.c \
tests/common/fdset_tests.h \
tests/knot/conf_tests.c \
tests/knot/conf_tests.h \
tests/knot/dthreads_tests.c \
......@@ -175,8 +177,14 @@ libknots_la_SOURCES = \
common/ref.c \
common/errors.h \
common/errors.c \
common/WELL1024a.h \
common/WELL1024a.c
common/WELL1024a.h \
common/WELL1024a.c \
common/fdset.h \
common/fdset.c \
common/fdset_poll.h \
common/fdset_poll.c \
common/fdset_epoll.h \
common/fdset_epoll.c
libknotd_la_SOURCES = \
knot/stat/gatherer.c \
......
......@@ -33,12 +33,11 @@ static int acl_compare(void *k1, void *k2)
return 0;
}
/*! \todo Port matching disabled. */
/* Compare ports on address match. */
// ldiff = ntohs(a1->addr4.sin_port) - ntohs(a2->addr4.sin_port);
// if (ldiff != 0) {
// return ldiff < 0 ? -1 : 1;
// }
ldiff = ntohs(a1->addr4.sin_port) - ntohs(a2->addr4.sin_port);
if (ldiff != 0) {
return ldiff < 0 ? -1 : 1;
}
return 0;
}
......@@ -66,12 +65,11 @@ static int acl_compare(void *k1, void *k2)
return 0;
}
/*! \todo Port matching disabled. */
/* Compare ports on address match. */
// ldiff = ntohs(a1->addr6.sin6_port) - ntohs(a2->addr6.sin6_port);
// if (ldiff != 0) {
// return ldiff < 0 ? -1 : 1;
// }
ldiff = ntohs(a1->addr6.sin6_port) - ntohs(a2->addr6.sin6_port);
if (ldiff != 0) {
return ldiff < 0 ? -1 : 1;
}
return 0;
}
#endif
......
#include <sys/time.h>
#include <stdlib.h>
#include <stdio.h>
#include "common/evsched.h"
......@@ -128,9 +129,9 @@ event_t* evsched_next(evsched_t *s)
/* Immediately return. */
if (timercmp(&dt, &next_ev->tv, >=)) {
rem_node(&next_ev->n);
pthread_mutex_unlock(&s->mx);
pthread_mutex_lock(&s->rl);
s->current = next_ev;
pthread_mutex_unlock(&s->mx);
return next_ev;
}
......@@ -182,12 +183,14 @@ int evsched_schedule(evsched_t *s, event_t *ev, uint32_t dt)
/* Schedule event. */
node *n = 0, *prev = 0;
WALK_LIST(n, s->calendar) {
event_t* cur = (event_t *)n;
if (timercmp(&cur->tv, &ev->tv, <)) {
prev = n;
} else {
break;
if (!EMPTY_LIST(s->calendar)) {
WALK_LIST(n, s->calendar) {
event_t* cur = (event_t *)n;
if (timercmp(&cur->tv, &ev->tv, <)) {
prev = n;
} else {
break;
}
}
}
......@@ -263,8 +266,20 @@ int evsched_cancel(evsched_t *s, event_t *ev)
/* Make sure not running. */
pthread_mutex_lock(&s->rl);
/* Find in list. */
event_t *n = 0;
int found = 0;
WALK_LIST(n, s->calendar) {
if (n == ev) {
found = 1;
break;
}
}
/* Remove from list. */
rem_node(&ev->n);
if (found) {
rem_node(&ev->n);
}
/* Enable running events. */
pthread_mutex_unlock(&s->rl);
......
#ifndef _GNU_SOURCE
#define _GNU_SOURCE /* Required for RTLD_DEFAULT. */
#endif
#include <dlfcn.h>
#include <string.h>
#include <stdio.h>
#include "common/fdset.h"
#include "config.h"
/*!
 * \brief Active backend dispatch table.
 *
 * Zero-initialized at startup; the fdset_init() constructor fills it in
 * with the best available implementation before main() runs.
 *
 * Fix: an empty initializer list `{}` is a GNU/C23 extension; `{ 0 }`
 * is the portable ISO C spelling for "all members zero/null".
 */
struct fdset_backend_t _fdset_backend = {
	0
};
/*! \brief Install the given implementation as the active backend. */
static void fdset_set_backend(struct fdset_backend_t *backend) {
	/* The backend table consists solely of the dispatch handlers, so a
	 * whole-struct assignment is equivalent to copying each member. */
	_fdset_backend = *backend;
}
/* Linux epoll API. */
#ifdef HAVE_EPOLL_WAIT
// /*! \todo Implement correctly. */
// #include "common/fdset_epoll.c"
#endif /* HAVE_EPOLL_WAIT */
/* BSD kqueue API */
#ifdef HAVE_KQUEUE
#warning "fixme: missing kqueue backend"
//#include "common/fdset_kqueue.h"
#endif /* HAVE_KQUEUE */
/* POSIX poll API */
#ifdef HAVE_POLL
#include "common/fdset_poll.c"
#endif /* HAVE_POLL */
/*!
 * \brief Bootstrap polling subsystem (it is called automatically).
 *
 * Runs as an ELF constructor before main(): probes via dlsym() which
 * multiplexing API is actually resolvable at runtime and installs the
 * matching dispatch table into _fdset_backend.
 *
 * Probe order: epoll (Linux), kqueue (BSD), poll (POSIX fallback).
 * NOTE(review): the epoll and kqueue branches are still commented out,
 * so poll is currently the only backend that can ever be selected.
 */
void __attribute__ ((constructor)) fdset_init()
{
	/* Preference: epoll */
#ifdef HAVE_EPOLL_WAIT
	/* Disabled until the epoll backend is implemented correctly. */
//	if (dlsym(RTLD_DEFAULT, "epoll_wait") != 0) {
//		fdset_set_backend(&_fdset_epoll);
//		return;
//	}
#endif
	/* Preference: kqueue */
#ifdef HAVE_KQUEUE
	/* Disabled until a kqueue backend exists. */
//	if (dlsym(RTLD_DEFAULT, "kqueue") != 0) {
//		fdset_set_backend(&_fdset_kqueue);
//		return;
//	}
#endif
	/* Preference: poll */
#ifdef HAVE_POLL
	/* Verify the symbol actually resolves in this process image. */
	if (dlsym(RTLD_DEFAULT, "poll") != 0) {
		fdset_set_backend(&_fdset_poll);
		return;
	}
#endif
	/* This shouldn't happen: every POSIX platform provides poll(). */
	fprintf(stderr, "fdset: fatal error - no valid fdset backend found\n");
	return;
}
/*!
 * \file fdset.h
 *
 * \author Marek Vavrusa <marek.vavrusa@nic.cz>
 *
 * \brief Wrapper for native I/O multiplexing.
 *
 * Selects best implementation according to config.
 * - poll()
 * - epoll() \todo
 * - kqueue() \todo
 *
 * \addtogroup common_lib
 * @{
 */
#ifndef _KNOTD_FDSET_H_
#define _KNOTD_FDSET_H_

#include <stddef.h>

/*! \brief Opaque pointer to implementation-specific fdset data. */
typedef struct fdset_t fdset_t;

/*! \brief Unified event types (backend-neutral flags). */
enum fdset_event_t {
	OS_EV_READ  = 1 << 0, /*!< Readable event. */
	OS_EV_WRITE = 1 << 1, /*!< Writeable event. */
	OS_EV_ERROR = 1 << 2  /*!< Error event. */
};

/*! \brief File descriptor set iterator. */
typedef struct fdset_it_t {
	int fd;     /*!< Current file descriptor (-1 when past the end). */
	int events; /*!< Returned events. */
	size_t pos; /* Internal usage. */
} fdset_it_t;

/*!
 * \brief File descriptor set implementation backend.
 *
 * Each backend fills one of these dispatch tables; member semantics
 * match the fdset_* wrapper functions documented below.
 * \internal
 */
struct fdset_backend_t
{
	fdset_t* (*fdset_new)();
	int (*fdset_destroy)(fdset_t*);
	int (*fdset_add)(fdset_t*, int, int);
	int (*fdset_remove)(fdset_t*, int);
	int (*fdset_wait)(fdset_t*);
	int (*fdset_begin)(fdset_t*, fdset_it_t*);
	int (*fdset_end)(fdset_t*, fdset_it_t*);
	int (*fdset_next)(fdset_t*, fdset_it_t*);
	const char* (*fdset_method)();
};

/*!
 * \brief Selected backend.
 *
 * NOTE(review): populated by the fdset_init() constructor; calling any
 * wrapper below before it has run dereferences a null function pointer.
 * \internal
 */
extern struct fdset_backend_t _fdset_backend;

/*!
 * \brief Create new fdset.
 *
 * FDSET implementation depends on system.
 *
 * \retval Pointer to initialized FDSET structure if successful.
 * \retval NULL on error.
 */
static inline fdset_t *fdset_new() {
	return _fdset_backend.fdset_new();
}

/*!
 * \brief Destroy FDSET.
 *
 * Releases all resources held by the set; watched descriptors
 * themselves are not closed.
 *
 * \retval 0 if successful.
 * \retval -1 on error.
 */
static inline int fdset_destroy(fdset_t * fdset) {
	return _fdset_backend.fdset_destroy(fdset);
}

/*!
 * \brief Add file descriptor to watched set.
 *
 * \param fdset Target set.
 * \param fd Added file descriptor.
 * \param events Mask of watched events (see enum fdset_event_t).
 *
 * \retval 0 if successful.
 * \retval -1 on errors.
 */
static inline int fdset_add(fdset_t *fdset, int fd, int events) {
	return _fdset_backend.fdset_add(fdset, fd, events);
}

/*!
 * \brief Remove file descriptor from watched set.
 *
 * The descriptor is only deregistered, not closed.
 *
 * \param fdset Target set.
 * \param fd File descriptor to be removed.
 *
 * \retval 0 if successful.
 * \retval -1 on errors.
 */
static inline int fdset_remove(fdset_t *fdset, int fd) {
	return _fdset_backend.fdset_remove(fdset, fd);
}

/*!
 * \brief Poll set for new events.
 *
 * Blocks until at least one watched descriptor is ready.
 *
 * \param fdset Target set.
 *
 * \retval Number of events if successful.
 * \retval -1 on errors.
 *
 * \todo Timeout.
 */
static inline int fdset_wait(fdset_t *fdset) {
	return _fdset_backend.fdset_wait(fdset);
}

/*!
 * \brief Set event iterator to the beginning of last polled events.
 *
 * \param fdset Target set.
 * \param it Event iterator.
 *
 * \retval 0 if successful.
 * \retval -1 on errors.
 */
static inline int fdset_begin(fdset_t *fdset, fdset_it_t *it) {
	return _fdset_backend.fdset_begin(fdset, it);
}

/*!
 * \brief Set event iterator to the end of last polled events.
 *
 * \param fdset Target set.
 * \param it Event iterator.
 *
 * \retval 0 if successful.
 * \retval -1 on errors.
 */
static inline int fdset_end(fdset_t *fdset, fdset_it_t *it) {
	return _fdset_backend.fdset_end(fdset, it);
}

/*!
 * \brief Set event iterator to the next event.
 *
 * Event iterator fd will be set to -1 if next event doesn't exist.
 *
 * \param fdset Target set.
 * \param it Event iterator.
 *
 * \retval 0 if successful.
 * \retval -1 on errors.
 */
static inline int fdset_next(fdset_t *fdset, fdset_it_t *it) {
	return _fdset_backend.fdset_next(fdset, it);
}

/*!
 * \brief Returned name of underlying poll method.
 *
 * \retval Name if successful.
 * \retval NULL if no method was loaded (shouldn't happen).
 */
static inline const char* fdset_method() {
	return _fdset_backend.fdset_method();
}

#endif /* _KNOTD_FDSET_H_ */

/*! @} */
#ifdef HAVE_EPOLL_WAIT
#include <sys/epoll.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include "fdset.h"
#define OS_FDS_CHUNKSIZE 8 /*!< Number of pollfd structs in a chunk. */
#define OS_FDS_KEEPCHUNKS 32 /*!< Will attempt to free memory when reached. */
/*! \brief File descriptor set (epoll backend). */
struct fdset_t {
	int epfd;                   /*!< epoll instance descriptor. */
	struct epoll_event *events; /*!< Buffer for events returned by epoll_wait(). */
	size_t nfds;                /*!< Number of watched descriptors. */
	size_t reserved;            /*!< Allocated capacity of the events buffer. */
	size_t polled;              /*!< Event count returned by the last wait. */
};
/*!
 * \brief Create a new empty fdset (epoll backend).
 *
 * \retval Pointer to an initialized set if successful.
 * \retval NULL on out-of-memory or if the epoll instance cannot be created.
 */
fdset_t *fdset_epoll_new()
{
	fdset_t *set = malloc(sizeof(fdset_t));
	if (!set) {
		return 0;
	}

	/* Blank memory. */
	memset(set, 0, sizeof(fdset_t));

	/* Create epoll fd; the size hint is ignored by modern kernels
	 * but must be positive. */
	set->epfd = epoll_create(OS_FDS_CHUNKSIZE);
	if (set->epfd < 0) {
		/* Fix: don't hand out a set with an invalid epoll
		 * descriptor - every later call would fail anyway. */
		free(set);
		return 0;
	}

	return set;
}
/*!
 * \brief Tear down an fdset created by the epoll backend.
 *
 * Closes the epoll descriptor and releases all memory.
 *
 * \retval 0 if successful.
 * \retval -1 when given a NULL set.
 */
int fdset_epoll_destroy(fdset_t * fdset)
{
	if (fdset == 0) {
		return -1;
	}

	/* Release the epoll instance first, then the buffers. */
	close(fdset->epfd);
	free(fdset->events); /* free(NULL) is a no-op. */
	free(fdset);
	return 0;
}
/*!
 * \brief Add a file descriptor to the watched set (epoll backend).
 *
 * \param fdset  Target set.
 * \param fd     File descriptor to watch.
 * \param events Mask of watched events (currently forced to read events).
 *
 * \retval 0 if successful.
 * \retval -1 on invalid parameters, allocation failure or epoll error.
 */
int fdset_epoll_add(fdset_t *fdset, int fd, int events)
{
	if (!fdset || fd < 0 || events <= 0) {
		return -1;
	}

	/* Grow the returned-events buffer in chunks when full. */
	if (fdset->nfds == fdset->reserved) {
		const size_t chunk = OS_FDS_CHUNKSIZE;
		const size_t nsize = (fdset->reserved + chunk) *
		                     sizeof(struct epoll_event);

		/* Idiom fix: realloc() preserves the old contents; keep the
		 * original pointer intact so the set stays valid on failure. */
		struct epoll_event *events_n = realloc(fdset->events, nsize);
		if (!events_n) {
			return -1;
		}

		/* Zero only the newly added tail. */
		memset(events_n + fdset->nfds, 0,
		       chunk * sizeof(struct epoll_event));

		fdset->events = events_n;
		fdset->reserved += chunk;
	}

	/* Register with the epoll instance. */
	struct epoll_event ev;
	ev.events = EPOLLIN; /*! \todo Map generic events to epoll events. */
	ev.data.fd = fd;
	if (epoll_ctl(fdset->epfd, EPOLL_CTL_ADD, fd, &ev) < 0) {
		return -1;
	}

	++fdset->nfds;
	return 0;
}
/*!
 * \brief Remove a file descriptor from the watched set (epoll backend).
 *
 * \retval 0 if successful.
 * \retval -1 on invalid parameters or epoll error.
 */
int fdset_epoll_remove(fdset_t *fdset, int fd)
{
	if (fdset == 0 || fd < 0) {
		return -1;
	}

	/* Deregister from the epoll instance. A zeroed, non-NULL event
	 * struct is passed for portability with pre-2.6.9 kernels. */
	struct epoll_event ev = { 0 };
	if (epoll_ctl(fdset->epfd, EPOLL_CTL_DEL, fd, &ev) < 0) {
		return -1;
	}

	fdset->nfds -= 1;

	/*! \todo Return memory if overallocated (nfds is far lower than reserved). */
	return 0;
}
/*!
 * \brief Block until watched descriptors become ready (epoll backend).
 *
 * \retval Number of polled events if successful.
 * \retval -1 on error or when the set is empty.
 */
int fdset_epoll_wait(fdset_t *fdset)
{
	if (fdset == 0 || fdset->nfds < 1 || fdset->events == 0) {
		return -1;
	}

	/* Block with no timeout until at least one descriptor is ready. */
	fdset->polled = 0;
	const int ret = epoll_wait(fdset->epfd, fdset->events, fdset->nfds, -1);
	if (ret < 0) {
		return -1;
	}

	/* epoll packs ready events densely at the start of the buffer. */
	fdset->polled = ret;
	return ret;
}
/*!
 * \brief Point the iterator at the first polled event (epoll backend).
 *
 * \retval 0 if an event exists.
 * \retval -1 on invalid parameters or when no events were polled.
 */
int fdset_epoll_begin(fdset_t *fdset, fdset_it_t *it)
{
	if (!fdset || !it) {
		return -1;
	}

	/* Rewind and delegate to this backend's own iterator step.
	 * Fix: the generic fdset_next() dispatches through
	 * _fdset_backend, which may belong to a different backend
	 * with an incompatible fdset_t layout. */
	it->pos = 0;
	return fdset_epoll_next(fdset, it);
}
/*!
 * \brief Point the iterator at the last polled event (epoll backend).
 *
 * \retval 0 if successful.
 * \retval -1 on invalid parameters or when no events were polled.
 */
int fdset_epoll_end(fdset_t *fdset, fdset_it_t *it)
{
	if (!fdset || !it || fdset->nfds < 1) {
		return -1;
	}

	/* Nothing polled - reset the iterator. */
	if (fdset->polled < 1) {
		it->fd = -1;
		it->pos = 0;
		return -1;
	}

	/* Ready events are dense, so the last one sits at polled - 1. */
	size_t nid = fdset->polled - 1;
	it->fd = fdset->events[nid].data.fd;
	it->pos = nid;
	it->events = 0; /*! \todo Map events. */
	/* Fix: previously fell through to "return -1" even though the
	 * end item was found; report success like the poll backend. */
	return 0;
}
/*!
 * \brief Advance the iterator to the next polled event (epoll backend).
 *
 * \retval 0 if successful.
 * \retval -1 when no further event exists.
 */
int fdset_epoll_next(fdset_t *fdset, fdset_it_t *it)
{
	if (fdset == 0 || it == 0 || fdset->nfds < 1) {
		return -1;
	}

	/* Past the last polled event? */
	if (it->pos >= fdset->polled) {
		return -1;
	}

	/* Load the current item and advance the cursor. */
	const size_t cur = it->pos;
	it->fd = fdset->events[cur].data.fd;
	it->events = 0; /*! \todo Map events. */
	it->pos = cur + 1;
	return 0;
}
/*! \brief Name of this backend. */
const char* fdset_epoll_method()
{
	static const char name[] = "epoll";
	return name;
}
/* Package APIs. */
/*! \brief Dispatch table exported for backend selection in fdset.c. */
struct fdset_backend_t _fdset_epoll = {
	.fdset_new = fdset_epoll_new,
	.fdset_destroy = fdset_epoll_destroy,
	.fdset_add = fdset_epoll_add,
	.fdset_remove = fdset_epoll_remove,
	.fdset_wait = fdset_epoll_wait,
	.fdset_begin = fdset_epoll_begin,
	.fdset_end = fdset_epoll_end,
	.fdset_next = fdset_epoll_next,
	.fdset_method = fdset_epoll_method
};
#endif
#ifdef HAVE_POLL
#include <stdlib.h>
#include <string.h>
#include <sys/poll.h>
#include <stddef.h>
//#include "common/fdset_poll.h"
#define OS_FDS_CHUNKSIZE 8 /*!< Number of pollfd structs in a chunk. */
#define OS_FDS_KEEPCHUNKS 32 /*!< Will attempt to free memory when reached. */
/*! \brief File descriptor set (poll backend). */
struct fdset_t {
	struct pollfd *fds; /*!< Array of watched descriptors. */
	nfds_t nfds;        /*!< Number of watched descriptors. */
	size_t reserved;    /*!< Allocated capacity of the fds array. */
	size_t polled;      /*!< Event count returned by the last poll(). */
	size_t begin;       /*!< Iteration start hint. */
};
/*!
 * \brief Create a new empty fdset (poll backend).
 *
 * \retval Pointer to a zero-initialized set if successful.
 * \retval NULL on out-of-memory.
 */
fdset_t *fdset_poll_new()
{
	/* calloc() both allocates and blanks the structure. */
	return calloc(1, sizeof(fdset_t));
}
/*!
 * \brief Tear down an fdset created by the poll backend.
 *
 * \retval 0 if successful.
 * \retval -1 when given a NULL set.
 */
int fdset_poll_destroy(fdset_t * fdset)
{
	if (fdset == 0) {
		return -1;
	}

	/* poll() keeps no kernel state, so only heap memory is released. */
	free(fdset->fds); /* free(NULL) is a no-op. */
	free(fdset);
	return 0;
}
/*!
 * \brief Add a file descriptor to the watched set (poll backend).
 *
 * \param fdset  Target set.
 * \param fd     File descriptor to watch.
 * \param events Mask of watched events (currently forced to POLLIN).
 *
 * \retval 0 if successful.
 * \retval -1 on invalid parameters or allocation failure.
 */
int fdset_poll_add(fdset_t *fdset, int fd, int events)
{
	if (!fdset || fd < 0 || events <= 0) {
		return -1;
	}

	/* Grow the pollfd array in chunks when full. */
	if (fdset->nfds == fdset->reserved) {
		const size_t chunk = OS_FDS_CHUNKSIZE;
		const size_t nsize = sizeof(struct pollfd) *
		                     (fdset->reserved + chunk);

		/* Idiom fix: realloc() preserves old entries; keep the
		 * original pointer so the set stays valid on failure. */
		struct pollfd *fds_n = realloc(fdset->fds, nsize);
		if (!fds_n) {
			return -1;
		}

		/* Zero only the newly added tail. */
		memset(fds_n + fdset->nfds, 0, chunk * sizeof(struct pollfd));

		fdset->fds = fds_n;
		fdset->reserved += chunk;
	}

	/* Append. */
	int nid = fdset->nfds++;
	fdset->fds[nid].fd = fd;
	fdset->fds[nid].events = POLLIN; /*! \todo Map events to POLL events. */
	return 0;
}
/*!
 * \brief Remove a file descriptor from the watched set (poll backend).
 *
 * \retval 0 if successful.
 * \retval -1 on invalid parameters or when fd is not watched.
 */
int fdset_poll_remove(fdset_t *fdset, int fd)
{
	if (fdset == 0 || fd < 0) {
		return -1;
	}

	/* Locate the descriptor in the watched array; nfds doubles as
	 * the "not found" sentinel. */
	size_t pos = fdset->nfds;
	for (size_t i = 0; i < fdset->nfds; ++i) {
		if (fdset->fds[i].fd == fd) {
			pos = i;
			break;
		}
	}
	if (pos == fdset->nfds) {
		return -1;
	}

	/* Close the gap by shifting the tail down one slot. */
	size_t tail = ((fdset->nfds - pos) - 1) * sizeof(struct pollfd);
	memmove(fdset->fds + pos, fdset->fds + pos + 1, tail);
	--fdset->nfds;

	/*! \todo Return memory if overallocated (nfds is far lower than reserved). */
	/*! \todo Maybe >64 free chunks is excess? */
	return 0;
}
/*!
 * \brief Block until watched descriptors become ready (poll backend).
 *
 * \retval Number of polled events if successful.
 * \retval -1 on error or when the set is empty.
 */
int fdset_poll_wait(fdset_t *fdset)
{
	if (fdset == 0 || fdset->nfds < 1 || fdset->fds == 0) {
		return -1;
	}

	/* Reset iteration state before blocking. */
	fdset->polled = 0;
	fdset->begin = 0;

	/* Block with no timeout until at least one descriptor is ready. */
	const int ret = poll(fdset->fds, fdset->nfds, -1);
	if (ret < 0) {
		return -1;
	}

	fdset->polled = ret;
	return ret;
}
/*!
 * \brief Point the iterator at the first polled event (poll backend).
 *
 * \retval 0 if an event exists.
 * \retval -1 on invalid parameters or when no events were polled.
 */
int fdset_poll_begin(fdset_t *fdset, fdset_it_t *it)
{
	if (!fdset || !it) {
		return -1;
	}

	/* Rewind and delegate to this backend's own iterator step.
	 * Fix: the generic fdset_next() dispatches through
	 * _fdset_backend, which need not be the poll backend and may
	 * interpret fdset_t with an incompatible layout. */
	it->pos = 0;
	return fdset_poll_next(fdset, it);
}
/*!
 * \brief Point the iterator at the last polled event (poll backend).
 *
 * \retval 0 if successful.
 * \retval -1 on invalid parameters or when no matching event exists.
 */
int fdset_poll_end(fdset_t *fdset, fdset_it_t *it)
{
	if (!fdset || !it || fdset->nfds < 1) {
		return -1;
	}

	/* Check for polled events. */
	if (fdset->polled < 1) {
		it->fd = -1;
		it->pos = 0;
		return -1;
	}

	/* Scan backwards for the last item with a reported event.
	 * Fix: the original loop never decremented its cursor (infinite
	 * loop when the last pollfd had no event) and never examined
	 * element 0 at all. */
	for (size_t i = fdset->nfds; i > 0; --i) {
		struct pollfd *pfd = fdset->fds + (i - 1);
		if (pfd->events & pfd->revents) {
			it->fd = pfd->fd;
			it->pos = i - 1;
			it->events = pfd->revents; /*! \todo Map events. */
			return 0;
		}
	}

	/* No matching event found - reset the iterator. */
	it->fd = -1;
	it->pos = 0;
	return -1;
}
/*!
 * \brief Advance the iterator to the next polled event (poll backend).
 *
 * \retval 0 if successful.
 * \retval -1 when no further matching event exists.
 */
int fdset_poll_next(fdset_t *fdset, fdset_it_t *it)
{
	if (fdset == 0 || it == 0 || fdset->nfds < 1) {
		return -1;
	}

	/* Scan forward from the cursor for a descriptor whose reported
	 * events intersect the requested ones. */
	size_t i = it->pos;
	while (i < fdset->nfds) {
		struct pollfd *entry = fdset->fds + i;
		if (entry->events & entry->revents) {
			it->fd = entry->fd;
			it->events = entry->revents; /*! \todo MAP events. */
			it->pos = i + 1; /* Next search starts after this hit. */
			return 0;
		}
		++i;
	}

	/* No matching event found - reset the iterator. */
	it->fd = -1;
	it->pos = 0;
	return -1;
}
/*! \brief Name of this backend. */
const char* fdset_poll_method()
{
	static const char name[] = "poll";
	return name;
}
/* Package APIs. */
/*! \brief Dispatch table exported for backend selection in fdset.c. */
struct fdset_backend_t _fdset_poll = {
	.fdset_new = fdset_poll_new,
	.fdset_destroy = fdset_poll_destroy,
	.fdset_add = fdset_poll_add,
	.fdset_remove = fdset_poll_remove,
	.fdset_wait = fdset_poll_wait,
	.fdset_begin = fdset_poll_begin,
	.fdset_end = fdset_poll_end,
	.fdset_next = fdset_poll_next,
	.fdset_method = fdset_poll_method
};
#endif
......@@ -99,6 +99,8 @@ rem_node(node *n)
z->next = x;
x->prev = z;
n->prev = 0;
n->next = 0;
}
/**
......
......@@ -112,9 +112,19 @@ int sockaddr_tostr(sockaddr_t *addr, char *dst, size_t size)
}
/* Convert. */
const char *ret = inet_ntop(addr->family, addr->ptr, dst, size);
if (ret == 0) {
return -1;
#ifdef DISABLE_IPV6
dst[0] = '\0';
#else
/* Load IPv6 addr if default. */
if (addr->family == AF_INET6) {
inet_ntop(addr->family, &addr->addr6.sin6_addr,
dst, size);
}
#endif
/* Load IPv4 if set. */
if (addr->family == AF_INET) {
inet_ntop(addr->family, &addr->addr4.sin_addr,
dst, size);
}
return 0;
......@@ -130,13 +140,13 @@ int sockaddr_portnum(sockaddr_t *addr)
/* IPv4 */
case AF_INET:
return addr->addr4.sin_port;
return ntohs(addr->addr4.sin_port);
break;
/* IPv6 */
#ifndef DISABLE_IPV6
case AF_INET6:
return addr->addr6.sin6_port;
return ntohs(addr->addr6.sin6_port);
break;
#endif
......
......@@ -259,6 +259,7 @@ int main(int argc, char **argv)
}
}
}
pthread_sigmask(SIG_UNBLOCK, &sa.sa_mask, NULL);
if ((res = server_wait(server)) != KNOTD_EOK) {
log_server_error("An error occured while "
......
......@@ -314,9 +314,6 @@ static int server_bind_handlers(server_t *server)
tcp_unit_size = 3;
}
/*! \bug Bug prevents multithreading with libev-based TCP. */
tcp_unit_size = 1;
/* Lock config. */
conf_read_lock();
......@@ -344,9 +341,9 @@ static int server_bind_handlers(server_t *server)
/* Create TCP handlers. */
if (!iface->handler[TCP_ID]) {
unit = dt_create(tcp_unit_size); /*! \todo Multithreaded TCP. */
tcp_loop_unit(unit);
unit = dt_create(tcp_unit_size);
h = server_create_handler(server, iface->fd[TCP_ID], unit);
tcp_loop_unit(h, unit);
h->type = iface->type[TCP_ID];
h->iface = iface;
......@@ -430,7 +427,9 @@ iohandler_t *server_create_handler(server_t *server, int fd, dt_unit_t *unit)
// Update unit data object
for (int i = 0; i < unit->size; ++i) {
dthread_t *thread = unit->threads[i];
dt_repurpose(thread, thread->run, handler);
if (thread->run) {
dt_repurpose(thread, thread->run, handler);
}
}
/*! \todo This requires either RCU compatible ptr swap or locking. */
......
......@@ -9,10 +9,9 @@
#include <stdio.h>
#include <stdlib.h>
#include <ev.h>
#include "common/sockaddr.h"
#include "common/skip-list.h"
#include "common/fdset.h"
#include "knot/common.h"
#include "knot/server/tcp-handler.h"
#include "knot/server/xfr-handler.h"
......@@ -22,15 +21,12 @@
#include "libknot/util/wire.h"
#include "knot/server/zones.h"
/*! \brief TCP watcher. */
typedef struct tcp_io_t {
ev_io io;
struct ev_loop *loop; /*!< Associated event loop. */
server_t *server; /*!< Name server */
iohandler_t *io_h; /*!< Master I/O handler. */
stat_t *stat; /*!< Statistics gatherer */
unsigned data; /*!< Watcher-related data. */
} tcp_io_t;
/*! \brief TCP worker data. */
typedef struct tcp_worker_t {
iohandler_t *ioh; /*!< Shortcut to I/O handler. */
fdset_t *fdset; /*!< File descriptor set. */
int pipe[2]; /*!< Master-worker signalization pipes. */
} tcp_worker_t;
/*
* Forward decls.
......@@ -43,38 +39,6 @@ static int xfr_send_cb(int session, sockaddr_t *addr, uint8_t *msg, size_t msgle
return tcp_send(session, msg, msglen);
}
/*! \brief Create new TCP connection watcher. */
static inline tcp_io_t* tcp_conn_new(struct ev_loop *loop, int fd, tcp_cb_t cb)
{
tcp_io_t *w = malloc(sizeof(tcp_io_t));
if (w) {
/* Omit invalid filedescriptors. */
w->io.fd = -1;
if (fd >= 0) {
ev_io_init((ev_io *)w, cb, fd, EV_READ);
ev_io_start(loop, (ev_io *)w);
}
w->data = 0;
w->loop = loop;
}
return w;
}
/*! \brief Delete a TCP connection watcher. */
static inline void tcp_conn_free(struct ev_loop *loop, tcp_io_t *w)
{
ev_io_stop(loop, (ev_io *)w);
close(((ev_io *)w)->fd);
free(w);
}
/*! \brief Noop event handler. */
static void tcp_noop(struct ev_loop *loop, ev_io *w, int revents)
{
}
/*!
* \brief TCP event handler function.
*
......@@ -83,31 +47,35 @@ static void tcp_noop(struct ev_loop *loop, ev_io *w, int revents)
* \param w Associated I/O event.
* \param revents Returned events.
*/
static void tcp_handle(struct ev_loop *loop, ev_io *w, int revents)
static void tcp_handle(tcp_worker_t *w, int fd)
{
if (fd < 0 || !w || !w->ioh) {
return;
}
debug_net("tcp: handling TCP event in thread %p.\n",
(void*)pthread_self());
tcp_io_t *tcp_w = (tcp_io_t *)w;
knot_nameserver_t *ns = tcp_w->server->nameserver;
xfrhandler_t *xfr_h = tcp_w->server->xfr_h;
knot_nameserver_t *ns = w->ioh->server->nameserver;
xfrhandler_t *xfr_h = w->ioh->server->xfr_h;
/* Check address type. */
sockaddr_t addr;
if (sockaddr_init(&addr, tcp_w->io_h->type) != KNOTD_EOK) {
if (sockaddr_init(&addr, w->ioh->type) != KNOTD_EOK) {
log_server_error("Socket type %d is not supported, "
"IPv6 support is probably disabled.\n",
tcp_w->io_h->type);
w->ioh->type);
return;
}
/* Receive data. */
uint8_t qbuf[65535]; /*! \todo This may be problematic. */
size_t qbuf_maxlen = sizeof(qbuf);
int n = tcp_recv(w->fd, qbuf, qbuf_maxlen, &addr);
int n = tcp_recv(fd, qbuf, qbuf_maxlen, &addr);
if (n <= 0) {
debug_net("tcp: client disconnected\n");
tcp_conn_free(loop, tcp_w);
fdset_remove(w->fdset, fd);
close(fd);
return;
}
......@@ -172,12 +140,12 @@ static void tcp_handle(struct ev_loop *loop, ev_io *w, int revents)
// packet->wireformat = wire_copy;
// xfr.query = packet; /* Will be freed after processing. */
// xfr.send = xfr_send_cb;
// xfr.session = w->fd;
// xfr.session = fd;
// memcpy(&xfr.addr, &addr, sizeof(sockaddr_t));
// xfr_request(xfr_h, &xfr);
// debug_net("tcp: enqueued IXFR query on fd=%d\n", w->fd);
// debug_net("tcp: enqueued IXFR query on fd=%d\n", fd);
// return;
debug_net("tcp: IXFR not supported, will answer as AXFR on fd=%d\n", w->fd);
debug_net("tcp: IXFR not supported, will answer as AXFR on fd=%d\n", fd);
case KNOT_QUERY_AXFR:
memset(&xfr, 0, sizeof(knot_ns_xfr_t));
xfr.type = XFR_TYPE_AOUT;
......@@ -191,10 +159,10 @@ static void tcp_handle(struct ev_loop *loop, ev_io *w, int revents)
packet->wireformat = wire_copy;
xfr.query = packet;
xfr.send = xfr_send_cb;
xfr.session = w->fd;
xfr.session = fd;
memcpy(&xfr.addr, &addr, sizeof(sockaddr_t));
xfr_request(xfr_h, &xfr);
debug_net("tcp: enqueued AXFR query on fd=%d\n", w->fd);
debug_net("tcp: enqueued AXFR query on fd=%d\n", fd);
return;
case KNOT_QUERY_NOTIFY:
case KNOT_QUERY_UPDATE:
......@@ -214,7 +182,7 @@ static void tcp_handle(struct ev_loop *loop, ev_io *w, int revents)
/* Send answer. */
if (res == KNOTD_EOK) {
assert(resp_len > 0);
res = tcp_send(w->fd, qbuf, resp_len);
res = tcp_send(fd, qbuf, resp_len);
/* Check result. */
if (res != (int)resp_len) {
......@@ -227,13 +195,10 @@ static void tcp_handle(struct ev_loop *loop, ev_io *w, int revents)
return;
}
static void tcp_accept(struct ev_loop *loop, ev_io *w, int revents)
static int tcp_accept(int fd)
{
tcp_io_t *tcp_w = (tcp_io_t *)w;
/* Accept incoming connection. */
debug_net("tcp: accepting connection on fd = %d\n", w->fd);
int incoming = accept(w->fd, 0, 0);
int incoming = accept(fd, 0, 0);
/* Evaluate connection. */
if (incoming < 0) {
......@@ -242,141 +207,53 @@ static void tcp_accept(struct ev_loop *loop, ev_io *w, int revents)
"(%d).\n", errno);
}
} else {
/*! \todo Improve allocation performance. */
tcp_io_t *conn = tcp_conn_new(loop, incoming, tcp_handle);
if (conn) {
conn->server = tcp_w->server;
conn->stat = tcp_w->stat;
conn->io_h = tcp_w->io_h;
}
debug_net("tcp: accepted connection fd = %d\n", incoming);
}
}
static void tcp_interrupt(iohandler_t *h)
{
/* For each thread in unit. */
for (unsigned i = 0; i < h->unit->size; ++i) {
tcp_io_t *w = (tcp_io_t *)(h->unit->threads[i]->data);
/* Only if watcher exists and isn't I/O handler. */
if (w && (void*)w != (void*)h) {
/* Stop master socket watcher. */
if (w->io.fd >= 0) {
ev_io_stop(w->loop, (ev_io *)w);
}
/* Break loop. */
ev_unloop(w->loop, EVUNLOOP_ALL);
}
}
return incoming;
}
static void tcp_loop_install(dthread_t *thread, int fd, tcp_cb_t cb)
tcp_worker_t* tcp_worker_create()
{
iohandler_t *handler = (iohandler_t *)thread->data;
/* Install interrupt handler. */
handler->interrupt = tcp_interrupt;
/* Create event loop. */
/*! \todo Maybe check for EVFLAG_NOSIGMASK support? */
struct ev_loop *loop = ev_loop_new(0);
/* Watch bound socket if exists. */
tcp_io_t *w = tcp_conn_new(loop, fd, cb);
if (w) {
w->io_h = handler;
w->server = handler->server;
w->stat = 0; //!< \todo Implement stat.
tcp_worker_t *w = malloc(sizeof(tcp_worker_t));
if (!w) {
debug_net("tcp_master: out of memory when creating worker\n");
return 0;
}
/* Reinstall as thread-specific data. */
thread->data = w;
}
static void tcp_loop_uninstall(dthread_t *thread)
{
tcp_io_t *w = (tcp_io_t *)thread->data;
/* Free watcher if exists. */
if (w) {
ev_loop_destroy(w->loop);
/* Create signal pipes. */
memset(w, 0, sizeof(tcp_worker_t));
if (pipe(w->pipe) < 0) {
free(w);
return 0;
}
/* Invalidate thread data. */
thread->data = 0;
}
/*! \brief Switch event loop in threading unit in RR fashion
* and accept connection in it.
*/
static void tcp_accept_rr(struct ev_loop *loop, ev_io *w, int revents)
{
tcp_io_t *tcp_w = (tcp_io_t *)w;
/* Select next loop thread. */
dt_unit_t *unit = tcp_w->io_h->unit;
dthread_t *thr = unit->threads[tcp_w->data];
/* Select loop from selected thread. */
tcp_io_t *thr_w = (tcp_io_t *)thr->data;
if (thr_w) {
loop = thr_w->loop;
}
/* Move to next thread in unit. */
tcp_w->data = get_next_rr(tcp_w->data, unit->size);
/* Accept incoming connection in target loop. */
tcp_accept(loop, w, revents);
}
static int tcp_loop_run(dthread_t *thread)
{
debug_dt("dthreads: [%p] running TCP loop, state: %d\n",
thread, thread->state);
/* Fetch loop. */
tcp_io_t *w = (tcp_io_t *)thread->data;
/* Accept clients. */
debug_net("tcp: loop started, backend = 0x%x\n", ev_backend(w->loop));
for (;;) {
/* Cancellation point. */
if (dt_is_cancelled(thread)) {
break;
}
/* Run event loop for accepting connections. */
ev_loop(w->loop, 0);
/* Create fdset. */
w->fdset = fdset_new();
if (!w->fdset) {
close(w->pipe[0]);
close(w->pipe[1]);
free(w);
}
/* Stop whole unit. */
debug_net("tcp: loop finished\n");
return KNOTD_EOK;
fdset_add(w->fdset, w->pipe[0], OS_EV_READ);
return w;
}
int tcp_loop_master_rr(dthread_t *thread)
void tcp_worker_free(tcp_worker_t* w)
{
iohandler_t *handler = (iohandler_t *)thread->data;
/* Check socket. */
if (handler->fd < 0) {
debug_net("tcp_master: null socket recevied, finishing.\n");
return KNOTD_EINVAL;
if (!w) {
return;
}
debug_net("tcp_master: threading unit master with %d workers\n",
thread->unit->size - 1);
int dupfd = dup(handler->fd);
int ret = tcp_loop(thread, dupfd, tcp_accept_rr);
close(dupfd);
return ret;
/* Destroy fdset. */
fdset_destroy(w->fdset);
/* Close pipe write end and worker. */
close(w->pipe[0]);
close(w->pipe[1]);
free(w);
}
/*
......@@ -440,9 +317,21 @@ int tcp_recv(int fd, uint8_t *buf, size_t len, sockaddr_t *addr)
}
/* Receive payload. */
n = recv(fd, buf, pktsize, MSG_WAITALL);
if (n <= 0) {
return KNOTD_ERROR;
size_t to_read = pktsize;
size_t readb = 0;
while (to_read > 0) {
/*! \todo Implement timeout to prevent keeping recv() locked. */
n = recv(fd, buf + readb, to_read, MSG_WAITALL);
if (n <= 0) {
/* Ignore interrupted calls. */
if (n < 0 && errno == EINTR) {
continue;
}
return KNOTD_ERROR;
}
readb += n;
to_read -= n;
}
/* Get peer name. */
......@@ -454,66 +343,156 @@ int tcp_recv(int fd, uint8_t *buf, size_t len, sockaddr_t *addr)
debug_net("tcp: received packet size=%hu on fd=%d\n",
pktsize, fd);
return n;
}
int tcp_loop(dthread_t *thread, int fd, tcp_cb_t cb)
{
/* Install event loop. */
tcp_loop_install(thread, fd, cb);
/* Run event loop. */
int ret = tcp_loop_run(thread);
/* Uninstall event loop. */
tcp_loop_uninstall(thread);
return ret;
return pktsize;
}
int tcp_loop_master(dthread_t *thread)
{
iohandler_t *handler = (iohandler_t *)thread->data;
dt_unit_t *unit = thread->unit;
tcp_worker_t **workers = handler->data;
/* Check socket. */
if (handler->fd < 0) {
debug_net("tcp_master: null socket recevied, finishing.\n");
if (!handler || handler->fd < 0 || !workers) {
debug_net("tcp_master: failed to initialize\n");
return KNOTD_EINVAL;
}
debug_net("tcp_master: created with %d workers\n",
thread->unit->size - 1);
/* Accept connections. */
int id = 0;
debug_net("tcp_master: created with %d workers\n", unit->size - 1);
while(1) {
/* Check for cancellation. */
if (dt_is_cancelled(thread)) {
break;
}
int dupfd = dup(handler->fd);
int ret = tcp_loop(thread, dupfd, tcp_accept);
close(dupfd);
/* Accept client. */
int client = tcp_accept(handler->fd);
if (client < 0) {
continue;
}
/* Add to worker in RR fashion. */
if (write(workers[id]->pipe[1], &client, sizeof(int)) < 0) {
debug_net("tcp_master: failed to register fd=%d to "
"worker=%d\n", client, id);
close(client);
continue;
}
id = get_next_rr(id, unit->size - 1);
}
return ret;
debug_net("tcp_master: finished\n");
free(workers);
return KNOTD_EOK;
}
int tcp_loop_worker(dthread_t *thread)
{
return tcp_loop(thread, -1, tcp_noop);
tcp_worker_t *w = thread->data;
if (!w) {
return KNOTD_EINVAL;
}
/* Accept clients. */
debug_net("tcp: worker started, backend = %s\n", fdset_method());
for (;;) {
/* Cancellation point. */
if (dt_is_cancelled(thread)) {
break;
}
/* Wait for events. */
int nfds = fdset_wait(w->fdset);
if (nfds <= 0) {
continue;
}
/* Process incoming events. */
debug_net("tcp_worker: registered %d events\n",
nfds);
fdset_it_t it;
fdset_begin(w->fdset, &it);
while(1) {
/* Handle incoming clients. */
if (it.fd == w->pipe[0]) {
int client = 0;
if (read(it.fd, &client, sizeof(int)) < 0) {
continue;
}
debug_net("tcp_worker: registered client %d\n",
client);
fdset_add(w->fdset, client, OS_EV_READ);
} else {
/* Handle other events. */
tcp_handle(w, it.fd);
}
/* Check if next exists. */
if (fdset_next(w->fdset, &it) != 0) {
break;
}
}
}
/* Stop whole unit. */
debug_net("tcp_worker: worker finished\n");
tcp_worker_free(w);
return KNOTD_EOK;
}
int tcp_loop_unit(dt_unit_t *unit)
int tcp_loop_unit(iohandler_t *ioh, dt_unit_t *unit)
{
if (unit->size < 1) {
return KNOTD_EINVAL;
}
/* Create unit data. */
tcp_worker_t **workers = malloc((unit->size - 1) *
sizeof(tcp_worker_t *));
if (!workers) {
debug_net("tcp_master: out of memory\n");
return KNOTD_EINVAL;
}
/*! \todo Implement working master+worker threads. */
/* Repurpose first thread as master (unit controller). */
//dt_repurpose(unit->threads[0], tcp_loop_master_rr, 0);
/* Repurpose remaining threads as workers. */
//for (unsigned i = 1; i < unit->size; ++i) {
// dt_repurpose(unit->threads[i], tcp_loop_worker, 0);
//}
for (unsigned i = 0; i < 1; ++i) {
dt_repurpose(unit->threads[i], tcp_loop_master, 0);
/* Prepare worker data. */
unsigned allocated = 0;
for (unsigned i = 0; i < unit->size - 1; ++i) {
workers[i] = tcp_worker_create();
if (workers[i] == 0) {
break;
}
workers[i]->ioh = ioh;
++allocated;
}
/* Check allocated workers. */
if (allocated != unit->size - 1) {
for (unsigned i = 0; i < allocated; ++i) {
tcp_worker_free(workers[i]);
}
free(workers);
debug_net("tcp_master: out of memory when allocated workers\n");
return KNOTD_EINVAL;
}
/* Store worker data. */
ioh->data = workers;
/* Repurpose workers. */
for (unsigned i = 0; i < allocated; ++i) {
dt_repurpose(unit->threads[i + 1], tcp_loop_worker, workers[i]);
}
/* Repurpose first thread as master (unit controller). */
dt_repurpose(unit->threads[0], tcp_loop_master, ioh);
return KNOTD_EOK;
}
......@@ -26,9 +26,6 @@
#include "knot/server/server.h"
#include "knot/server/dthreads.h"
/*! \brief TCP event callback. */
typedef void (*tcp_cb_t)(struct ev_loop *, ev_io*, int);
/*!
* \brief Send TCP message.
*
......@@ -55,20 +52,6 @@ int tcp_send(int fd, uint8_t *msg, size_t msglen);
*/
int tcp_recv(int fd, uint8_t *buf, size_t len, sockaddr_t *addr);
/*!
* \brief Generic TCP event loop.
*
* Run TCP handler event loop.
*
* \param thread Associated thread from DThreads unit.
* \param fd First descriptor to be watched (or -1).
* \param cb Callback on fd event.
*
* \retval KNOTD_EOK on success.
* \retval KNOTD_EINVAL invalid parameters.
*/
int tcp_loop(dthread_t *thread, int fd, tcp_cb_t cb);
/*!
* \brief TCP event loop for accepting connections.
*
......@@ -94,12 +77,13 @@ int tcp_loop_worker(dthread_t *thread);
*
* Set-up threading unit for processing TCP requests.
*
* \param ioh Associated I/O handler.
* \param thread Associated thread from DThreads unit.
*
* \retval KNOTD_EOK on success.
* \retval KNOTD_EINVAL invalid parameters.
*/
/* Fix: the merge left both the old one-argument and the new two-argument
 * prototype, which are conflicting declarations of the same function
 * (a compile error). Keep only the post-merge signature. */
int tcp_loop_unit(iohandler_t *ioh, dt_unit_t *unit);
#endif // _KNOTD_TCPHANDLER_H_
......
This diff is collapsed.
......@@ -12,23 +12,37 @@
#ifndef _KNOTD_XFRHANDLER_H_
#define _KNOTD_XFRHANDLER_H_
#include <ev.h>
#include "knot/server/dthreads.h"
#include "libknot/nameserver/name-server.h"
#include "common/evqueue.h"
#include "common/fdset.h"
#include "common/skip-list.h" /*!< \todo Consider another data struct. */
struct xfrhandler_t;
/*!
 * \brief XFR worker structure.
 *
 * Per-thread state for one transfer worker; \a master points back to the
 * owning handler so workers can reach shared state.
 */
typedef struct xfrworker_t
{
	knot_nameserver_t *ns;  /*!< \brief Pointer to nameserver.*/
	evqueue_t *q; /*!< \brief Shared XFR requests queue.*/
	fdset_t *fdset; /*!< \brief File descriptor set (watched transfer sockets). */
	struct xfrhandler_t *master; /*!< \brief Worker master (owning handler). */
} xfrworker_t;
/*!
 * \brief XFR handler structure.
 *
 * Owns the threading unit (thread 0 = master, the rest = workers), the
 * shared request queue and the pending-task bookkeeping.
 *
 * Fix: the merge left a duplicate 'dt_unit_t *unit;' member (a compile
 * error — same member name twice) and the stale pre-merge members 'cq'
 * and 'loop' belonging to the removed libev-based design; only the
 * post-merge fields are kept.
 */
typedef struct xfrhandler_t
{
	knot_nameserver_t *ns;      /*!< \brief Pointer to nameserver.*/
	evqueue_t *q;               /*!< \brief Shared XFR requests queue.*/
	dt_unit_t *unit;            /*!< \brief Threading unit. */
	xfrworker_t **workers;      /*!< \brief Workers. */
	skip_list_t *tasks;         /*!< \brief Pending tasks. */
	pthread_mutex_t tasks_mx;   /*!< \brief Tasks synchronisation. */
	void (*interrupt)(struct xfrhandler_t *h); /*!< \brief Interrupt handler. */
	unsigned rr;                /*!< \brief Round-Robin counter. */
	pthread_mutex_t rr_mx;      /*!< \brief RR mutex. */
} xfrhandler_t;
/*!
......@@ -104,20 +118,6 @@ static inline int xfr_join(xfrhandler_t *handler) {
*/
int xfr_request(xfrhandler_t *handler, knot_ns_xfr_t *req);
/*!
* \brief Enqueue XFR/IN related request.
*
* Request is processed in threads for XFR/IN.
*
* \param handler XFR handler instance.
* \param req XFR request.
*
* \retval KNOTD_EOK on success.
* \retval KNOTD_EINVAL on NULL handler or request.
* \retval KNOTD_ERROR on error.
*/
int xfr_client_relay(xfrhandler_t *handler, knot_ns_xfr_t *req);
/*!
* \brief XFR master runnable.
*
......@@ -129,19 +129,7 @@ int xfr_client_relay(xfrhandler_t *handler, knot_ns_xfr_t *req);
* \retval KNOTD_EOK on success.
* \retval KNOTD_EINVAL invalid parameters.
*/
int xfr_master(dthread_t *thread);
/*!
* \brief XFR client runnable.
*
* Processess AXFR/IXFR client sessions.
*
* \param thread Associated thread from DThreads unit.
*
* \retval KNOTD_EOK on success.
* \retval KNOTD_EINVAL invalid parameters.
*/
int xfr_client(dthread_t *thread);
int xfr_worker(dthread_t *thread);
#endif // _KNOTD_XFRHANDLER_H_
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment