diff --git a/src/knot/ctl/process.c b/src/knot/ctl/process.c index 50fde217e5aed457a9c75e9ef68b2b2ea1743b34..fe1940d87a44577d7b6625b36fbfbf8def1df55d 100644 --- a/src/knot/ctl/process.c +++ b/src/knot/ctl/process.c @@ -1,4 +1,4 @@ -/* Copyright (C) 2022 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> +/* Copyright (C) 2024 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -20,7 +20,7 @@ #include "libknot/error.h" #include "contrib/string.h" -int ctl_process(knot_ctl_t *ctl, server_t *server) +int ctl_process(knot_ctl_t *ctl, server_t *server, bool *exclusive) { if (ctl == NULL || server == NULL) { return KNOT_EINVAL; @@ -90,11 +90,21 @@ int ctl_process(knot_ctl_t *ctl, server_t *server) continue; } + if ((cmd == CTL_CONF_COMMIT || cmd == CTL_CONF_ABORT) && !*exclusive) { + log_ctl_warning("control, invalid reception of '%s'", cmd_name); + continue; + } + // Execute the command. int cmd_ret = ctl_exec(cmd, &args); switch (cmd_ret) { case KNOT_EOK: strip = false; + if (cmd == CTL_CONF_BEGIN) { + *exclusive = true; + } else if (cmd == CTL_CONF_COMMIT || cmd == CTL_CONF_ABORT) { + *exclusive = false; + } case KNOT_CTL_ESTOP: case KNOT_CTL_EZONE: // KNOT_CTL_EZONE - don't change strip, but don't be reported diff --git a/src/knot/ctl/process.h b/src/knot/ctl/process.h index ab0f75f2629513a9b6f1e17558068b5671cfc98e..20e40d748146b9b3ba081444eab830c16802e2e5 100644 --- a/src/knot/ctl/process.h +++ b/src/knot/ctl/process.h @@ -1,4 +1,4 @@ -/* Copyright (C) 2018 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> +/* Copyright (C) 2024 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -22,9 +22,10 @@ /*! * Processes incoming control commands. * - * \param[in] ctl Control context. - * \param[in] server Server instance. + * \param[in] ctl Control context. + * \param[in] server Server instance. + * \param[out] exclusive All following CTLs shall (not) be processed exclusively by this thread. * * \return Error code, KNOT_EOK if successful. */ -int ctl_process(knot_ctl_t *ctl, server_t *server); +int ctl_process(knot_ctl_t *ctl, server_t *server, bool *exclusive); diff --git a/src/utils/knotd/main.c b/src/utils/knotd/main.c index bb70aefd134119028ee53bc1a1433ee7bb2b7b23..203d31324d9be8bfd008997719ef296d24d2b8c7 100644 --- a/src/utils/knotd/main.c +++ b/src/utils/knotd/main.c @@ -50,11 +50,24 @@ #define PROGRAM_NAME "knotd" #define MAX_CTL_CONCURRENT 8 +typedef enum { + CONCURRENT_EMPTY = 0, // fresh cctx without a thread. + CONCURRENT_ASSIGNED, // cctx assigned to process a command. + CONCURRENT_RUNNING, // ctl command is being processed in the thread. + CONCURRENT_IDLE, // command has been processed, waiting for a new one. + CONCURRENT_KILLED, // cctx cleanup has started. + CONCURRENT_FINISHED, // after having been killed, the thread is being joined. +} concurrent_ctl_state_t; + typedef struct { + concurrent_ctl_state_t state; + pthread_mutex_t mutex; // Protects .state. + pthread_cond_t cond; knot_ctl_t *ctl; server_t *server; pthread_t thread; int ret; + bool exclusive; } concurrent_ctl_ctx_t; /* Signal flags. */ @@ -251,44 +264,153 @@ static void check_loaded(server_t *server) dbus_emit_running(true); } +static void *ctl_process_thread(void *arg); + +/*! + * Try to find an empty ctl processing context and if successful, + * prepare to lauch the incomming command processing in it. + * + * \param[in] concurrent_ctxs Configured concurrent control contexts. + * \param[in] n_ctxs Number of configured concurrent control contexts. + * \param[in] ctl Control context. + * + * \return Assigned concurrent control context, or NULL. + */ + static concurrent_ctl_ctx_t *find_free_ctx(concurrent_ctl_ctx_t *concurrent_ctxs, - size_t n_ctxs) + size_t n_ctxs, knot_ctl_t *ctl) { - for (size_t i = 0; i < n_ctxs; i++) { - if (concurrent_ctxs[i].ctl == NULL) { - assert(concurrent_ctxs[i].server == NULL); - return &concurrent_ctxs[i]; + concurrent_ctl_ctx_t *res = NULL; + for (size_t i = 0; i < n_ctxs && res == NULL; i++) { + concurrent_ctl_ctx_t *cctx = &concurrent_ctxs[i]; + pthread_mutex_lock(&cctx->mutex); + if (cctx->exclusive) { + while (cctx->state != CONCURRENT_IDLE) { + pthread_cond_wait(&cctx->cond, &cctx->mutex); + } + knot_ctl_free(cctx->ctl); + cctx->ctl = knot_ctl_clone(ctl); + if (cctx->ctl == NULL) { + cctx->exclusive = false; + pthread_mutex_unlock(&cctx->mutex); + break; + } + cctx->state = CONCURRENT_ASSIGNED; + res = cctx; + pthread_cond_broadcast(&cctx->cond); } + pthread_mutex_unlock(&cctx->mutex); + } + for (size_t i = 0; i < n_ctxs && res == NULL; i++) { + concurrent_ctl_ctx_t *cctx = &concurrent_ctxs[i]; + pthread_mutex_lock(&cctx->mutex); + switch (cctx->state) { + case CONCURRENT_EMPTY: + pthread_create(&cctx->thread, NULL, ctl_process_thread, cctx); + break; + case CONCURRENT_IDLE: + knot_ctl_free(cctx->ctl); + pthread_cond_broadcast(&cctx->cond); + break; + default: + pthread_mutex_unlock(&cctx->mutex); + continue; + } + cctx->ctl = knot_ctl_clone(ctl); + if (cctx->ctl != NULL) { + cctx->state = CONCURRENT_ASSIGNED; + res = cctx; + } + pthread_mutex_unlock(&cctx->mutex); + } + return res; +} + +static void init_ctxs(concurrent_ctl_ctx_t *concurrent_ctxs, size_t n_ctxs, server_t *server) +{ + for (size_t i = 0; i < n_ctxs; i++) { + concurrent_ctl_ctx_t *cctx = &concurrent_ctxs[i]; + pthread_mutex_init(&cctx->mutex, NULL); + pthread_cond_init(&cctx->cond, NULL); + cctx->server = server; } - return NULL; } -static int cleanup_ctxs(concurrent_ctl_ctx_t *concurrent_ctxs, size_t n_ctxs, bool force) +static int cleanup_ctxs(concurrent_ctl_ctx_t *concurrent_ctxs, size_t n_ctxs) { int ret = KNOT_EOK; for (size_t i = 0; i < n_ctxs; i++) { concurrent_ctl_ctx_t *cctx = &concurrent_ctxs[i]; - if (cctx->ctl != NULL && (force || cctx->server == NULL)) { - (void)pthread_join(cctx->thread, NULL); - assert(cctx->server == NULL); + pthread_mutex_lock(&cctx->mutex); + if (cctx->state == CONCURRENT_IDLE) { + knot_ctl_free(cctx->ctl); + cctx->ctl = NULL; if (cctx->ret == KNOT_CTL_ESTOP) { ret = cctx->ret; } - knot_ctl_free(cctx->ctl); - memset(cctx, 0, sizeof(*cctx)); } + pthread_mutex_unlock(&cctx->mutex); } return ret; } +static void finalize_ctxs(concurrent_ctl_ctx_t *concurrent_ctxs, size_t n_ctxs) +{ + for (size_t i = 0; i < n_ctxs; i++) { + concurrent_ctl_ctx_t *cctx = &concurrent_ctxs[i]; + pthread_mutex_lock(&cctx->mutex); + if (cctx->state == CONCURRENT_EMPTY) { + pthread_mutex_unlock(&cctx->mutex); + pthread_mutex_destroy(&cctx->mutex); + pthread_cond_destroy(&cctx->cond); + continue; + } + + cctx->state = CONCURRENT_KILLED; + pthread_cond_broadcast(&cctx->cond); + pthread_mutex_unlock(&cctx->mutex); + (void)pthread_join(cctx->thread, NULL); + + assert(cctx->state == CONCURRENT_FINISHED); + knot_ctl_free(cctx->ctl); + pthread_mutex_destroy(&cctx->mutex); + pthread_cond_destroy(&cctx->cond); + } +} + static void *ctl_process_thread(void *arg) { concurrent_ctl_ctx_t *ctx = arg; rcu_register_thread(); - setup_signals(); // in fact, this blocks common signals so that they arrive to main thread instead of this one - ctx->ret = ctl_process(ctx->ctl, ctx->server); + setup_signals(); // in fact, this blocks common signals so that they + // arrive to main thread instead of this one + + pthread_mutex_lock(&ctx->mutex); + while (ctx->state != CONCURRENT_KILLED) { + if (ctx->state != CONCURRENT_ASSIGNED) { + pthread_cond_wait(&ctx->cond, &ctx->mutex); + continue; + } + ctx->state = CONCURRENT_RUNNING; + bool exclusive = ctx->exclusive; + pthread_mutex_unlock(&ctx->mutex); + + // Not IDLE, ctx can be read without locking. + int ret = ctl_process(ctx->ctl, ctx->server, &exclusive); + + pthread_mutex_lock(&ctx->mutex); + ctx->ret = ret; + ctx->exclusive = exclusive; + if (ctx->state == CONCURRENT_RUNNING) { // not KILLED + ctx->state = CONCURRENT_IDLE; + pthread_cond_broadcast(&ctx->cond); + } + } + knot_ctl_close(ctx->ctl); - ctx->server = NULL; + + ctx->state = CONCURRENT_FINISHED; + pthread_mutex_unlock(&ctx->mutex); rcu_unregister_thread(); return NULL; } @@ -297,7 +419,6 @@ static void *ctl_process_thread(void *arg) static void event_loop(server_t *server, const char *socket, bool daemonize, unsigned long pid) { - concurrent_ctl_ctx_t concurrent_ctxs[MAX_CTL_CONCURRENT] = { 0 }, *cctx; knot_ctl_t *ctl = knot_ctl_alloc(); if (ctl == NULL) { log_fatal("control, failed to initialize (%s)", @@ -341,6 +462,10 @@ static void event_loop(server_t *server, const char *socket, bool daemonize, enable_signals(); + concurrent_ctl_ctx_t concurrent_ctxs[MAX_CTL_CONCURRENT] = { 0 }; + init_ctxs(concurrent_ctxs, MAX_CTL_CONCURRENT, server); + bool main_thread_exclusive = false; + /* Notify systemd about successful start. */ systemd_ready_notify(); if (daemonize) { @@ -366,7 +491,7 @@ static void event_loop(server_t *server, const char *socket, bool daemonize, server_update_zones(conf(), server, mode); pthread_rwlock_unlock(&server->ctl_lock); } - if (sig_req_stop || cleanup_ctxs(concurrent_ctxs, MAX_CTL_CONCURRENT, false) == KNOT_CTL_ESTOP) { + if (sig_req_stop || cleanup_ctxs(concurrent_ctxs, MAX_CTL_CONCURRENT) == KNOT_CTL_ESTOP) { break; } @@ -384,12 +509,9 @@ static void event_loop(server_t *server, const char *socket, bool daemonize, continue; } - if ((cctx = find_free_ctx(concurrent_ctxs, MAX_CTL_CONCURRENT)) != NULL && - (cctx->ctl = knot_ctl_clone(ctl)) != NULL) { - cctx->server = server; - pthread_create(&cctx->thread, NULL, ctl_process_thread, cctx); - } else { - ret = ctl_process(ctl, server); + if (main_thread_exclusive || + find_free_ctx(concurrent_ctxs, MAX_CTL_CONCURRENT, ctl) == NULL) { + ret = ctl_process(ctl, server, &main_thread_exclusive); knot_ctl_close(ctl); if (ret == KNOT_CTL_ESTOP) { break; @@ -397,7 +519,7 @@ static void event_loop(server_t *server, const char *socket, bool daemonize, } } - (void)cleanup_ctxs(concurrent_ctxs, MAX_CTL_CONCURRENT, true); + finalize_ctxs(concurrent_ctxs, MAX_CTL_CONCURRENT); if (conf()->cache.srv_dbus_event & DBUS_EVENT_RUNNING) { dbus_emit_running(false);