Skip to content
Snippets Groups Projects
Commit 9ee86011 authored by Jan Včelák's avatar Jan Včelák :rocket:
Browse files

worker pool: add API to wait for enqueed tasks

parent c8909077
No related branches found
No related tags found
No related merge requests found
......@@ -36,8 +36,9 @@ struct worker_pool {
pthread_mutex_t lock;
pthread_cond_t wake;
bool suspended;
bool terminating;
int running; /*!< Number of threads performing a task. */
bool suspended; /*!< Is the pool exection suspended? */
bool terminating; /*!< Is the pool terminating? .*/
worker_queue_t tasks;
};
......@@ -76,10 +77,14 @@ static int worker_main(dthread_t *thread)
assert(task);
assert(task->run);
pool->running += 1;
pthread_mutex_unlock(&pool->lock);
task->run(task);
pthread_mutex_lock(&pool->lock);
pool->running -= 1;
pthread_cond_broadcast(&pool->wake);
}
pthread_mutex_unlock(&pool->lock);
......@@ -177,6 +182,19 @@ void worker_pool_suspend(worker_pool_t *pool)
pthread_mutex_unlock(&pool->lock);
}
void worker_pool_wait(worker_pool_t *pool)
{
if (!pool || !pool->suspended) {
return;
}
pthread_mutex_lock(&pool->lock);
while (pool->running > 0) {
pthread_cond_wait(&pool->wake, &pool->lock);
}
pthread_mutex_unlock(&pool->lock);
}
void worker_pool_continue(worker_pool_t *pool)
{
if (!pool) {
......
......@@ -52,9 +52,18 @@ void worker_pool_join(worker_pool_t *pool);
/*!
* \brief Suspend execution of new tasks, existing task are not terminated.
*
* No workes will be started.
*/
void worker_pool_suspend(worker_pool_t *pool);
/*!
* \brief Wait till the number of running tasks is zero.
*
* The pool must be suspended, otherwise no waiting will be performed.
*/
void worker_pool_wait(worker_pool_t *pool);
/*!
* \brief Continue execution of new tasks.
*/
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment