Skip to content
Snippets Groups Projects
Commit 873e0d97 authored by Jan Hák's avatar Jan Hák Committed by Daniel Salzman
Browse files

fdset: polling using kqueue on BSD systems

parent 5ca366a9
No related branches found
No related tags found
No related merge requests found
Pipeline #80091 passed
......@@ -286,22 +286,33 @@ AS_IF([test "$enable_systemd" = "yes"],[
# Socket polling method
socket_polling=
AC_ARG_WITH([socket-polling],
AS_HELP_STRING([--with-socket-polling=auto|poll|epoll],
AS_HELP_STRING([--with-socket-polling=auto|poll|epoll|kqueue|libkqueue],
[Use specific socket polling method [default=auto]]),
[socket_polling=$withval], [socket_polling=auto]
)
AS_CASE([$socket_polling],
[auto], [AC_CHECK_FUNCS([epoll_create],
[AC_DEFINE([HAVE_EPOLL], [1], [epoll available])
socket_polling=epoll],
[socket_polling=poll])],
[poll], [socket_polling=poll],
[auto], [AC_CHECK_FUNCS([kqueue],
[AC_DEFINE([HAVE_KQUEUE], [1], [kqueue available])
socket_polling=kqueue],
[AC_CHECK_FUNCS([epoll_create],
[AC_DEFINE([HAVE_EPOLL], [1], [epoll available])
socket_polling=epoll],
[socket_polling=poll])])],
[poll], [socket_polling=poll],
[epoll], [AC_CHECK_FUNCS([epoll_create],
[AC_DEFINE([HAVE_EPOLL], [1], [epoll available])
socket_polling=epoll],
[AC_MSG_ERROR([Epoll not available.])])],
[*], [AC_MSG_ERROR([Invalid value of --socket-polling.])]
[AC_DEFINE([HAVE_EPOLL], [1], [epoll available])
socket_polling=epoll],
[AC_MSG_ERROR([epoll not available.])])],
[kqueue], [AC_CHECK_FUNCS([kqueue],
[AC_DEFINE([HAVE_KQUEUE], [1], [kqueue available])
socket_polling=kqueue],
[AC_MSG_ERROR([kqueue not available.])])],
[libkqueue], [PKG_CHECK_MODULES([libkqueue], [libkqueue],
[AC_DEFINE([HAVE_KQUEUE], [1], [libkqueue available])
socket_polling=libkqueue],
[AC_MSG_ERROR([libkqueue not available.])])],
[*], [AC_MSG_ERROR([Invalid value of --socket-polling.])]
)
# Alternative memory allocator
......
libknotd_la_CPPFLAGS = $(AM_CPPFLAGS) $(CFLAG_VISIBILITY) $(liburcu_CFLAGS) \
$(lmdb_CFLAGS) $(systemd_CFLAGS) -DKNOTD_MOD_STATIC
libknotd_la_CPPFLAGS = $(AM_CPPFLAGS) $(CFLAG_VISIBILITY) $(libkqueue_CFLAGS) \
$(liburcu_CFLAGS) $(lmdb_CFLAGS) $(systemd_CFLAGS) \
-DKNOTD_MOD_STATIC
libknotd_la_LDFLAGS = $(AM_LDFLAGS) -export-symbols-regex '^knotd_'
libknotd_la_LIBADD = $(dlopen_LIBS) $(pthread_LIBS)
libknotd_la_LIBADD = $(dlopen_LIBS) $(libkqueue_LIBS) $(pthread_LIBS)
libknotd_LIBS = libknotd.la libknot.la libdnssec.la libzscanner.la \
$(libcontrib_LIBS) $(liburcu_LIBS) $(lmdb_LIBS) \
$(systemd_LIBS)
......
......@@ -36,7 +36,7 @@ static int fdset_resize(fdset_t *set, const unsigned size)
MEM_RESIZE(set->ctx, size);
MEM_RESIZE(set->timeout, size);
#ifdef HAVE_EPOLL
#if defined(HAVE_EPOLL) || defined(HAVE_KQUEUE)
MEM_RESIZE(set->ev, size);
#else
MEM_RESIZE(set->pfd, size);
......@@ -53,16 +53,20 @@ int fdset_init(fdset_t *set, const unsigned size)
memset(set, 0, sizeof(*set));
#if defined(HAVE_EPOLL) || defined(HAVE_KQUEUE)
#ifdef HAVE_EPOLL
set->efd = epoll_create1(0);
if (set->efd < 0) {
set->pfd = epoll_create1(0);
#elif HAVE_KQUEUE
set->pfd = kqueue();
#endif
if (set->pfd < 0) {
return knot_map_errno();
}
#endif
int ret = fdset_resize(set, size);
#ifdef HAVE_EPOLL
#if defined(HAVE_EPOLL) || defined(HAVE_KQUEUE)
if (ret != KNOT_EOK) {
close(set->efd);
close(set->pfd);
}
#endif
return ret;
......@@ -76,10 +80,10 @@ void fdset_clear(fdset_t *set)
free(set->ctx);
free(set->timeout);
#ifdef HAVE_EPOLL
#if defined(HAVE_EPOLL) || defined(HAVE_KQUEUE)
free(set->ev);
free(set->recv_ev);
close(set->efd);
close(set->pfd);
#else
free(set->pfd);
#endif
......@@ -107,7 +111,12 @@ int fdset_add(fdset_t *set, const int fd, const fdset_event_t events, void *ctx)
.data.u64 = idx,
.events = events
};
if (epoll_ctl(set->efd, EPOLL_CTL_ADD, fd, &ev) != 0) {
if (epoll_ctl(set->pfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
return knot_map_errno();
}
#elif HAVE_KQUEUE
EV_SET(&set->ev[idx], fd, events, EV_ADD, 0, 0, (void *)(intptr_t)idx);
if (kevent(set->pfd, &set->ev[idx], 1, NULL, 0, NULL) < 0) {
return knot_map_errno();
}
#else
......@@ -128,7 +137,25 @@ int fdset_remove(fdset_t *set, const unsigned idx)
const int fd = fdset_get_fd(set, idx);
#ifdef HAVE_EPOLL
/* This is necessary as DDNS duplicates file descriptors! */
(void)epoll_ctl(set->efd, EPOLL_CTL_DEL, fd, NULL);
if (epoll_ctl(set->pfd, EPOLL_CTL_DEL, fd, NULL) != 0) {
close(fd);
return knot_map_errno();
}
#elif HAVE_KQUEUE
/* Return delete flag back to original filter number. */
#if defined(__NetBSD__)
if ((signed short)set->ev[idx].filter < 0)
#else
if (set->ev[idx].filter >= 0)
#endif
{
set->ev[idx].filter = ~set->ev[idx].filter;
}
set->ev[idx].flags = EV_DELETE;
if (kevent(set->pfd, &set->ev[idx], 1, NULL, 0, NULL) < 0) {
close(fd);
return knot_map_errno();
}
#endif
close(fd);
......@@ -137,15 +164,23 @@ int fdset_remove(fdset_t *set, const unsigned idx)
if (idx < last) {
set->ctx[idx] = set->ctx[last];
set->timeout[idx] = set->timeout[last];
#ifdef HAVE_EPOLL
#if defined(HAVE_EPOLL) || defined (HAVE_KQUEUE)
set->ev[idx] = set->ev[last];
#ifdef HAVE_EPOLL
struct epoll_event ev = {
.data.u64 = idx,
.events = set->ev[idx].events
};
if (epoll_ctl(set->efd, EPOLL_CTL_MOD, set->ev[last].data.fd, &ev) != 0) {
if (epoll_ctl(set->pfd, EPOLL_CTL_MOD, set->ev[last].data.fd, &ev) != 0) {
return knot_map_errno();
}
#elif HAVE_KQUEUE
EV_SET(&set->ev[idx], set->ev[last].ident, set->ev[last].filter,
EV_ADD, 0, 0, (void *)(intptr_t)idx);
if (kevent(set->pfd, &set->ev[idx], 1, NULL, 0, NULL) < 0) {
return knot_map_errno();
}
#endif
#else
set->pfd[idx] = set->pfd[last];
#endif
......@@ -167,21 +202,21 @@ int fdset_poll(fdset_t *set, fdset_it_t *it, const unsigned offset, const int ti
it->set = set;
it->idx = offset;
#ifdef HAVE_EPOLL
#if defined(HAVE_EPOLL) || defined(HAVE_KQUEUE)
if (set->recv_size != set->size) {
MEM_RESIZE(set->recv_ev, set->size);
set->recv_size = set->size;
}
it->ptr = set->recv_ev;
it->dirty = 0;
/*
* NOTE: Can't skip offset without a bunch of syscalls!!
* Because of that it waits for `ctx->n` (every socket). Offset is set when TCP
* throttling is ON. Sometimes it can return with sockets where none of them is
* a connection socket, but it should not be common.
* But it can cause problems when adopted in other use-case.
*/
it->unprocessed = epoll_wait(set->efd, set->recv_ev, set->n, timeout_ms);
#ifdef HAVE_EPOLL
if (set->n == 0) {
return 0;
}
if ((it->unprocessed = epoll_wait(set->pfd, set->recv_ev, set->recv_size,
timeout_ms)) == -1) {
return knot_map_errno();
}
#ifndef NDEBUG
/* In specific circumstances with valgrind, it sometimes happens that
* `set->n < it->unprocessed`. */
......@@ -190,6 +225,26 @@ int fdset_poll(fdset_t *set, fdset_it_t *it, const unsigned offset, const int ti
it->unprocessed = 0;
}
#endif
#elif HAVE_KQUEUE
struct timespec timeout = {
.tv_sec = timeout_ms / 1000,
.tv_nsec = (timeout_ms % 1000) * 1000000
};
if ((it->unprocessed = kevent(set->pfd, NULL, 0, set->recv_ev, set->recv_size,
(timeout_ms >= 0) ? &timeout : NULL)) == -1) {
return knot_map_errno();
}
#endif
/*
* NOTE: Can't skip offset without a bunch of syscalls!
* Because of that it waits for `ctx->n` (every socket). Offset is set when TCP
* throttling is ON. Sometimes it can return with sockets where none of them is
* a connected socket, but it should not be common.
*/
while (it->unprocessed > 0 && fdset_it_get_idx(it) < it->idx) {
it->ptr++;
it->unprocessed--;
}
return it->unprocessed;
#else
it->unprocessed = poll(&set->pfd[offset], set->n - offset, timeout_ms);
......@@ -213,7 +268,7 @@ void fdset_it_commit(fdset_it_t *it)
if (it == NULL) {
return;
}
#ifdef HAVE_EPOLL
#if defined(HAVE_EPOLL) || defined(HAVE_KQUEUE)
/* NOTE: reverse iteration to avoid as many "remove last" operations
* as possible. I'm not sure about performance improvement. It
* will skip some syscalls at begin of iteration, but what
......@@ -221,7 +276,16 @@ void fdset_it_commit(fdset_it_t *it)
*/
fdset_t *set = it->set;
for (int i = set->n - 1; it->dirty > 0 && i >= 0; --i) {
if (set->ev[i].events == FDSET_REMOVE_FLAG) {
#ifdef HAVE_EPOLL
if (set->ev[i].events == FDSET_REMOVE_FLAG)
#else
#if defined(__NetBSD__)
if ((signed short)set->ev[i].filter < 0)
#else
if (set->ev[i].filter >= 0)
#endif
#endif
{
(void)fdset_remove(set, i);
it->dirty--;
}
......@@ -264,6 +328,7 @@ void fdset_sweep(fdset_t *set, const fdset_sweep_cb_t cb, void *data)
const int fd = fdset_get_fd(set, idx);
if (cb(set, fd, data) == FDSET_SWEEP) {
(void)fdset_remove(set, idx);
continue;
}
}
++idx;
......
......@@ -27,6 +27,8 @@
#ifdef HAVE_EPOLL
#include <sys/epoll.h>
#elif HAVE_KQUEUE
#include <sys/event.h>
#else
#include <poll.h>
#endif
......@@ -44,11 +46,16 @@ typedef struct {
unsigned size; /*!< Array size (allocated). */
void **ctx; /*!< Context for each fd. */
time_t *timeout; /*!< Timeout for each fd (seconds precision). */
#if defined(HAVE_EPOLL) || defined(HAVE_KQUEUE)
#ifdef HAVE_EPOLL
struct epoll_event *ev; /*!< Epoll event storage for each fd. */
struct epoll_event *recv_ev; /*!< Array for polled events. */
#elif HAVE_KQUEUE
struct kevent *ev; /*!< Kqueue event storage for each fd. */
struct kevent *recv_ev; /*!< Array for polled events. */
#endif
unsigned recv_size; /*!< Size of array for polled events. */
int efd; /*!< File descriptor of epoll. */
int pfd; /*!< File descriptor of kernel polling structure (epoll or kqueue). */
#else
struct pollfd *pfd; /*!< Poll state for each fd. */
#endif
......@@ -59,8 +66,12 @@ typedef struct {
fdset_t *set; /*!< Source fdset_t. */
unsigned idx; /*!< Event index offset. */
int unprocessed; /*!< Unprocessed events left. */
#if defined(HAVE_EPOLL) || defined(HAVE_KQUEUE)
#ifdef HAVE_EPOLL
struct epoll_event *ptr; /*!< Pointer on processed event. */
#elif HAVE_KQUEUE
struct kevent *ptr; /*!< Pointer on processed event. */
#endif
unsigned dirty; /*!< Number of fd to be removed on commit. */
#endif
} fdset_it_t;
......@@ -69,6 +80,9 @@ typedef enum {
#ifdef HAVE_EPOLL
FDSET_POLLIN = EPOLLIN,
FDSET_POLLOUT = EPOLLOUT,
#elif HAVE_KQUEUE
FDSET_POLLIN = EVFILT_READ,
FDSET_POLLOUT = EVFILT_WRITE,
#else
FDSET_POLLIN = POLLIN,
FDSET_POLLOUT = POLLOUT,
......@@ -180,6 +194,8 @@ inline static int fdset_get_fd(const fdset_t *set, const unsigned idx)
#ifdef HAVE_EPOLL
return set->ev[idx].data.fd;
#elif HAVE_KQUEUE
return set->ev[idx].ident;
#else
return set->pfd[idx].fd;
#endif
......@@ -212,6 +228,8 @@ inline static unsigned fdset_it_get_idx(const fdset_it_t *it)
#ifdef HAVE_EPOLL
return it->ptr->data.u64;
#elif HAVE_KQUEUE
return (unsigned)(intptr_t)it->ptr->udata;
#else
return it->idx;
#endif
......@@ -231,6 +249,8 @@ inline static int fdset_it_get_fd(const fdset_it_t *it)
#ifdef HAVE_EPOLL
return it->set->ev[fdset_it_get_idx(it)].data.fd;
#elif HAVE_KQUEUE
return it->ptr->ident;
#else
return it->set->pfd[it->idx].fd;
#endif
......@@ -245,7 +265,7 @@ inline static void fdset_it_next(fdset_it_t *it)
{
assert(it);
#ifdef HAVE_EPOLL
#if defined(HAVE_EPOLL) || defined(HAVE_KQUEUE)
do {
it->ptr++;
it->unprocessed--;
......@@ -272,6 +292,26 @@ inline static void fdset_it_remove(fdset_it_t *it)
const int idx = fdset_it_get_idx(it);
it->set->ev[idx].events = FDSET_REMOVE_FLAG;
it->dirty++;
#elif HAVE_KQUEUE
const int idx = fdset_it_get_idx(it);
/* Bitwise negated filter marks event for delete. */
/* Filters become: */
/* [FreeBSD] */
/* EVFILT_READ (-1) -> 0 */
/* EVFILT_WRITE (-2) -> 1 */
/* [NetBSD] */
/* EVFILT_READ (0) -> -1 */
/* EVFILT_WRITE (1) -> -2 */
/* If not marked for delete then mark for delete. */
#if defined(__NetBSD__)
if ((signed short)it->set->ev[idx].filter >= 0)
#else
if (it->set->ev[idx].filter < 0)
#endif
{
it->set->ev[idx].filter = ~it->set->ev[idx].filter;
}
it->dirty++;
#else
(void)fdset_remove(it->set, fdset_it_get_idx(it));
/* Iterator should return on last valid already processed element. */
......@@ -314,6 +354,8 @@ inline static bool fdset_it_is_pollin(const fdset_it_t *it)
#ifdef HAVE_EPOLL
return it->ptr->events & EPOLLIN;
#elif HAVE_KQUEUE
return it->ptr->filter == EVFILT_READ;
#else
return it->set->pfd[it->idx].revents & POLLIN;
#endif
......@@ -332,6 +374,8 @@ inline static bool fdset_it_is_error(const fdset_it_t *it)
#ifdef HAVE_EPOLL
return it->ptr->events & (EPOLLERR | EPOLLHUP);
#elif HAVE_KQUEUE
return it->ptr->flags & EV_ERROR;
#else
return it->set->pfd[it->idx].revents & (POLLERR | POLLHUP | POLLNVAL);
#endif
......
......@@ -6,7 +6,7 @@ noinst_LTLIBRARIES += libknotus.la
libknotus_la_CPPFLAGS = $(AM_CPPFLAGS) $(CFLAG_VISIBILITY) $(gnutls_CFLAGS) \
$(libedit_CFLAGS) $(libidn2_CFLAGS) $(libidn_CFLAGS) \
$(libnghttp2_CFLAGS) $(lmdb_CFLAGS)
$(libkqueue_CFLAGS) $(libnghttp2_CFLAGS) $(lmdb_CFLAGS)
libknotus_la_LDFLAGS = $(AM_LDFLAGS) $(LDFLAG_EXCLUDE_LIBS)
libknotus_la_LIBADD = $(libidn2_LIBS) $(libidn_LIBS) $(libnghttp2_LIBS)
libknotus_LIBS = libknotus.la libknot.la libdnssec.la $(libcontrib_LIBS) \
......
......@@ -28,6 +28,7 @@ knotd_stdio_SOURCES = \
knotd_stdio_CPPFLAGS = \
$(AM_CPPFLAGS) \
$(libkqueue_CFLAGS) \
$(liburcu_CFLAGS) \
$(lmdb_CFLAGS) \
$(systemd_CFLAGS)
......
......@@ -4,6 +4,7 @@ AM_CPPFLAGS = \
-I$(top_srcdir)/src/libdnssec \
-I$(top_srcdir)/src/libdnssec/shared \
$(gnutls_CFLAGS) \
$(libkqueue_CFLAGS) \
$(lmdb_CFLAGS)
LDADD = \
......
......@@ -96,6 +96,9 @@ int main(int argc, char *argv[])
ok(fdset_get_length(&fdset) == 2, "fdset size 2");
close(fds1[1]);
int fd2_dup = dup(fds2[0]);
ok(fd2_dup >= 0, "duplicate fd");
ret = fdset_poll(&fdset, &it, 0, 100);
struct timespec time2 = time_now();
double diff2 = time_diff_ms(&time0, &time2);
......@@ -116,16 +119,22 @@ int main(int argc, char *argv[])
}
fdset_it_commit(&it);
ok(fdset_get_length(&fdset) == 1, "fdset size 1");
close(fds2[1]);
pthread_join(t1, 0);
pthread_join(t2, 0);
ret = fdset_remove(&fdset, 0);
ok(ret == KNOT_EOK, "fdset remove");
close(fds0[1]);
ok(fdset_get_length(&fdset) == 0, "fdset size 0");
pthread_join(t1, 0);
pthread_join(t2, 0);
write(fds2[1], &PATTERN2, 1);
ret = fdset_poll(&fdset, &it, 0, 100);
ok(ret == 0, "fdset_poll return 3");
close(fds2[1]);
close(fd2_dup);
fdset_clear(&fdset);
return 0;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment