From e69055ccff1b0d2191002803ae710900ac92c15c Mon Sep 17 00:00:00 2001 From: Libor Peltan <libor.peltan@nic.cz> Date: Fri, 3 Jan 2025 13:30:13 +0100 Subject: [PATCH 1/5] refactoring: move ctlsocket-related and signal-related code from knotd/main.c --- Knot.files | 4 + src/knot/Makefile.inc | 4 + src/knot/ctl/threads.c | 160 +++++++++++++++++++++ src/knot/ctl/threads.h | 72 ++++++++++ src/knot/server/signals.c | 86 +++++++++++ src/knot/server/signals.h | 18 +++ src/utils/knotd/main.c | 293 +++----------------------------------- 7 files changed, 363 insertions(+), 274 deletions(-) create mode 100644 src/knot/ctl/threads.c create mode 100644 src/knot/ctl/threads.h create mode 100644 src/knot/server/signals.c create mode 100644 src/knot/server/signals.h diff --git a/Knot.files b/Knot.files index 4bdfa08a9c..307c066eb7 100644 --- a/Knot.files +++ b/Knot.files @@ -209,6 +209,8 @@ src/knot/ctl/commands.c src/knot/ctl/commands.h src/knot/ctl/process.c src/knot/ctl/process.h +src/knot/ctl/threads.c +src/knot/ctl/threads.h src/knot/dnssec/context.c src/knot/dnssec/context.h src/knot/dnssec/ds_query.c @@ -340,6 +342,8 @@ src/knot/server/quic-handler.c src/knot/server/quic-handler.h src/knot/server/server.c src/knot/server/server.h +src/knot/server/signals.c +src/knot/server/signals.h src/knot/server/tcp-handler.c src/knot/server/tcp-handler.h src/knot/server/udp-handler.c diff --git a/src/knot/Makefile.inc b/src/knot/Makefile.inc index d1e5f5c122..0286d08e01 100644 --- a/src/knot/Makefile.inc +++ b/src/knot/Makefile.inc @@ -44,6 +44,8 @@ libknotd_la_SOURCES = \ knot/ctl/commands.h \ knot/ctl/process.c \ knot/ctl/process.h \ + knot/ctl/threads.c \ + knot/ctl/threads.h \ knot/dnssec/context.c \ knot/dnssec/context.h \ knot/dnssec/ds_query.c \ @@ -163,6 +165,8 @@ libknotd_la_SOURCES = \ knot/server/proxyv2.h \ knot/server/server.c \ knot/server/server.h \ + knot/server/signals.c \ + knot/server/signals.h \ knot/server/tcp-handler.c \ knot/server/tcp-handler.h \ knot/server/udp-handler.c \ diff --git a/src/knot/ctl/threads.c b/src/knot/ctl/threads.c new file mode 100644 index 0000000000..d8dd20d5b7 --- /dev/null +++ b/src/knot/ctl/threads.c @@ -0,0 +1,160 @@ +/* Copyright (C) CZ.NIC, z.s.p.o. and contributors + * SPDX-License-Identifier: GPL-2.0-or-later + * For more information, see <https://www.knot-dns.cz/> + */ + +#include <urcu.h> + +#include "contrib/threads.h" +#include "knot/ctl/threads.h" +#include "knot/server/signals.h" + +void ctl_init_ctxs(concurrent_ctl_ctx_t *concurrent_ctxs, size_t n_ctxs, server_t *server) +{ + for (size_t i = 0; i < n_ctxs; i++) { + concurrent_ctl_ctx_t *cctx = &concurrent_ctxs[i]; + pthread_mutex_init(&cctx->mutex, NULL); + pthread_cond_init(&cctx->cond, NULL); + cctx->server = server; + cctx->thread_idx = i + 1; + } +} + +int ctl_cleanup_ctxs(concurrent_ctl_ctx_t *concurrent_ctxs, size_t n_ctxs) +{ + int ret = KNOT_EOK; + for (size_t i = 0; i < n_ctxs; i++) { + concurrent_ctl_ctx_t *cctx = &concurrent_ctxs[i]; + pthread_mutex_lock(&cctx->mutex); + if (cctx->state == CONCURRENT_IDLE) { + knot_ctl_free(cctx->ctl); + cctx->ctl = NULL; + if (cctx->ret == KNOT_CTL_ESTOP) { + ret = cctx->ret; + } + } + pthread_mutex_unlock(&cctx->mutex); + } + return ret; +} + +void ctl_finalize_ctxs(concurrent_ctl_ctx_t *concurrent_ctxs, size_t n_ctxs) +{ + for (size_t i = 0; i < n_ctxs; i++) { + concurrent_ctl_ctx_t *cctx = &concurrent_ctxs[i]; + pthread_mutex_lock(&cctx->mutex); + if (cctx->state == CONCURRENT_EMPTY) { + pthread_mutex_unlock(&cctx->mutex); + pthread_mutex_destroy(&cctx->mutex); + pthread_cond_destroy(&cctx->cond); + continue; + } + + cctx->state = CONCURRENT_KILLED; + pthread_cond_broadcast(&cctx->cond); + pthread_mutex_unlock(&cctx->mutex); + (void)pthread_join(cctx->thread, NULL); + + assert(cctx->state == CONCURRENT_FINISHED); + knot_ctl_free(cctx->ctl); + pthread_mutex_destroy(&cctx->mutex); + pthread_cond_destroy(&cctx->cond); + } +} + +static void *ctl_process_thread(void *arg) +{ + concurrent_ctl_ctx_t *ctx = arg; + rcu_register_thread(); + signals_setup(); // in fact, this blocks common signals so that they + // arrive to main thread instead of this one + + pthread_mutex_lock(&ctx->mutex); + while (ctx->state != CONCURRENT_KILLED) { + if (ctx->state != CONCURRENT_ASSIGNED) { + pthread_cond_wait(&ctx->cond, &ctx->mutex); + continue; + } + ctx->state = CONCURRENT_RUNNING; + bool exclusive = ctx->exclusive; + pthread_mutex_unlock(&ctx->mutex); + + // Not IDLE, ctx can be read without locking. + int ret = ctl_process(ctx->ctl, ctx->server, ctx->thread_idx, &exclusive); + + pthread_mutex_lock(&ctx->mutex); + ctx->ret = ret; + ctx->exclusive = exclusive; + if (ctx->state == CONCURRENT_RUNNING) { // not KILLED + ctx->state = CONCURRENT_IDLE; + pthread_cond_broadcast(&ctx->cond); + } + } + + knot_ctl_close(ctx->ctl); + + ctx->state = CONCURRENT_FINISHED; + pthread_mutex_unlock(&ctx->mutex); + rcu_unregister_thread(); + return NULL; +} + +static concurrent_ctl_ctx_t *find_free_ctx(concurrent_ctl_ctx_t *concurrent_ctxs, + size_t n_ctxs, knot_ctl_t *ctl) +{ + concurrent_ctl_ctx_t *res = NULL; + for (size_t i = 0; i < n_ctxs && res == NULL; i++) { + concurrent_ctl_ctx_t *cctx = &concurrent_ctxs[i]; + pthread_mutex_lock(&cctx->mutex); + if (cctx->exclusive) { + while (cctx->state != CONCURRENT_IDLE) { + pthread_cond_wait(&cctx->cond, &cctx->mutex); + } + knot_ctl_free(cctx->ctl); + cctx->ctl = knot_ctl_clone(ctl); + if (cctx->ctl == NULL) { + cctx->exclusive = false; + pthread_mutex_unlock(&cctx->mutex); + break; + } + cctx->state = CONCURRENT_ASSIGNED; + res = cctx; + pthread_cond_broadcast(&cctx->cond); + } + pthread_mutex_unlock(&cctx->mutex); + } + for (size_t i = 0; i < n_ctxs && res == NULL; i++) { + concurrent_ctl_ctx_t *cctx = &concurrent_ctxs[i]; + pthread_mutex_lock(&cctx->mutex); + switch (cctx->state) { + case CONCURRENT_EMPTY: + (void)thread_create_nosignal(&cctx->thread, ctl_process_thread, cctx); + break; + case CONCURRENT_IDLE: + knot_ctl_free(cctx->ctl); + pthread_cond_broadcast(&cctx->cond); + break; + default: + pthread_mutex_unlock(&cctx->mutex); + continue; + } + cctx->ctl = knot_ctl_clone(ctl); + if (cctx->ctl != NULL) { + cctx->state = CONCURRENT_ASSIGNED; + res = cctx; + } + pthread_mutex_unlock(&cctx->mutex); + } + return res; +} + +int ctl_manage(knot_ctl_t *ctl, server_t *server, bool *exclusive, + int thread_idx, concurrent_ctl_ctx_t *ctxs, size_t n_ctxs) +{ + int ret = KNOT_EOK; + if (*exclusive || find_free_ctx(ctxs, n_ctxs, ctl) == NULL) { + ret = ctl_process(ctl, server, thread_idx, exclusive); + knot_ctl_close(ctl); + } + return ret; +} diff --git a/src/knot/ctl/threads.h b/src/knot/ctl/threads.h new file mode 100644 index 0000000000..0d7dccd055 --- /dev/null +++ b/src/knot/ctl/threads.h @@ -0,0 +1,72 @@ +/* Copyright (C) CZ.NIC, z.s.p.o. and contributors + * SPDX-License-Identifier: GPL-2.0-or-later + * For more information, see <https://www.knot-dns.cz/> + */ + +#pragma once + +#include "knot/ctl/process.h" + +typedef enum { + CONCURRENT_EMPTY = 0, // fresh cctx without a thread. + CONCURRENT_ASSIGNED, // cctx assigned to process a command. + CONCURRENT_RUNNING, // ctl command is being processed in the thread. + CONCURRENT_IDLE, // command has been processed, waiting for a new one. + CONCURRENT_KILLED, // cctx cleanup has started. + CONCURRENT_FINISHED, // after having been killed, the thread is being joined. +} concurrent_ctl_state_t; + +typedef struct { + concurrent_ctl_state_t state; + pthread_mutex_t mutex; // Protects .state. + pthread_cond_t cond; + knot_ctl_t *ctl; + server_t *server; + pthread_t thread; + int ret; + int thread_idx; + bool exclusive; +} concurrent_ctl_ctx_t; + +/*! + * \brief Initialize CTL thread processing contexts. + * + * \param concurrent_ctxs Structures to initialize. + * \param n_ctxs Their number/count. + * \param server Server structure. + */ +void ctl_init_ctxs(concurrent_ctl_ctx_t *concurrent_ctxs, size_t n_ctxs, server_t *server); + +/*! + * \brief Regularly check the state of parallel CTL processing workers. + * + * \param concurrent_ctxs Parallel CTL processing contexts. + * \param n_ctxs Their number/count. + * + * \retval KNOT_ESTOP Server shutdown requested. + * \retval KNOT_EOK Otherwise. + */ +int ctl_cleanup_ctxs(concurrent_ctl_ctx_t *concurrent_ctxs, size_t n_ctxs); + +/*! + * \brief De-initialize CTL thread processing contexts. + * + * \param concurrent_ctxs Structures to de-initialize. + * \param n_ctxs Their number/count. + */ +void ctl_finalize_ctxs(concurrent_ctl_ctx_t *concurrent_ctxs, size_t n_ctxs); + +/*! + * Find/create a thread processing incomming control commands. + * + * \param[in] ctl Control context. + * \param[in] server Server instance. + * \param[in/out] exclusive CTLs are being processed exclusively by calling thread. + * \param[in] thread_idx Calling thread index. + * \param[in] ctxs CTL thread contexts. + * \param[in] n_ctxs Number of CTL thread contexts. + * + * \return Error code, KNOT_EOK if successful, KNOT_CTL_ESTOP if server shutdown desired. + */ +int ctl_manage(knot_ctl_t *ctl, server_t *server, bool *exclusive, + int thread_idx, concurrent_ctl_ctx_t *ctxs, size_t n_ctxs); diff --git a/src/knot/server/signals.c b/src/knot/server/signals.c new file mode 100644 index 0000000000..a7f4af097a --- /dev/null +++ b/src/knot/server/signals.c @@ -0,0 +1,86 @@ +/* Copyright (C) CZ.NIC, z.s.p.o. and contributors + * SPDX-License-Identifier: GPL-2.0-or-later + * For more information, see <https://www.knot-dns.cz/> + */ + +#include <signal.h> +#include <stdlib.h> + +#include "knot/server/signals.h" + +volatile bool signals_req_stop = false; +volatile bool signals_req_reload = false; +volatile bool signals_req_zones_reload = false; + +struct signal { + int signum; + bool handle; +}; + +static const struct signal SIGNALS[] = { + { SIGHUP, true }, /* Reload server. */ + { SIGUSR1, true }, /* Reload zones. */ + { SIGINT, true }, /* Terminate server. */ + { SIGTERM, true }, /* Terminate server. */ + { SIGALRM, false }, /* Internal thread synchronization. */ + { SIGPIPE, false }, /* Ignored. Some I/O errors. */ + { 0 } +}; + +static void handle_signal(int signum) +{ + switch (signum) { + case SIGHUP: + signals_req_reload = true; + break; + case SIGUSR1: + signals_req_zones_reload = true; + break; + case SIGINT: + case SIGTERM: + if (signals_req_stop) { + exit(EXIT_FAILURE); + } + signals_req_stop = true; + break; + default: + /* ignore */ + break; + } +} + +void signals_setup(void) +{ + /* Block all signals. */ + static sigset_t all; + sigfillset(&all); + sigdelset(&all, SIGPROF); + sigdelset(&all, SIGQUIT); + sigdelset(&all, SIGILL); + sigdelset(&all, SIGABRT); + sigdelset(&all, SIGBUS); + sigdelset(&all, SIGFPE); + sigdelset(&all, SIGSEGV); + + /* Setup handlers. */ + struct sigaction action = { .sa_handler = handle_signal }; + for (const struct signal *s = SIGNALS; s->signum > 0; s++) { + sigaction(s->signum, &action, NULL); + } + + pthread_sigmask(SIG_SETMASK, &all, NULL); +} + +void signals_enable(void) +{ + sigset_t mask; + sigemptyset(&mask); + + for (const struct signal *s = SIGNALS; s->signum > 0; s++) { + if (s->handle) { + sigaddset(&mask, s->signum); + } + } + + pthread_sigmask(SIG_UNBLOCK, &mask, NULL); +} diff --git a/src/knot/server/signals.h b/src/knot/server/signals.h new file mode 100644 index 0000000000..fc8970e427 --- /dev/null +++ b/src/knot/server/signals.h @@ -0,0 +1,18 @@ +/* Copyright (C) CZ.NIC, z.s.p.o. and contributors + * SPDX-License-Identifier: GPL-2.0-or-later + * For more information, see <https://www.knot-dns.cz/> + */ + +#pragma once + +#include <stdbool.h> + +extern volatile bool signals_req_stop; +extern volatile bool signals_req_reload; +extern volatile bool signals_req_zones_reload; + +/*! \brief Setup signal handlers and blocking mask. */ +void signals_setup(void); + +/*! \brief Unblock server control signals. */ +void signals_enable(void); diff --git a/src/utils/knotd/main.c b/src/utils/knotd/main.c index 420d018561..e98498f8cc 100644 --- a/src/utils/knotd/main.c +++ b/src/utils/knotd/main.c @@ -24,7 +24,7 @@ #include "contrib/strtonum.h" #include "contrib/threads.h" #include "contrib/time.h" -#include "knot/ctl/process.h" +#include "knot/ctl/threads.h" #include "knot/conf/conf.h" #include "knot/conf/migration.h" #include "knot/conf/module.h" @@ -34,37 +34,12 @@ #include "knot/common/stats.h" #include "knot/common/systemd.h" #include "knot/server/server.h" +#include "knot/server/signals.h" #include "knot/server/tcp-handler.h" #include "utils/common/params.h" #define PROGRAM_NAME "knotd" -typedef enum { - CONCURRENT_EMPTY = 0, // fresh cctx without a thread. - CONCURRENT_ASSIGNED, // cctx assigned to process a command. - CONCURRENT_RUNNING, // ctl command is being processed in the thread. - CONCURRENT_IDLE, // command has been processed, waiting for a new one. - CONCURRENT_KILLED, // cctx cleanup has started. - CONCURRENT_FINISHED, // after having been killed, the thread is being joined. -} concurrent_ctl_state_t; - -typedef struct { - concurrent_ctl_state_t state; - pthread_mutex_t mutex; // Protects .state. - pthread_cond_t cond; - knot_ctl_t *ctl; - server_t *server; - pthread_t thread; - int ret; - int thread_idx; - bool exclusive; -} concurrent_ctl_ctx_t; - -/* Signal flags. */ -static volatile bool sig_req_stop = false; -static volatile bool sig_req_reload = false; -static volatile bool sig_req_zones_reload = false; - static int make_daemon(int nochdir, int noclose) { int ret; @@ -122,83 +97,6 @@ static int make_daemon(int nochdir, int noclose) return 0; } -struct signal { - int signum; - bool handle; -}; - -/*! \brief Signals used by the server. */ -static const struct signal SIGNALS[] = { - { SIGHUP, true }, /* Reload server. */ - { SIGUSR1, true }, /* Reload zones. */ - { SIGINT, true }, /* Terminate server. */ - { SIGTERM, true }, /* Terminate server. */ - { SIGALRM, false }, /* Internal thread synchronization. */ - { SIGPIPE, false }, /* Ignored. Some I/O errors. */ - { 0 } -}; - -/*! \brief Server signal handler. */ -static void handle_signal(int signum) -{ - switch (signum) { - case SIGHUP: - sig_req_reload = true; - break; - case SIGUSR1: - sig_req_zones_reload = true; - break; - case SIGINT: - case SIGTERM: - if (sig_req_stop) { - exit(EXIT_FAILURE); - } - sig_req_stop = true; - break; - default: - /* ignore */ - break; - } -} - -/*! \brief Setup signal handlers and blocking mask. */ -static void setup_signals(void) -{ - /* Block all signals. */ - static sigset_t all; - sigfillset(&all); - sigdelset(&all, SIGPROF); - sigdelset(&all, SIGQUIT); - sigdelset(&all, SIGILL); - sigdelset(&all, SIGABRT); - sigdelset(&all, SIGBUS); - sigdelset(&all, SIGFPE); - sigdelset(&all, SIGSEGV); - - /* Setup handlers. */ - struct sigaction action = { .sa_handler = handle_signal }; - for (const struct signal *s = SIGNALS; s->signum > 0; s++) { - sigaction(s->signum, &action, NULL); - } - - pthread_sigmask(SIG_SETMASK, &all, NULL); -} - -/*! \brief Unblock server control signals. */ -static void enable_signals(void) -{ - sigset_t mask; - sigemptyset(&mask); - - for (const struct signal *s = SIGNALS; s->signum > 0; s++) { - if (s->handle) { - sigaddset(&mask, s->signum); - } - } - - pthread_sigmask(SIG_UNBLOCK, &mask, NULL); -} - /*! \brief Drop POSIX 1003.1e capabilities. */ static void drop_capabilities(void) { @@ -255,158 +153,6 @@ static void check_loaded(server_t *server) dbus_emit_running(true); } -static void *ctl_process_thread(void *arg); - -/*! - * Try to find an empty ctl processing context and if successful, - * prepare to lauch the incomming command processing in it. - * - * \param[in] concurrent_ctxs Configured concurrent control contexts. - * \param[in] n_ctxs Number of configured concurrent control contexts. - * \param[in] ctl Control context. - * - * \return Assigned concurrent control context, or NULL. - */ - -static concurrent_ctl_ctx_t *find_free_ctx(concurrent_ctl_ctx_t *concurrent_ctxs, - size_t n_ctxs, knot_ctl_t *ctl) -{ - concurrent_ctl_ctx_t *res = NULL; - for (size_t i = 0; i < n_ctxs && res == NULL; i++) { - concurrent_ctl_ctx_t *cctx = &concurrent_ctxs[i]; - pthread_mutex_lock(&cctx->mutex); - if (cctx->exclusive) { - while (cctx->state != CONCURRENT_IDLE) { - pthread_cond_wait(&cctx->cond, &cctx->mutex); - } - knot_ctl_free(cctx->ctl); - cctx->ctl = knot_ctl_clone(ctl); - if (cctx->ctl == NULL) { - cctx->exclusive = false; - pthread_mutex_unlock(&cctx->mutex); - break; - } - cctx->state = CONCURRENT_ASSIGNED; - res = cctx; - pthread_cond_broadcast(&cctx->cond); - } - pthread_mutex_unlock(&cctx->mutex); - } - for (size_t i = 0; i < n_ctxs && res == NULL; i++) { - concurrent_ctl_ctx_t *cctx = &concurrent_ctxs[i]; - pthread_mutex_lock(&cctx->mutex); - switch (cctx->state) { - case CONCURRENT_EMPTY: - (void)thread_create_nosignal(&cctx->thread, ctl_process_thread, cctx); - break; - case CONCURRENT_IDLE: - knot_ctl_free(cctx->ctl); - pthread_cond_broadcast(&cctx->cond); - break; - default: - pthread_mutex_unlock(&cctx->mutex); - continue; - } - cctx->ctl = knot_ctl_clone(ctl); - if (cctx->ctl != NULL) { - cctx->state = CONCURRENT_ASSIGNED; - res = cctx; - } - pthread_mutex_unlock(&cctx->mutex); - } - return res; -} - -static void init_ctxs(concurrent_ctl_ctx_t *concurrent_ctxs, size_t n_ctxs, server_t *server) -{ - for (size_t i = 0; i < n_ctxs; i++) { - concurrent_ctl_ctx_t *cctx = &concurrent_ctxs[i]; - pthread_mutex_init(&cctx->mutex, NULL); - pthread_cond_init(&cctx->cond, NULL); - cctx->server = server; - cctx->thread_idx = i + 1; - } -} - -static int cleanup_ctxs(concurrent_ctl_ctx_t *concurrent_ctxs, size_t n_ctxs) -{ - int ret = KNOT_EOK; - for (size_t i = 0; i < n_ctxs; i++) { - concurrent_ctl_ctx_t *cctx = &concurrent_ctxs[i]; - pthread_mutex_lock(&cctx->mutex); - if (cctx->state == CONCURRENT_IDLE) { - knot_ctl_free(cctx->ctl); - cctx->ctl = NULL; - if (cctx->ret == KNOT_CTL_ESTOP) { - ret = cctx->ret; - } - } - pthread_mutex_unlock(&cctx->mutex); - } - return ret; -} - -static void finalize_ctxs(concurrent_ctl_ctx_t *concurrent_ctxs, size_t n_ctxs) -{ - for (size_t i = 0; i < n_ctxs; i++) { - concurrent_ctl_ctx_t *cctx = &concurrent_ctxs[i]; - pthread_mutex_lock(&cctx->mutex); - if (cctx->state == CONCURRENT_EMPTY) { - pthread_mutex_unlock(&cctx->mutex); - pthread_mutex_destroy(&cctx->mutex); - pthread_cond_destroy(&cctx->cond); - continue; - } - - cctx->state = CONCURRENT_KILLED; - pthread_cond_broadcast(&cctx->cond); - pthread_mutex_unlock(&cctx->mutex); - (void)pthread_join(cctx->thread, NULL); - - assert(cctx->state == CONCURRENT_FINISHED); - knot_ctl_free(cctx->ctl); - pthread_mutex_destroy(&cctx->mutex); - pthread_cond_destroy(&cctx->cond); - } -} - -static void *ctl_process_thread(void *arg) -{ - concurrent_ctl_ctx_t *ctx = arg; - rcu_register_thread(); - setup_signals(); // in fact, this blocks common signals so that they - // arrive to main thread instead of this one - - pthread_mutex_lock(&ctx->mutex); - while (ctx->state != CONCURRENT_KILLED) { - if (ctx->state != CONCURRENT_ASSIGNED) { - pthread_cond_wait(&ctx->cond, &ctx->mutex); - continue; - } - ctx->state = CONCURRENT_RUNNING; - bool exclusive = ctx->exclusive; - pthread_mutex_unlock(&ctx->mutex); - - // Not IDLE, ctx can be read without locking. - int ret = ctl_process(ctx->ctl, ctx->server, ctx->thread_idx, &exclusive); - - pthread_mutex_lock(&ctx->mutex); - ctx->ret = ret; - ctx->exclusive = exclusive; - if (ctx->state == CONCURRENT_RUNNING) { // not KILLED - ctx->state = CONCURRENT_IDLE; - pthread_cond_broadcast(&ctx->cond); - } - } - - knot_ctl_close(ctx->ctl); - - ctx->state = CONCURRENT_FINISHED; - pthread_mutex_unlock(&ctx->mutex); - rcu_unregister_thread(); - return NULL; -} - /*! \brief Event loop listening for signals and remote commands. */ static void event_loop(server_t *server, const char *socket, bool daemonize, unsigned long pid) @@ -452,10 +198,10 @@ static void event_loop(server_t *server, const char *socket, bool daemonize, } free(listen); - enable_signals(); + signals_enable(); concurrent_ctl_ctx_t concurrent_ctxs[CTL_MAX_CONCURRENT] = { 0 }; - init_ctxs(concurrent_ctxs, CTL_MAX_CONCURRENT, server); + ctl_init_ctxs(concurrent_ctxs, CTL_MAX_CONCURRENT, server); bool main_thread_exclusive = false; /* Notify systemd about successful start. */ @@ -469,28 +215,30 @@ static void event_loop(server_t *server, const char *socket, bool daemonize, /* Run event loop. */ for (;;) { /* Interrupts. */ - if (sig_req_reload && !sig_req_stop) { - sig_req_reload = false; + if (signals_req_reload && !signals_req_stop) { + signals_req_reload = false; pthread_rwlock_wrlock(&server->ctl_lock); server_reload(server, RELOAD_FULL); pthread_rwlock_unlock(&server->ctl_lock); } - if (sig_req_zones_reload && !sig_req_stop) { - sig_req_zones_reload = false; - reload_t mode = ATOMIC_GET(server->catalog_upd_signal) ? RELOAD_CATALOG : RELOAD_ZONES; + if (signals_req_zones_reload && !signals_req_stop) { + signals_req_zones_reload = false; + reload_t mode = ATOMIC_GET(server->catalog_upd_signal) ? + RELOAD_CATALOG : RELOAD_ZONES; pthread_rwlock_wrlock(&server->ctl_lock); ATOMIC_SET(server->catalog_upd_signal, false); server_update_zones(conf(), server, mode); pthread_rwlock_unlock(&server->ctl_lock); } - if (sig_req_stop || cleanup_ctxs(concurrent_ctxs, CTL_MAX_CONCURRENT) == KNOT_CTL_ESTOP) { + if (signals_req_stop || + ctl_cleanup_ctxs(concurrent_ctxs, CTL_MAX_CONCURRENT) == KNOT_CTL_ESTOP) { break; } // Update control timeout. knot_ctl_set_timeout(ctl, conf()->cache.ctl_timeout); - if (sig_req_reload || sig_req_zones_reload) { + if (signals_req_reload || signals_req_zones_reload) { continue; } @@ -501,17 +249,14 @@ static void event_loop(server_t *server, const char *socket, bool daemonize, continue; } - if (main_thread_exclusive || - find_free_ctx(concurrent_ctxs, CTL_MAX_CONCURRENT, ctl) == NULL) { - ret = ctl_process(ctl, server, 0, &main_thread_exclusive); - knot_ctl_close(ctl); - if (ret == KNOT_CTL_ESTOP) { - break; - } + ret = ctl_manage(ctl, server, &main_thread_exclusive, 0, + concurrent_ctxs, CTL_MAX_CONCURRENT); + if (ret == KNOT_CTL_ESTOP) { + break; } } - finalize_ctxs(concurrent_ctxs, CTL_MAX_CONCURRENT); + ctl_finalize_ctxs(concurrent_ctxs, CTL_MAX_CONCURRENT); if (conf()->cache.srv_dbus_event & DBUS_EVENT_RUNNING) { dbus_emit_running(false); @@ -683,7 +428,7 @@ int main(int argc, char **argv) } /* Setup base signal handling. */ - setup_signals(); + signals_setup(); /* Initialize cryptographic backend. */ dnssec_crypto_init(); -- GitLab From b550c5e53f163a90b0cc5f3df83cdc42082e0535 Mon Sep 17 00:00:00 2001 From: Libor Peltan <libor.peltan@nic.cz> Date: Fri, 3 Jan 2025 14:52:16 +0100 Subject: [PATCH 2/5] confio: additional check of same thread for whole confio txn --- src/knot/conf/base.h | 4 ++++ src/knot/conf/confio.c | 18 +++++++++++++++++- src/libknot/errcode.h | 1 + src/libknot/error.c | 1 + 4 files changed, 23 insertions(+), 1 deletion(-) diff --git a/src/knot/conf/base.h b/src/knot/conf/base.h index a6ae1557f1..675e8b956f 100644 --- a/src/knot/conf/base.h +++ b/src/knot/conf/base.h @@ -5,6 +5,8 @@ #pragma once +#include <pthread.h> + #include "libknot/libknot.h" #include "libknot/yparser/ypschema.h" #include "contrib/qp-trie/trie.h" @@ -119,6 +121,8 @@ typedef struct { yp_flag_t flags; /*! Changed zones. */ trie_t *zones; + /*! Thread that initiated the txn (should access it exclusively). */ + pthread_t thread_id; } io; /*! Current config file (for reload if started with config file). */ diff --git a/src/knot/conf/confio.c b/src/knot/conf/confio.c index cb65080630..540cb45aa0 100644 --- a/src/knot/conf/confio.c +++ b/src/knot/conf/confio.c @@ -45,6 +45,13 @@ static void io_reset_bin( io->data.bin_len = bin_len; } +static bool same_thread(void) +{ + return conf()->io.txn == NULL || + pthread_equal(conf()->io.thread_id, pthread_self()) != 0; +} +#define CHECK_SAME_THREAD if (!same_thread()) { return KNOT_TXN_ETHREAD; } + int conf_io_begin( bool child) { @@ -55,6 +62,7 @@ int conf_io_begin( } else if (conf()->io.txn == NULL && child) { return KNOT_TXN_ENOTEXISTS; } + CHECK_SAME_THREAD knot_db_txn_t *parent = conf()->io.txn; knot_db_txn_t *txn = (parent == NULL) ? conf()->io.txn_stack : parent + 1; @@ -74,6 +82,7 @@ int conf_io_begin( } conf()->io.txn = txn; + conf()->io.thread_id = pthread_self(); // Reset master transaction flags. if (!child) { @@ -95,6 +104,7 @@ int conf_io_commit( (child && conf()->io.txn == conf()->io.txn_stack)) { return KNOT_TXN_ENOTEXISTS; } + CHECK_SAME_THREAD knot_db_txn_t *txn = child ? conf()->io.txn : conf()->io.txn_stack; @@ -112,7 +122,7 @@ void conf_io_abort( assert(conf() != NULL); if (conf()->io.txn == NULL || - (child && conf()->io.txn == conf()->io.txn_stack)) { + (child && conf()->io.txn == conf()->io.txn_stack) || !same_thread()) { return; } @@ -163,6 +173,7 @@ int conf_io_list( if (conf()->io.txn == NULL && !get_current) { return KNOT_TXN_ENOTEXISTS; } + CHECK_SAME_THREAD // List schema sections by default. if (key0 == NULL) { @@ -564,6 +575,7 @@ int conf_io_diff( if (conf()->io.txn == NULL) { return KNOT_TXN_ENOTEXISTS; } + CHECK_SAME_THREAD // Compare all sections by default. if (key0 == NULL) { @@ -764,6 +776,7 @@ int conf_io_get( if (conf()->io.txn == NULL && !get_current) { return KNOT_TXN_ENOTEXISTS; } + CHECK_SAME_THREAD // List all sections by default. if (key0 == NULL) { @@ -1006,6 +1019,7 @@ int conf_io_set( if (conf()->io.txn == NULL) { return KNOT_TXN_ENOTEXISTS; } + CHECK_SAME_THREAD // At least key0 must be specified. if (key0 == NULL) { @@ -1201,6 +1215,7 @@ int conf_io_unset( if (conf()->io.txn == NULL) { return KNOT_TXN_ENOTEXISTS; } + CHECK_SAME_THREAD // Unset all sections by default. if (key0 == NULL) { @@ -1554,6 +1569,7 @@ int conf_io_check( if (conf()->io.txn == NULL) { return KNOT_TXN_ENOTEXISTS; } + CHECK_SAME_THREAD int ret; diff --git a/src/libknot/errcode.h b/src/libknot/errcode.h index ee754862b7..d6675e4b1b 100644 --- a/src/libknot/errcode.h +++ b/src/libknot/errcode.h @@ -156,6 +156,7 @@ enum knot_error { /* Transaction errors. */ KNOT_TXN_EEXISTS, KNOT_TXN_ENOTEXISTS, + KNOT_TXN_ETHREAD, /* DNSSEC errors. */ KNOT_INVALID_PUBLIC_KEY, diff --git a/src/libknot/error.c b/src/libknot/error.c index 62db11ac53..12f5be392c 100644 --- a/src/libknot/error.c +++ b/src/libknot/error.c @@ -155,6 +155,7 @@ static const struct error errors[] = { /* Transaction errors. */ { KNOT_TXN_EEXISTS, "too many transactions" }, { KNOT_TXN_ENOTEXISTS, "no active transaction" }, + { KNOT_TXN_ETHREAD, "transaction thread mismatch" }, /* DNSSEC errors. */ { KNOT_INVALID_PUBLIC_KEY, "invalid public key" }, -- GitLab From 5a883eb6e47609678319e59f6774a8f0e7f4bd6c Mon Sep 17 00:00:00 2001 From: Libor Peltan <libor.peltan@nic.cz> Date: Fri, 3 Jan 2025 16:45:43 +0100 Subject: [PATCH 3/5] ctl: move socket handling to separate thread (not main thread) --- src/knot/ctl/threads.c | 56 ++++++++++++++++++++++++++++++++++++++++++ src/knot/ctl/threads.h | 22 +++++++++++++++++ src/utils/knotd/main.c | 30 ++++++++-------------- 3 files changed, 88 insertions(+), 20 deletions(-) diff --git a/src/knot/ctl/threads.c b/src/knot/ctl/threads.c index d8dd20d5b7..b9d527d441 100644 --- a/src/knot/ctl/threads.c +++ b/src/knot/ctl/threads.c @@ -3,6 +3,7 @@ * For more information, see <https://www.knot-dns.cz/> */ +#include <signal.h> #include <urcu.h> #include "contrib/threads.h" @@ -158,3 +159,58 @@ int ctl_manage(knot_ctl_t *ctl, server_t *server, bool *exclusive, } return ret; } + +static int ctl_socket_thr(struct dthread *dt) +{ + ctl_socket_ctx_t *ctx = dt->data; + + concurrent_ctl_ctx_t concurrent_ctxs[CTL_MAX_CONCURRENT] = { 0 }; + ctl_init_ctxs(concurrent_ctxs, CTL_MAX_CONCURRENT, ctx->server); + bool this_thread_exclusive = false, stopped = false; + + while (dt->unit->threads[0]->state & ThreadActive) { + if (ctl_cleanup_ctxs(concurrent_ctxs, CTL_MAX_CONCURRENT) == KNOT_CTL_ESTOP) { + stopped = true; + break; + } + + // Update control timeout. + knot_ctl_set_timeout(ctx->ctl, conf()->cache.ctl_timeout); + + int ret = knot_ctl_accept(ctx->ctl); + if (ret != KNOT_EOK) { + continue; + } + + ret = ctl_manage(ctx->ctl, ctx->server, &this_thread_exclusive, 0, concurrent_ctxs, CTL_MAX_CONCURRENT); + if (ret == KNOT_CTL_ESTOP) { + stopped = true; + break; + } + } + + if (stopped) { + (void)kill(getpid(), SIGTERM); + } + + ctl_finalize_ctxs(concurrent_ctxs, CTL_MAX_CONCURRENT); + + return 0; +} + +int ctl_socket_thr_init(ctl_socket_ctx_t *ctx) +{ + dt_unit_t *dts = dt_create(1, ctl_socket_thr, NULL, ctx); + if (dts == NULL) { + return KNOT_ENOMEM; + } + ctx->unit = dts; + return dt_start(dts); +} + +void ctl_socket_thr_end(ctl_socket_ctx_t *ctx) +{ + (void)dt_stop(ctx->unit); + (void)dt_join(ctx->unit); + dt_delete(&ctx->unit); +} diff --git a/src/knot/ctl/threads.h b/src/knot/ctl/threads.h index 0d7dccd055..78956fba50 100644 --- a/src/knot/ctl/threads.h +++ b/src/knot/ctl/threads.h @@ -28,6 +28,12 @@ typedef struct { bool exclusive; } concurrent_ctl_ctx_t; +typedef struct { + knot_ctl_t *ctl; + server_t *server; + dt_unit_t *unit; +} ctl_socket_ctx_t; + /*! * \brief Initialize CTL thread processing contexts. * @@ -70,3 +76,19 @@ void ctl_finalize_ctxs(concurrent_ctl_ctx_t *concurrent_ctxs, size_t n_ctxs); */ int ctl_manage(knot_ctl_t *ctl, server_t *server, bool *exclusive, int thread_idx, concurrent_ctl_ctx_t *ctxs, size_t n_ctxs); + +/*! + * \brief Initialize CTL socket handling thread. + * + * \param ctx Socket thread context. + * + * \return KNOT_E* + */ +int ctl_socket_thr_init(ctl_socket_ctx_t *ctx); + +/*! + * \brief De-initialize CTL socket handling thread. + * + * \param ctx Socket thread context. + */ +void ctl_socket_thr_end(ctl_socket_ctx_t *ctx); diff --git a/src/utils/knotd/main.c b/src/utils/knotd/main.c index e98498f8cc..fc3d058d92 100644 --- a/src/utils/knotd/main.c +++ b/src/utils/knotd/main.c @@ -200,9 +200,12 @@ static void event_loop(server_t *server, const char *socket, bool daemonize, signals_enable(); - concurrent_ctl_ctx_t concurrent_ctxs[CTL_MAX_CONCURRENT] = { 0 }; - ctl_init_ctxs(concurrent_ctxs, CTL_MAX_CONCURRENT, server); - bool main_thread_exclusive = false; + ctl_socket_ctx_t sctx = { .ctl = ctl, .server = server }; + ret = ctl_socket_thr_init(&sctx); + if (ret != KNOT_EOK) { + log_fatal("control, failed to launch socket thread (%s)", knot_strerror(ret)); + return; + } /* Notify systemd about successful start. */ systemd_ready_notify(); @@ -230,38 +233,25 @@ static void event_loop(server_t *server, const char *socket, bool daemonize, server_update_zones(conf(), server, mode); pthread_rwlock_unlock(&server->ctl_lock); } - if (signals_req_stop || - ctl_cleanup_ctxs(concurrent_ctxs, CTL_MAX_CONCURRENT) == KNOT_CTL_ESTOP) { + if (signals_req_stop) { break; } - // Update control timeout. - knot_ctl_set_timeout(ctl, conf()->cache.ctl_timeout); - if (signals_req_reload || signals_req_zones_reload) { continue; } check_loaded(server); - ret = knot_ctl_accept(ctl); - if (ret != KNOT_EOK) { - continue; - } - - ret = ctl_manage(ctl, server, &main_thread_exclusive, 0, - concurrent_ctxs, CTL_MAX_CONCURRENT); - if (ret == KNOT_CTL_ESTOP) { - break; - } + sleep(5); // wait for signals to arrive } - ctl_finalize_ctxs(concurrent_ctxs, CTL_MAX_CONCURRENT); - if (conf()->cache.srv_dbus_event & DBUS_EVENT_RUNNING) { dbus_emit_running(false); } + ctl_socket_thr_end(&sctx); + /* Unbind the control socket. */ knot_ctl_unbind(ctl); knot_ctl_free(ctl); -- GitLab From ebbd2bbe157a97d6f6d9ee1434213fdaddb844d5 Mon Sep 17 00:00:00 2001 From: Libor Peltan <libor.peltan@nic.cz> Date: Fri, 17 Jan 2025 13:36:46 +0100 Subject: [PATCH 4/5] implemented multi-socket CTL... ...with each socket handled by a thread --- doc/reference.rst | 10 +- src/knot/conf/schema.c | 2 +- src/knot/conf/tools.c | 14 +++ src/knot/conf/tools.h | 4 + src/knot/ctl/commands.c | 2 +- src/knot/ctl/process.c | 2 +- src/knot/ctl/process.h | 4 +- src/knot/ctl/threads.c | 54 +++++----- src/knot/ctl/threads.h | 31 ++---- src/knot/server/dthreads.c | 5 +- src/knot/server/dthreads.h | 1 + src/utils/knotd/main.c | 114 ++++++++++++-------- tests-extra/tests/catalog/basic/test.py | 14 +-- tests-extra/tests/ctl/basic/test.py | 6 +- tests-extra/tests/ctl/concurrent/test.py | 13 +-- tests-extra/tests/ctl/txn_zone_conf/test.py | 21 ++-- tests-extra/tests/ixfr/block_notify/test.py | 7 +- tests-extra/tests/modules/geoip/test.py | 4 +- tests-extra/tests/zone/zij_reload/test.py | 7 +- tests-extra/tools/dnstest/server.py | 26 ++++- 20 files changed, 210 insertions(+), 131 deletions(-) diff --git a/doc/reference.rst b/doc/reference.rst index 0cd261693f..937ec526fb 100644 --- a/doc/reference.rst +++ b/doc/reference.rst @@ -983,7 +983,7 @@ Configuration of the server control interface. :: control: - listen: STR + listen: STR ... backlog: INT timeout: TIME @@ -995,6 +995,14 @@ listen A UNIX socket :ref:`path<default_paths>` where the server listens for control commands. +Multiple sockets can be configured for parallel independent use, but their +number is limited (currently to 4), and some operations might be delayed due to +mutexes. + +.. WARNING:: + Transaction-like operations, such as conf-begin/set/commit/abort or + zone-begin/set/commit/abort, must be performed using the same socket. + Change of this parameter requires restart of the Knot server to take effect. *Default:* :ref:`rundir<server_rundir>`\ ``/knot.sock`` diff --git a/src/knot/conf/schema.c b/src/knot/conf/schema.c index 5ac419dac9..0a8820a42b 100644 --- a/src/knot/conf/schema.c +++ b/src/knot/conf/schema.c @@ -270,7 +270,7 @@ static const yp_item_t desc_xdp[] = { }; static const yp_item_t desc_control[] = { - { C_LISTEN, YP_TSTR, YP_VSTR = { "knot.sock" } }, + { C_LISTEN, YP_TSTR, YP_VSTR = { "knot.sock" }, YP_FMULTI, { check_ctl_listen } }, { C_BACKLOG, YP_TINT, YP_VINT = { 0, UINT16_MAX, 5 } }, { C_TIMEOUT, YP_TINT, YP_VINT = { 0, INT32_MAX / 1000, 5, YP_STIME } }, { C_COMMENT, YP_TSTR, YP_VNONE }, diff --git a/src/knot/conf/tools.c b/src/knot/conf/tools.c index ff3c44d643..f56d704ab3 100644 --- a/src/knot/conf/tools.c +++ b/src/knot/conf/tools.c @@ -29,6 +29,7 @@ #include "knot/conf/module.h" #include "knot/conf/schema.h" #include "knot/common/log.h" +#include "knot/ctl/process.h" #include "knot/updates/acl.h" #include "knot/zone/serial.h" #include "knot/zone/skip.h" @@ -358,6 +359,19 @@ int check_modulo( return KNOT_EOK; } +int check_ctl_listen( + knotd_conf_check_args_t *args) +{ + conf_val_t val = conf_get_txn(args->extra->conf, args->extra->txn, + C_CTL, C_LISTEN); + if (conf_val_count(&val) > CTL_MAX_CONCURRENT / 2) { + args->err_str = "too many control sockets configured"; + return KNOT_EINVAL; + } + + return KNOT_EOK; +} + int check_modulo_shift( knotd_conf_check_args_t *args) { diff --git a/src/knot/conf/tools.h b/src/knot/conf/tools.h index 3bea0ad46d..919d5cd0f8 100644 --- a/src/knot/conf/tools.h +++ b/src/knot/conf/tools.h @@ -83,6 +83,10 @@ int check_modulo_shift( knotd_conf_check_args_t *args ); +int check_ctl_listen( + knotd_conf_check_args_t *args +); + int check_database( knotd_conf_check_args_t *args ); diff --git a/src/knot/ctl/commands.c b/src/knot/ctl/commands.c index a8437ee924..778fd84eb0 100644 --- a/src/knot/ctl/commands.c +++ b/src/knot/ctl/commands.c @@ -61,7 +61,7 @@ static struct { sizeof(((send_ctx_t *)0)->ttl) + sizeof(((send_ctx_t *)0)->type) + sizeof(((send_ctx_t *)0)->rdata)]; -} ctl_globals[CTL_MAX_CONCURRENT + 1]; +} ctl_globals[CTL_MAX_CONCURRENT]; static bool allow_blocking_while_ctl_txn(zone_event_type_t event) { diff --git a/src/knot/ctl/process.c b/src/knot/ctl/process.c index 0ca91a6b1c..df82fa3270 100644 --- a/src/knot/ctl/process.c +++ b/src/knot/ctl/process.c @@ -10,7 +10,7 @@ #include "contrib/openbsd/strlcat.h" #include "contrib/string.h" -int ctl_process(knot_ctl_t *ctl, server_t *server, int thread_idx, bool *exclusive) +int ctl_process(knot_ctl_t *ctl, server_t *server, unsigned thread_idx, bool *exclusive) { if (ctl == NULL || server == NULL) { return KNOT_EINVAL; diff --git a/src/knot/ctl/process.h b/src/knot/ctl/process.h index f0ce57ff38..a49c19e430 100644 --- a/src/knot/ctl/process.h +++ b/src/knot/ctl/process.h @@ -8,7 +8,7 @@ #include "libknot/libknot.h" #include "knot/server/server.h" -#define CTL_MAX_CONCURRENT 8 // Number of CTL threads EXCLUDING the main thread which can also process CTL. +#define CTL_MAX_CONCURRENT 8 // Number of CTL threads (total for all sockets combined) to run in parallel. /*! * Processes incoming control commands. @@ -20,4 +20,4 @@ * * \return Error code, KNOT_EOK if successful. */ -int ctl_process(knot_ctl_t *ctl, server_t *server, int thread_idx, bool *exclusive); +int ctl_process(knot_ctl_t *ctl, server_t *server, unsigned thread_idx, bool *exclusive); diff --git a/src/knot/ctl/threads.c b/src/knot/ctl/threads.c index b9d527d441..1bb9227fe8 100644 --- a/src/knot/ctl/threads.c +++ b/src/knot/ctl/threads.c @@ -4,20 +4,23 @@ */ #include <signal.h> +#include <string.h> #include <urcu.h> #include "contrib/threads.h" #include "knot/ctl/threads.h" #include "knot/server/signals.h" -void ctl_init_ctxs(concurrent_ctl_ctx_t *concurrent_ctxs, size_t n_ctxs, server_t *server) +void ctl_init_ctxs(concurrent_ctl_ctx_t *concurrent_ctxs, size_t n_ctxs, + server_t *server, unsigned thr_idx_from) { for (size_t i = 0; i < n_ctxs; i++) { concurrent_ctl_ctx_t *cctx = &concurrent_ctxs[i]; + memset(cctx, 0, sizeof(*cctx)); pthread_mutex_init(&cctx->mutex, NULL); pthread_cond_init(&cctx->cond, NULL); cctx->server = server; - cctx->thread_idx = i + 1; + cctx->thread_idx = thr_idx_from + i + 1; } } @@ -149,58 +152,59 @@ static concurrent_ctl_ctx_t *find_free_ctx(concurrent_ctl_ctx_t *concurrent_ctxs return res; } -int ctl_manage(knot_ctl_t *ctl, server_t *server, bool *exclusive, - int thread_idx, concurrent_ctl_ctx_t *ctxs, size_t n_ctxs) -{ - int ret = KNOT_EOK; - if (*exclusive || find_free_ctx(ctxs, n_ctxs, ctl) == NULL) { - ret = ctl_process(ctl, server, thread_idx, exclusive); - knot_ctl_close(ctl); - } - return ret; -} - static int ctl_socket_thr(struct dthread *dt) { ctl_socket_ctx_t *ctx = dt->data; + assert(dt == ctx->unit->threads[dt->idx]); - concurrent_ctl_ctx_t concurrent_ctxs[CTL_MAX_CONCURRENT] = { 0 }; - ctl_init_ctxs(concurrent_ctxs, CTL_MAX_CONCURRENT, ctx->server); - bool this_thread_exclusive = false, stopped = false; + unsigned sock_thr_count = ctx->thr_count - 1; + unsigned thr_idx = dt->idx * ctx->thr_count; + knot_ctl_t *thr_ctl = ctx->ctls[dt->idx]; + bool thr_exclusive = false, stopped = false; + + concurrent_ctl_ctx_t concurrent_ctxs[sock_thr_count]; + ctl_init_ctxs(concurrent_ctxs, sock_thr_count, ctx->server, thr_idx); while (dt->unit->threads[0]->state & ThreadActive) { - if (ctl_cleanup_ctxs(concurrent_ctxs, CTL_MAX_CONCURRENT) == KNOT_CTL_ESTOP) { + if (ctl_cleanup_ctxs(concurrent_ctxs, sock_thr_count) == KNOT_CTL_ESTOP) { stopped = true; break; } - // Update control timeout. - knot_ctl_set_timeout(ctx->ctl, conf()->cache.ctl_timeout); + knot_ctl_set_timeout(thr_ctl, conf()->cache.ctl_timeout); - int ret = knot_ctl_accept(ctx->ctl); + int ret = knot_ctl_accept(thr_ctl); if (ret != KNOT_EOK) { continue; } - ret = ctl_manage(ctx->ctl, ctx->server, &this_thread_exclusive, 0, concurrent_ctxs, CTL_MAX_CONCURRENT); + if (thr_exclusive || + find_free_ctx(concurrent_ctxs, sock_thr_count, thr_ctl) == NULL) { + ret = ctl_process(thr_ctl, ctx->server, thr_idx, &thr_exclusive); + knot_ctl_close(thr_ctl); + } if (ret == KNOT_CTL_ESTOP) { stopped = true; break; } } + ctl_finalize_ctxs(concurrent_ctxs, sock_thr_count); + if (stopped) { (void)kill(getpid(), SIGTERM); } - ctl_finalize_ctxs(concurrent_ctxs, CTL_MAX_CONCURRENT); - return 0; } -int ctl_socket_thr_init(ctl_socket_ctx_t *ctx) +int ctl_socket_thr_init(ctl_socket_ctx_t *ctx, unsigned sock_count) { - dt_unit_t *dts = dt_create(1, ctl_socket_thr, NULL, ctx); + if (sock_count == 0 || ctx->thr_count < 2) { + return KNOT_EINVAL; + } + + dt_unit_t *dts = dt_create(sock_count, ctl_socket_thr, NULL, ctx); if (dts == NULL) { return KNOT_ENOMEM; } diff --git a/src/knot/ctl/threads.h b/src/knot/ctl/threads.h index 78956fba50..686a3dbf4e 100644 --- a/src/knot/ctl/threads.h +++ b/src/knot/ctl/threads.h @@ -29,9 +29,10 @@ typedef struct { } concurrent_ctl_ctx_t; typedef struct { - knot_ctl_t *ctl; + knot_ctl_t **ctls; server_t *server; dt_unit_t *unit; + unsigned thr_count; } ctl_socket_ctx_t; /*! @@ -40,8 +41,10 @@ typedef struct { * \param concurrent_ctxs Structures to initialize. * \param n_ctxs Their number/count. * \param server Server structure. + * \param thr_idx_from Base thread ID for sub-threads to start with. */ -void ctl_init_ctxs(concurrent_ctl_ctx_t *concurrent_ctxs, size_t n_ctxs, server_t *server); +void ctl_init_ctxs(concurrent_ctl_ctx_t *concurrent_ctxs, size_t n_ctxs, + server_t *server, int thr_idx_from); /*! * \brief Regularly check the state of parallel CTL processing workers. @@ -63,31 +66,17 @@ int ctl_cleanup_ctxs(concurrent_ctl_ctx_t *concurrent_ctxs, size_t n_ctxs); void ctl_finalize_ctxs(concurrent_ctl_ctx_t *concurrent_ctxs, size_t n_ctxs); /*! - * Find/create a thread processing incomming control commands. - * - * \param[in] ctl Control context. - * \param[in] server Server instance. - * \param[in/out] exclusive CTLs are being processed exclusively by calling thread. - * \param[in] thread_idx Calling thread index. - * \param[in] ctxs CTL thread contexts. - * \param[in] n_ctxs Number of CTL thread contexts. + * \brief Initialize CTL socket handling threads. * - * \return Error code, KNOT_EOK if successful, KNOT_CTL_ESTOP if server shutdown desired. - */ -int ctl_manage(knot_ctl_t *ctl, server_t *server, bool *exclusive, - int thread_idx, concurrent_ctl_ctx_t *ctxs, size_t n_ctxs); - -/*! - * \brief Initialize CTL socket handling thread. - * - * \param ctx Socket thread context. + * \param ctx Socket thread contexts. + * \param sock_count Number of socket threads. * * \return KNOT_E* */ -int ctl_socket_thr_init(ctl_socket_ctx_t *ctx); +int ctl_socket_thr_init(ctl_socket_ctx_t *ctx, unsigned sock_count); /*! - * \brief De-initialize CTL socket handling thread. + * \brief De-initialize CTL socket handling threads. * * \param ctx Socket thread context. */ diff --git a/src/knot/server/dthreads.c b/src/knot/server/dthreads.c index f1e6d64e8a..9588317e21 100644 --- a/src/knot/server/dthreads.c +++ b/src/knot/server/dthreads.c @@ -201,7 +201,7 @@ static void *thread_ep(void *data) * \retval New thread instance on success. * \retval NULL on error. */ -static dthread_t *dt_create_thread(dt_unit_t *unit) +static dthread_t *dt_create_thread(dt_unit_t *unit, unsigned idx) { // Alloc thread dthread_t *thread = malloc(sizeof(dthread_t)); @@ -217,6 +217,7 @@ static dthread_t *dt_create_thread(dt_unit_t *unit) // Set membership in unit thread->unit = unit; + thread->idx = idx; // Initialize attribute pthread_attr_t *attr = &thread->_attr; @@ -314,7 +315,7 @@ static dt_unit_t *dt_create_unit(int count) // Initialize threads int init_success = 1; for (int i = 0; i < count; ++i) { - unit->threads[i] = dt_create_thread(unit); + unit->threads[i] = dt_create_thread(unit, i); if (unit->threads[i] == 0) { init_success = 0; break; diff --git a/src/knot/server/dthreads.h b/src/knot/server/dthreads.h index 6b411aac47..035732cb7d 100644 --- a/src/knot/server/dthreads.h +++ b/src/knot/server/dthreads.h @@ -61,6 +61,7 @@ typedef int (*runnable_t)(struct dthread *); */ typedef struct dthread { volatile unsigned state; /*!< Bitfield of dt_flag flags. */ + unsigned idx; /*!< Index of the thread within the unit. */ runnable_t run; /*!< Runnable function or 0. */ runnable_t destruct; /*!< Destructor function or 0. */ void *data; /*!< Currently active data */ diff --git a/src/utils/knotd/main.c b/src/utils/knotd/main.c index fc3d058d92..53e56ef879 100644 --- a/src/utils/knotd/main.c +++ b/src/utils/knotd/main.c @@ -153,60 +153,93 @@ static void check_loaded(server_t *server) dbus_emit_running(true); } -/*! \brief Event loop listening for signals and remote commands. */ -static void event_loop(server_t *server, const char *socket, bool daemonize, - unsigned long pid) +static void deinit_ctls(knot_ctl_t **ctls, unsigned count) { - knot_ctl_t *ctl = knot_ctl_alloc(); - if (ctl == NULL) { - log_fatal("control, failed to initialize (%s)", - knot_strerror(KNOT_ENOMEM)); - return; + for (unsigned i = 0; i < count; i++) { + knot_ctl_unbind(ctls[i]); + knot_ctl_free(ctls[i]); } + free(ctls); +} - // Set control timeout. - knot_ctl_set_timeout(ctl, conf()->cache.ctl_timeout); +static unsigned count_ctls(const char *socket, conf_val_t *listen_val) +{ + return (socket == NULL) ? MAX(1, conf_val_count(listen_val)) : 1; +} - /* Get control socket configuration. */ - char *listen; - if (socket == NULL) { - conf_val_t listen_val = conf_get(conf(), C_CTL, C_LISTEN); - conf_val_t rundir_val = conf_get(conf(), C_SRV, C_RUNDIR); - char *rundir = conf_abs_path(&rundir_val, NULL); - listen = conf_abs_path(&listen_val, rundir); - free(rundir); - } else { - listen = strdup(socket); - } - if (listen == NULL) { - knot_ctl_free(ctl); - log_fatal("control, empty socket path"); - return; +static knot_ctl_t **init_ctls(const char *socket) +{ + conf_val_t listen_val = conf_get(conf(), C_CTL, C_LISTEN); + unsigned cnt = count_ctls(socket, &listen_val); + + knot_ctl_t **res = calloc(cnt, sizeof(*res)); + for (unsigned i = 0; i < cnt; i++) { + res[i] = knot_ctl_alloc(); + if (res[i] == NULL) { + log_fatal("control, failed to initialize socket"); + deinit_ctls(res, i); + return NULL; + } } - log_info("control, binding to '%s'", listen); - - /* Bind the control socket. */ uint16_t backlog = conf_get_int(conf(), C_CTL, C_BACKLOG); - int ret = knot_ctl_bind(ctl, listen, backlog); + conf_val_t rundir_val = conf_get(conf(), C_SRV, C_RUNDIR); + char *rundir = conf_abs_path(&rundir_val, NULL); + + int ret = KNOT_EOK; + for (unsigned i = 0; i < cnt && ret == KNOT_EOK; i++) { + char *listen = (socket == NULL) ? conf_abs_path(&listen_val, rundir) + : strdup(socket); + if (listen == NULL) { + log_fatal("control, empty socket path"); + ret = KNOT_ENOENT; + } else { + knot_ctl_set_timeout(res[i], conf()->cache.ctl_timeout); + log_info("control, binding to '%s'", listen); + ret = knot_ctl_bind(res[i], listen, backlog); + if (ret != KNOT_EOK) { + log_fatal("control, failed to bind socket '%s' (%s)", + listen, knot_strerror(ret)); + } + free(listen); + } + conf_val_next(&listen_val); + } if (ret != KNOT_EOK) { - knot_ctl_free(ctl); - log_fatal("control, failed to bind socket '%s' (%s)", - listen, knot_strerror(ret)); - free(listen); + deinit_ctls(res, cnt); + res = NULL; + } + free(rundir); + + return res; +} + +/*! \brief Event loop listening for signals and remote commands. */ +static void event_loop(server_t *server, const char *socket, bool daemonize, + unsigned long pid) +{ + knot_ctl_t **ctls = init_ctls(socket); + if (ctls == NULL) { return; } - free(listen); - signals_enable(); + conf_val_t listen_val = conf_get(conf(), C_CTL, C_LISTEN); + unsigned sock_count = count_ctls(socket, &listen_val); + ctl_socket_ctx_t sctx = { + .ctls = ctls, + .server = server, + .thr_count = CTL_MAX_CONCURRENT / sock_count + }; - ctl_socket_ctx_t sctx = { .ctl = ctl, .server = server }; - ret = ctl_socket_thr_init(&sctx); + int ret = ctl_socket_thr_init(&sctx, sock_count); if (ret != KNOT_EOK) { - log_fatal("control, failed to launch socket thread (%s)", knot_strerror(ret)); + log_fatal("control, failed to launch socket threads (%s)", + knot_strerror(ret)); return; } + signals_enable(); + /* Notify systemd about successful start. */ systemd_ready_notify(); if (daemonize) { @@ -251,10 +284,7 @@ static void event_loop(server_t *server, const char *socket, bool daemonize, } ctl_socket_thr_end(&sctx); - - /* Unbind the control socket. */ - knot_ctl_unbind(ctl); - knot_ctl_free(ctl); + deinit_ctls(ctls, sock_count); } static void print_help(void) diff --git a/tests-extra/tests/catalog/basic/test.py b/tests-extra/tests/catalog/basic/test.py index 32a97a37ce..5169239c00 100644 --- a/tests-extra/tests/catalog/basic/test.py +++ b/tests-extra/tests/catalog/basic/test.py @@ -222,9 +222,10 @@ if resp3.count("DNSKEY") > 0: dnskey3 = resp2.resp.answer[0].to_rdataset()[0] # Check inaccessibility of catalog zone -slave.ctl("conf-begin") -slave.ctl("conf-unset zone[catalog1.].acl") # remove transfer-related ACLs -slave.ctl("conf-commit") +confsock = slave.ctl_sock_rnd() +slave.ctl("conf-begin", custom_parm=confsock) +slave.ctl("conf-unset zone[catalog1.].acl", custom_parm=confsock) # remove transfer-related ACLs +slave.ctl("conf-commit", custom_parm=confsock) t.sleep(3) try: resp = slave.dig("version.catalog1.", "TXT", tsig=True) @@ -233,8 +234,9 @@ except: pass # Check for member zones not leaking after zonedb reload (just trigger the reload) -slave.ctl("conf-begin") -slave.ctl("conf-set zone[catalog1.].journal-content changes") -slave.ctl("conf-commit") +confsock = slave.ctl_sock_rnd() +slave.ctl("conf-begin", custom_parm=confsock) +slave.ctl("conf-set zone[catalog1.].journal-content changes", custom_parm=confsock) +slave.ctl("conf-commit", custom_parm=confsock) t.end() diff --git a/tests-extra/tests/ctl/basic/test.py b/tests-extra/tests/ctl/basic/test.py index 7f5b3b6e49..6da46713e8 100644 --- a/tests-extra/tests/ctl/basic/test.py +++ b/tests-extra/tests/ctl/basic/test.py @@ -22,7 +22,8 @@ ZONE_NAME = "testzone." t.start() -ctl.connect(os.path.join(knot.dir, "knot.sock")) +sockname = knot.ctl_sock_rnd(name_only=True) +ctl.connect(os.path.join(knot.dir, sockname)) # Check conf-abort and conf-commit without conf transaction open. @@ -126,7 +127,8 @@ ctl.close() resp = knot.dig(ZONE_NAME, "SOA") resp.check(rcode="NOERROR") -ctl.connect(os.path.join(knot.dir, "knot.sock")) +sockname = knot.ctl_sock_rnd(name_only=True) +ctl.connect(os.path.join(knot.dir, sockname)) # Abort remove SOA. ctl.send_block(cmd="zone-begin") diff --git a/tests-extra/tests/ctl/concurrent/test.py b/tests-extra/tests/ctl/concurrent/test.py index e62331d8a4..5490ada156 100644 --- a/tests-extra/tests/ctl/concurrent/test.py +++ b/tests-extra/tests/ctl/concurrent/test.py @@ -39,31 +39,32 @@ def random_ctls(server, zone_name): random_sleep() def ctl_txn_generic(server, txn_start, txn_modify, txn_commit, txn_abort, abort_failed_start): + txnsock = server.ctl_sock_rnd() try: - server.ctl("zone-status", availability=False) + server.ctl("zone-status", availability=False, custom_parm=txnsock) except: pass try: - server.ctl(txn_start, availability=False) + server.ctl(txn_start, availability=False, custom_parm=txnsock) except: try: if abort_failed_start: - server.ctl(txn_abort, availability=False) + server.ctl(txn_abort, availability=False, custom_parm=txnsock) except: pass return random_sleep() try: - server.ctl(txn_modify, availability=False) + server.ctl(txn_modify, availability=False, custom_parm=txnsock) random_sleep() - server.ctl(txn_commit, availability=False) + server.ctl(txn_commit, availability=False, custom_parm=txnsock) except: attempts = 9 while attempts > 0: time.sleep(2) attempts -= 1 try: - server.ctl(txn_abort, availability=False) + server.ctl(txn_abort, availability=False, custom_parm=txnsock) attempts = 0 except: pass diff --git a/tests-extra/tests/ctl/txn_zone_conf/test.py b/tests-extra/tests/ctl/txn_zone_conf/test.py index af24aa62d3..e47954214f 100644 --- a/tests-extra/tests/ctl/txn_zone_conf/test.py +++ b/tests-extra/tests/ctl/txn_zone_conf/test.py @@ -19,7 +19,12 @@ for z in zones: t.start() serials = master.zones_wait(zones) -master.ctl("zone-begin " + ZONE) +zonesock = master.ctl_sock_rnd() +confsock = master.ctl_sock_rnd() +zonesoc2 = master.ctl_sock_rnd() +confsoc2 = master.ctl_sock_rnd() + +master.ctl("zone-begin " + ZONE, custom_parm=zonesock) try: master.ctl("reload") @@ -28,13 +33,13 @@ except: pass try: - master.ctl("conf-begin") + master.ctl("conf-begin", custom_parm=confsock) set_err("allowed conf-begin within zone txn") except: pass -master.ctl("zone-set " + ZONE + " " + RNDNAME + " 3600 A 1.2.3.4") -master.ctl("zone-commit " + ZONE) +master.ctl("zone-set " + ZONE + " " + RNDNAME + " 3600 A 1.2.3.4", custom_parm=zonesock) +master.ctl("zone-commit " + ZONE, custom_parm=zonesock) serials = master.zones_wait(zones, serials) resp = master.dig(RNDNAME + "." + ZONE, "AAAA", dnssec=True) @@ -42,16 +47,16 @@ resp.check() resp.check_count(1, "NSEC", section="authority") resp.check_count(0, "NSEC3", section="authority") -master.ctl("conf-begin") +master.ctl("conf-begin", custom_parm=confsoc2) try: - master.ctl("zone-begin") + master.ctl("zone-begin", custom_parm=zonesoc2) set_err("allowed zone-begin within conf txn") except: pass -master.ctl("conf-set policy[" + ZONE + "].nsec3 on") -master.ctl("conf-commit") +master.ctl("conf-set policy[" + ZONE + "].nsec3 on", custom_parm=confsoc2) +master.ctl("conf-commit", custom_parm=confsoc2) serials = master.zones_wait(zones, serials) resp = master.dig(RNDNAME + "." + ZONE, "AAAA", dnssec=True) diff --git a/tests-extra/tests/ixfr/block_notify/test.py b/tests-extra/tests/ixfr/block_notify/test.py index 48b168e404..063bf3cddb 100644 --- a/tests-extra/tests/ixfr/block_notify/test.py +++ b/tests-extra/tests/ixfr/block_notify/test.py @@ -29,9 +29,10 @@ slave.zones_wait(zone, serials_init) req = slave.dig("suppnot1.example.com.", "A") req.check(rcode="NOERROR") -tested.ctl("conf-begin") -tested.ctl("conf-set remote[knot1].block-notify-after-transfer on") -tested.ctl("conf-commit") +confsock = tested.ctl_sock_rnd() +tested.ctl("conf-begin", custom_parm=confsock) +tested.ctl("conf-set remote[knot1].block-notify-after-transfer on", custom_parm=confsock) +tested.ctl("conf-commit", custom_parm=confsock) up = master.update(zone) up.add("suppnot2", 3600, "A", "1.2.3.4") diff --git a/tests-extra/tests/modules/geoip/test.py b/tests-extra/tests/modules/geoip/test.py index 1f3bcf7761..9d95199594 100644 --- a/tests-extra/tests/modules/geoip/test.py +++ b/tests-extra/tests/modules/geoip/test.py @@ -131,7 +131,7 @@ for i in range(1, 1000): # Switch subnet file. if RELOAD_OVERWRITE: shutil.copyfile(subnet2_filename, subnet_filename) - knot.ctl("-f zone-reload example.com.", wait=True) + knot.ctl("-f zone-reload example.com.", wait=True, custom_parm=[]) # explicitly DON'T specify socket so that configuration is parsed also by knotc itself else: mod_subnet.config_file = subnet2_filename knot.gen_confile() @@ -164,7 +164,7 @@ else: reload_failed = False try: if RELOAD_OVERWRITE: - knot.ctl("-f zone-reload example.com.", wait=True) + knot.ctl("-f zone-reload example.com.", wait=True, custom_parm=[]) # explicitly DON'T specify socket so that configuration is parsed also by knotc itself else: knot.reload() except: diff --git a/tests-extra/tests/zone/zij_reload/test.py b/tests-extra/tests/zone/zij_reload/test.py index 25091f6318..d32ae2e3ba 100644 --- a/tests-extra/tests/zone/zij_reload/test.py +++ b/tests-extra/tests/zone/zij_reload/test.py @@ -16,9 +16,10 @@ t.link(zone, knot) t.start() knot.zone_wait(zone) -knot.ctl("conf-begin") -knot.ctl("conf-set zone[%s].journal-content all" % zone[0].name) -knot.ctl("conf-commit") +confsock = knot.ctl_sock_rnd() +knot.ctl("conf-begin", custom_parm=confsock) +knot.ctl("conf-set zone[%s].journal-content all" % zone[0].name, custom_parm=confsock) +knot.ctl("conf-commit", custom_parm=confsock) t.sleep(2) knot.stop() diff --git a/tests-extra/tools/dnstest/server.py b/tests-extra/tools/dnstest/server.py index f480849b86..8504f254de 100644 --- a/tests-extra/tools/dnstest/server.py +++ b/tests-extra/tools/dnstest/server.py @@ -390,13 +390,16 @@ class Server(object): self.binding_errors = errors - def ctl(self, cmd, wait=False, availability=True, read_result=False): + def ctl(self, cmd, wait=False, availability=True, read_result=False, custom_parm=None): + if custom_parm is None: + custom_parm = self.ctl_sock_rnd() + if availability: # Check for listening control interface. ok = False for i in range(0, 5): try: - self.ctl("status", availability=False) + self.ctl("status", availability=False, custom_parm=custom_parm) except Failed: time.sleep(1) continue @@ -407,7 +410,7 @@ class Server(object): raise Failed("Unavailable remote control server='%s'" % self.name) # Send control command. - args = self.ctl_params + (self.control_wait if wait else []) + cmd.split() + args = self.ctl_params + custom_parm + (self.control_wait if wait else []) + cmd.split() try: check_call([self.control_bin] + args, stdout=open(self.dir + "/call.out", mode="a"), @@ -832,7 +835,8 @@ class Server(object): for t in range(attempts): try: if use_ctl: - ctl.connect(os.path.join(self.dir, "knot.sock")) + sockname = self.ctl_sock_rnd(self, name_only=True) + ctl.connect(os.path.join(self.dir, sockname)) ctl.send_block(cmd="zone-read", zone=zone_name, owner="@", rtype="SOA") resp = ctl.receive_block() @@ -1275,6 +1279,9 @@ class Bind(Server): return s.conf + def ctl_sock_rnd(self): + return [] + def start(self, clean=False): for zname in self.zones: z = self.zones[zname] @@ -1495,7 +1502,7 @@ class Knot(Server): s.end() s.begin("control") - s.item_str("listen", "knot.sock") + s.item("listen", "[ \"knot.sock\", \"knot2.sock\"]") s.item_str("timeout", "15") s.end() @@ -1918,6 +1925,15 @@ class Knot(Server): return s.conf + def ctl_sock_rnd(self, name_only=False): + sockname = random.choice(["knot.sock", "knot2.sock"]) + sockpath = os.path.join(self.dir, sockname) + + if name_only: + return sockpath + else: + return ["-s", sockpath] + def check_quic(self): res = run([self.daemon_bin, '-VV'], stdout=PIPE) for line in res.stdout.decode('ascii').split("\n"): -- GitLab From f59387903ae49ef8a75342bb78e24e33ef7b913b Mon Sep 17 00:00:00 2001 From: Daniel Salzman <daniel.salzman@nic.cz> Date: Wed, 5 Mar 2025 23:25:04 +0100 Subject: [PATCH 5/5] ctl: hide unused API --- src/knot/ctl/threads.c | 29 ++++++++++++++++++++---- src/knot/ctl/threads.h | 51 ------------------------------------------ 2 files changed, 25 insertions(+), 55 deletions(-) diff --git a/src/knot/ctl/threads.c b/src/knot/ctl/threads.c index 1bb9227fe8..ec11ea380d 100644 --- a/src/knot/ctl/threads.c +++ b/src/knot/ctl/threads.c @@ -11,8 +11,29 @@ #include "knot/ctl/threads.h" #include "knot/server/signals.h" -void ctl_init_ctxs(concurrent_ctl_ctx_t *concurrent_ctxs, size_t n_ctxs, - server_t *server, unsigned thr_idx_from) +typedef enum { + CONCURRENT_EMPTY = 0, // fresh cctx without a thread. + CONCURRENT_ASSIGNED, // cctx assigned to process a command. + CONCURRENT_RUNNING, // ctl command is being processed in the thread. + CONCURRENT_IDLE, // command has been processed, waiting for a new one. + CONCURRENT_KILLED, // cctx cleanup has started. + CONCURRENT_FINISHED, // after having been killed, the thread is being joined. +} concurrent_ctl_state_t; + +typedef struct { + concurrent_ctl_state_t state; + pthread_mutex_t mutex; // Protects .state. + pthread_cond_t cond; + knot_ctl_t *ctl; + server_t *server; + pthread_t thread; + int ret; + unsigned thread_idx; + bool exclusive; +} concurrent_ctl_ctx_t; + +static void ctl_init_ctxs(concurrent_ctl_ctx_t *concurrent_ctxs, size_t n_ctxs, + server_t *server, unsigned thr_idx_from) { for (size_t i = 0; i < n_ctxs; i++) { concurrent_ctl_ctx_t *cctx = &concurrent_ctxs[i]; @@ -24,7 +45,7 @@ void ctl_init_ctxs(concurrent_ctl_ctx_t *concurrent_ctxs, size_t n_ctxs, } } -int ctl_cleanup_ctxs(concurrent_ctl_ctx_t *concurrent_ctxs, size_t n_ctxs) +static int ctl_cleanup_ctxs(concurrent_ctl_ctx_t *concurrent_ctxs, size_t n_ctxs) { int ret = KNOT_EOK; for (size_t i = 0; i < n_ctxs; i++) { @@ -42,7 +63,7 @@ int ctl_cleanup_ctxs(concurrent_ctl_ctx_t *concurrent_ctxs, size_t n_ctxs) return ret; } -void ctl_finalize_ctxs(concurrent_ctl_ctx_t *concurrent_ctxs, size_t n_ctxs) +static void ctl_finalize_ctxs(concurrent_ctl_ctx_t *concurrent_ctxs, size_t n_ctxs) { for (size_t i = 0; i < n_ctxs; i++) { concurrent_ctl_ctx_t *cctx = &concurrent_ctxs[i]; diff --git a/src/knot/ctl/threads.h b/src/knot/ctl/threads.h index 686a3dbf4e..b49b6bfd52 100644 --- a/src/knot/ctl/threads.h +++ b/src/knot/ctl/threads.h @@ -7,27 +7,6 @@ #include "knot/ctl/process.h" -typedef enum { - CONCURRENT_EMPTY = 0, // fresh cctx without a thread. - CONCURRENT_ASSIGNED, // cctx assigned to process a command. - CONCURRENT_RUNNING, // ctl command is being processed in the thread. - CONCURRENT_IDLE, // command has been processed, waiting for a new one. - CONCURRENT_KILLED, // cctx cleanup has started. - CONCURRENT_FINISHED, // after having been killed, the thread is being joined. -} concurrent_ctl_state_t; - -typedef struct { - concurrent_ctl_state_t state; - pthread_mutex_t mutex; // Protects .state. - pthread_cond_t cond; - knot_ctl_t *ctl; - server_t *server; - pthread_t thread; - int ret; - int thread_idx; - bool exclusive; -} concurrent_ctl_ctx_t; - typedef struct { knot_ctl_t **ctls; server_t *server; @@ -35,36 +14,6 @@ typedef struct { unsigned thr_count; } ctl_socket_ctx_t; -/*! - * \brief Initialize CTL thread processing contexts. - * - * \param concurrent_ctxs Structures to initialize. - * \param n_ctxs Their number/count. - * \param server Server structure. - * \param thr_idx_from Base thread ID for sub-threads to start with. - */ -void ctl_init_ctxs(concurrent_ctl_ctx_t *concurrent_ctxs, size_t n_ctxs, - server_t *server, int thr_idx_from); - -/*! - * \brief Regularly check the state of parallel CTL processing workers. - * - * \param concurrent_ctxs Parallel CTL processing contexts. - * \param n_ctxs Their number/count. - * - * \retval KNOT_ESTOP Server shutdown requested. - * \retval KNOT_EOK Otherwise. - */ -int ctl_cleanup_ctxs(concurrent_ctl_ctx_t *concurrent_ctxs, size_t n_ctxs); - -/*! - * \brief De-initialize CTL thread processing contexts. - * - * \param concurrent_ctxs Structures to de-initialize. - * \param n_ctxs Their number/count. - */ -void ctl_finalize_ctxs(concurrent_ctl_ctx_t *concurrent_ctxs, size_t n_ctxs); - /*! * \brief Initialize CTL socket handling threads. * -- GitLab