Forked from
Knot projects / Knot DNS
10514 commits behind the upstream repository.
-
Daniel Salzman authoredDaniel Salzman authored
dthreads.h 8.31 KiB
/* Copyright (C) 2011 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/*!
* \file dthreads.h
*
* \author Marek Vavrusa <marek.vavusa@nic.cz>
*
* \brief Threading API.
*
* Dynamic threads provide:
* - coherent and incoherent threading capabilities
* - thread repurposing
* - thread prioritization
* - on-the-fly changing of threading unit size
*
* Coherent threading unit is when all threads execute
* the same runnable function.
*
* Incoherent function is when at least one thread executes
* a different runnable than the others.
*
* \addtogroup threading
* @{
*/
#ifndef _KNOTD_DTHREADS_H_
#define _KNOTD_DTHREADS_H_
#include <pthread.h>
/* Forward decls */
struct dthread_t;
struct dt_unit_t;
/*!
* \brief Thread state enumeration.
*/
typedef enum {
ThreadJoined = 1 << 0, /*!< Thread is finished and joined. */
ThreadJoinable = 1 << 1, /*!< Thread is waiting to be reclaimed. */
ThreadCancelled = 1 << 2, /*!< Thread is cancelled, finishing task. */
ThreadDead = 1 << 3, /*!< Thread is finished, exiting. */
ThreadIdle = 1 << 4, /*!< Thread is idle, waiting for purpose. */
ThreadActive = 1 << 5 /*!< Thread is active, working on a task. */
} dt_state_t;
/*!
* \brief Thread runnable prototype.
*
* Runnable is basically a pointer to function which is called on active
* thread runtime.
*
* \note When implementing a runnable, keep in mind to check thread state as
* it may change, and implement a cooperative cancellation point.
*
* Implement this by checking dt_is_cancelled() and return
* as soon as possible.
*/
typedef int (*runnable_t)(struct dthread_t *);
/*!
* \brief Single thread descriptor public API.
*/
typedef struct dthread_t {
volatile unsigned state; /*!< Bitfield of dt_flag flags. */
runnable_t run; /*!< Runnable function or 0. */
runnable_t destruct; /*!< Destructor function or 0. */
void *data; /*!< Currently active data */
struct dt_unit_t *unit; /*!< Reference to assigned unit. */
void *_adata; /* Thread-specific data. */
pthread_t _thr; /* Thread */
pthread_attr_t _attr; /* Thread attributes */
pthread_mutex_t _mx; /* Thread state change lock. */
} dthread_t;
/*!
* \brief Thread unit descriptor API.
*
* Thread unit consists of 1..N threads.
* Unit is coherent if all threads execute
* the same runnable.
*/
typedef struct dt_unit_t {
int size; /*!< Unit width (number of threads) */
struct dthread_t **threads; /*!< Array of threads */
pthread_cond_t _notify; /* Notify thread */
pthread_mutex_t _notify_mx; /* Condition mutex */
pthread_cond_t _report; /* Report thread state */
pthread_mutex_t _report_mx; /* Condition mutex */
pthread_mutex_t _mx; /* Unit lock */
} dt_unit_t;
/*!
* \brief Create a set of coherent threads.
*
* Coherent means, that the threads will share a common runnable and the data.
*
* \param count Requested thread count.
* \param runnable Runnable function for all threads.
* \param destructor Destructor for all threads.
* \param data Any data passed onto threads.
*
* \retval New instance if successful
* \retval NULL on error
*/
dt_unit_t *dt_create(int count, runnable_t runnable, runnable_t destructor, void *data);
/*!
* \brief Free unit.
*
* \warning Behavior is undefined if threads are still active, make sure
* to call dt_join() first.
*
* \param unit Unit to be deleted.
*/
void dt_delete(dt_unit_t **unit);
/*!
* \brief Start all threads in selected unit.
*
* \param unit Unit to be started.
*
* \retval KNOT_EOK on success.
* \retval KNOT_EINVAL on invalid parameters (unit is null).
*/
int dt_start(dt_unit_t *unit);
/*!
* \brief Send given signal to thread.
*
* \note This is useful to interrupt some blocking I/O as well, for example
* with SIGALRM, which is handled by default.
* \note Signal handler may be overriden in runnable.
*
* \param thread Target thread instance.
* \param signum Signal code.
*
* \retval KNOT_EOK on success.
* \retval KNOT_EINVAL on invalid parameters.
* \retval KNOT_ERROR unspecified error.
*/
int dt_signalize(dthread_t *thread, int signum);
/*!
* \brief Wait for all thread in unit to finish.
*
* \param unit Unit to be joined.
*
* \retval KNOT_EOK on success.
* \retval KNOT_EINVAL on invalid parameters.
*/
int dt_join(dt_unit_t *unit);
/*!
* \brief Stop all threads in unit.
*
* Thread is interrupted at the nearest runnable cancellation point.
*
* \param unit Unit to be stopped.
*
* \retval KNOT_EOK on success.
* \retval KNOT_EINVAL on invalid parameters.
*/
int dt_stop(dt_unit_t *unit);
/*!
* \brief Set thread affinity to masked CPU's.
*
* \param thread Target thread instance.
* \param cpu_id Array of CPU IDs to set affinity to.
* \param cpu_count Number of CPUs in the array, set to 0 for no CPU.
*
* \retval KNOT_EOK on success.
* \retval KNOT_EINVAL on invalid parameters.
*/
int dt_setaffinity(dthread_t *thread, unsigned* cpu_id, size_t cpu_count);
/*!
* \brief Wake up thread from idle state.
*
* Thread is awoken from idle state and reenters runnable.
* This function only affects idle threads.
*
* \note Unit needs to be started with dt_start() first, as the function
* doesn't affect dead threads.
*
* \param thread Target thread instance.
*
* \retval KNOT_EOK on success.
* \retval KNOT_EINVAL on invalid parameters.
* \retval KNOT_ENOTSUP operation not supported.
*/
int dt_activate(dthread_t *thread);
/*!
* \brief Put thread to idle state, cancells current runnable function.
*
* Thread is flagged with Cancel flag and returns from runnable at the nearest
* cancellation point, which requires complying runnable function.
*
* \note Thread isn't disposed, but put to idle state until it's requested
* again or collected by dt_compact().
*
* \param thread Target thread instance.
*
* \retval KNOT_EOK on success.
* \retval KNOT_EINVAL on invalid parameters.
*/
int dt_cancel(dthread_t *thread);
/*!
* \brief Collect and dispose idle threads.
*
* \param unit Target unit instance.
*
* \retval KNOT_EOK on success.
* \retval KNOT_EINVAL on invalid parameters.
*/
int dt_compact(dt_unit_t *unit);
/*!
* \brief Return number of online processors.
*
* \retval Number of online CPU's if success.
* \retval <0 on failure.
*/
int dt_online_cpus(void);
/*!
* \brief Return optimal number of threads for instance.
*
* It is estimated as NUM_CPUs + CONSTANT.
* Fallback is DEFAULT_THR_COUNT (\see common.h).
*
* \return Number of threads.
*/
int dt_optimal_size(void);
/*!
* \brief Return true if thread is cancelled.
*
* Synchronously check for ThreadCancelled flag.
*
* \param thread Target thread instance.
*
* \retval 1 if cancelled.
* \retval 0 if not cancelled.
*/
int dt_is_cancelled(dthread_t *thread);
/*!
* \brief Return thread index in threading unit.
*
* \note Returns 0 when thread doesn't have a unit.
*
* \param thread Target thread instance.
*
* \return Thread index.
*/
unsigned dt_get_id(dthread_t *thread);
/*!
* \brief Lock unit to prevent parallel operations which could alter unit
* at the same time.
*
* \param unit Target unit instance.
*
* \retval KNOT_EOK on success.
* \retval KNOT_EINVAL on invalid parameters.
* \retval KNOT_EAGAIN lack of resources to lock unit, try again.
* \retval KNOT_ERROR unspecified error.
*/
int dt_unit_lock(dt_unit_t *unit);
/*!
* \brief Unlock unit.
*
* \see dt_unit_lock()
*
* \param unit Target unit instance.
*
* \retval KNOT_EOK on success.
* \retval KNOT_EINVAL on invalid parameters.
* \retval KNOT_EAGAIN lack of resources to unlock unit, try again.
* \retval KNOT_ERROR unspecified error.
*/
int dt_unit_unlock(dt_unit_t *unit);
#endif // _KNOTD_DTHREADS_H_
/*! @} */