Commit ed07f47b authored by Marek Vavrusa's avatar Marek Vavrusa

Removed deprecated dthreads functionality.

parent 639e87eb
......@@ -444,178 +444,6 @@ void dt_delete(dt_unit_t **unit)
*unit = 0;
}
int dt_resize(dt_unit_t *unit, int size)
{
// Check input
if (unit == 0 || size <= 0) {
return KNOT_EINVAL;
}
// Evaluate delta
int delta = unit->size - size;
// Same size
if (delta == 0) {
return 0;
}
// Unit expansion
if (delta < 0) {
// Lock unit
pthread_mutex_lock(&unit->_notify_mx);
dt_unit_lock(unit);
// Realloc threads
dbg_dt("dthreads: growing from %d to %d threads\n",
unit->size, size);
dthread_t **threads = realloc(unit->threads,
size * sizeof(dthread_t *));
if (threads == NULL) {
dt_unit_unlock(unit);
pthread_mutex_unlock(&unit->_notify_mx);
return -1;
}
// Reassign
unit->threads = threads;
// Create new threads
for (int i = unit->size; i < size; ++i) {
threads[i] = dt_create_thread(unit);
}
// Update unit
unit->size = size;
dt_unit_unlock(unit);
pthread_mutex_unlock(&unit->_notify_mx);
return 0;
}
// Unit shrinking
int remaining = size;
dbg_dt("dthreads: shrinking from %d to %d threads\n",
unit->size, size);
// New threads vector
dthread_t **threads = malloc(size * sizeof(dthread_t *));
if (threads == 0) {
return KNOT_ENOMEM;
}
// Lock unit
pthread_mutex_lock(&unit->_notify_mx);
dt_unit_lock(unit);
// Iterate while there is space in new unit
memset(threads, 0, size * sizeof(dthread_t *));
int threshold = ThreadActive;
for (;;) {
// Find threads matching given criterias
int inspected = 0;
for (int i = 0; i < unit->size; ++i) {
// Get thread
dthread_t *thread = unit->threads[i];
if (thread == 0) {
continue;
}
// Count thread as inspected
++inspected;
lock_thread_rw(thread);
// Populate with matching threads
if ((remaining > 0) &&
(!threshold || (thread->state & threshold))) {
// Append to new vector
threads[size - remaining] = thread;
--remaining;
// Invalidate in old vector
unit->threads[i] = 0;
dbg_dt_verb("dthreads: [%p] dt_resize: elected\n",
thread);
} else if (remaining <= 0) {
// Not enough space, delete thread
if (thread->state & ThreadDead) {
unlock_thread_rw(thread);
--inspected;
continue;
}
// Signalize thread to stop
thread->state = ThreadDead | ThreadCancelled;
dt_signalize(thread, SIGALRM);
dbg_dt_verb("dthreads: [%p] dt_resize: "
"is discarded\n", thread);
}
// Unlock thread and continue
unlock_thread_rw(thread);
}
// Finished inspecting running threads
if (inspected == 0) {
break;
}
// Lower threshold
switch (threshold) {
case ThreadActive:
threshold = ThreadIdle;
break;
case ThreadIdle:
threshold = ThreadDead;
break;
default:
threshold = ThreadJoined;
break;
}
}
// Notify idle threads to wake up
pthread_cond_broadcast(&unit->_notify);
pthread_mutex_unlock(&unit->_notify_mx);
// Join discarded threads
for (int i = 0; i < unit->size; ++i) {
// Get thread
dthread_t *thread = unit->threads[i];
if (thread == 0) {
continue;
}
pthread_join(thread->_thr, 0);
/* Thread is already joined and flagged, but anyway... */
lock_thread_rw(thread);
thread->state = ThreadJoined;
unlock_thread_rw(thread);
// Delete thread
dt_delete_thread(&thread);
unit->threads[i] = 0;
}
// Reassign unit threads vector
unit->size = size;
free(unit->threads);
unit->threads = threads;
// Unlock unit
dt_unit_unlock(unit);
return 0;
}
int dt_start(dt_unit_t *unit)
{
// Check input
......@@ -825,45 +653,6 @@ int dt_stop(dt_unit_t *unit)
return KNOT_EOK;
}
//int dt_setprio(dthread_t *thread, int prio)
//{
// // Check input
// if (thread == 0) {
// return KNOT_EINVAL;
// }
// // Clamp priority
// int policy = SCHED_FIFO;
// prio = MIN(MAX(sched_get_priority_min(policy), prio),
// sched_get_priority_max(policy));
// // Update scheduler policy
// int ret = pthread_attr_setschedpolicy(&thread->_attr, policy);
// // Update priority
// if (ret == 0) {
// struct sched_param sp;
// sp.sched_priority = prio;
// ret = pthread_attr_setschedparam(&thread->_attr, &sp);
// }
// /* Map error codes. */
// if (ret != 0) {
// dbg_dt("dthreads: [%p] %s(%d): failed",
// thread, __func__, prio);
// /* Map "not supported". */
// if (errno == ENOTSUP) {
// return KNOT_ENOTSUP;
// }
// return KNOT_EINVAL;
// }
// return KNOT_EOK;
//}
int dt_setaffinity(dthread_t *thread, unsigned* cpu_id, size_t cpu_count)
{
......
......@@ -45,9 +45,6 @@
struct dthread_t;
struct dt_unit_t;
/* Constants. */
#define DTHREADS_STACKSIZE (1024*1024) /* 1M lightweight stack size. */
/*!
* \brief Thread state enumeration.
*/
......@@ -143,28 +140,6 @@ dt_unit_t *dt_create_coherent(int count, runnable_t runnable, void *data);
*/
void dt_delete(dt_unit_t **unit);
/*!
* \brief Resize unit to given number.
*
* \note Newly created dthreads will have no runnable or data, their state
* will be ThreadJoined (that means no thread will be physically created
* until the next dt_start()).
*
* \warning Be careful when shrinking unit, joined and idle threads are
* reclaimed first, but it may kill your active threads
* as a last resort.
* Threads will stop at their nearest cancellation point,
* so this is potentially an expensive and blocking operation.
*
* \param unit Unit to be resized.
* \param size New unit size.
*
* \retval KNOT_EOK on success.
* \retval KNOT_EINVAL on invalid parameters.
* \retval KNOT_ENOMEM out of memory error.
*/
int dt_resize(dt_unit_t *unit, int size);
/*!
* \brief Start all threads in selected unit.
*
......@@ -235,21 +210,6 @@ int dt_stop_id(dthread_t *thread);
*/
int dt_stop(dt_unit_t *unit);
/*!
* \brief Modify thread priority.
*
* \param thread Target thread instance.
* \param prio Requested priority (positive integer, default is 0).
*
* \warning Thread priority setting is disabled as the compatible scheduler
* has significant performance deficiencies (SCHED_OTHER).
* (issue #1809)
*
* \retval KNOT_EOK on success.
* \retval KNOT_EINVAL on invalid parameters.
*/
//int dt_setprio(dthread_t *thread, int prio);
/*!
* \brief Set thread affinity to masked CPU's.
*
......
......@@ -36,7 +36,7 @@ unit_api dthreads_tests_api = {
/*
* Unit implementation.
*/
static const int DT_TEST_COUNT = 23;
static const int DT_TEST_COUNT = 18;
/* Unit runnable data. */
static pthread_mutex_t _runnable_mx;
......@@ -140,87 +140,6 @@ static inline int dt_test_reanimate(dt_unit_t *unit)
return ret == 0;
}
/*! \brief Resize unit. */
static inline int dt_test_resize(dt_unit_t *unit, int size)
{
// Resize
int ret = 0;
ret = dt_resize(unit, size);
if (ret < 0) {
return 0;
}
// Check outcome
if (unit->size != size) {
return 0;
}
// Repurpose all
_runnable_i = 0;
for (int i = 0; i < size; ++i) {
ret += dt_repurpose(unit->threads[i], &runnable, 0);
}
ret += dt_start(unit);
// Wait for finish
ret += dt_join(unit);
// Verify
int expected = size * _runnable_cycles;
note("resize test: %d threads, %d ticks, %d expected",
size, _runnable_i, expected);
if (_runnable_i != expected) {
return 0;
}
// Check return codes
return ret == 0;
}
/*! \brief Resize unit while threads are active. */
static inline int dt_test_liveresize(dt_unit_t *unit)
{
// Size
int size = unit->size;
int size_hi = size + 2;
int size_lo = size - 1;
// Expand
int ret = 0;
ret = dt_resize(unit, size_hi);
if (ret < 0) {
return 0;
}
// Repurpose all
for (int i = 0; i < unit->size; ++i) {
ret += dt_repurpose(unit->threads[i], &runnable, 0);
}
// Restart
_runnable_i = 0;
ret += dt_start(unit);
// Shrink
ret += dt_resize(unit, size_lo);
// Wait for finish
ret += dt_join(unit);
// Verify
int expected_hi = size_hi * _runnable_cycles;
int expected_lo = size_lo * _runnable_cycles;
note("resize test: %d->%d->%d threads, %d ticks, <%d,%d> expected",
size, size_hi, size_lo, _runnable_i, expected_lo, expected_hi);
if (_runnable_i > expected_hi || _runnable_i < expected_lo) {
return 0;
}
// Check return codes
return ret == 0;
}
/*! \brief Start unit. */
static inline int dt_test_start(dt_unit_t *unit)
{
......@@ -324,42 +243,20 @@ static int dt_tests_run(int argc, char *argv[])
/* Test 13: Reanimate dead threads. */
ok(dt_test_reanimate(unit), "dthreads: reanimate dead threads");
/* Test 14: Expand unit by 100%. */
int size = unit->size * 2;
ok(dt_test_resize(unit, size),
"dthreads: expanding unit to size * 2 (%d threads)", size);
/* Test 15: Shrink unit to half. */
size = unit->size / 2;
ok(dt_test_resize(unit, size),
"dthreads: shrinking unit to size / 2 (%d threads)", size);
/* Test 16: Resize while threads are active. */
ok(dt_test_liveresize(unit), "dthreads: resizing unit while active");
/* Test 17: Deinitialize */
/* Test 14: Deinitialize */
dt_delete(&unit);
ok(unit == 0, "dthreads: delete unit");
endskip;
/* Test 18: Wrong values. */
/* Test 15: Wrong values. */
unit = dt_create(-1);
ok(unit == 0, "dthreads: create with negative count");
unit = dt_create_coherent(dt_optimal_size(), 0, 0);
/* Test 19: NULL runnable. */
/* Test 16: NULL runnable. */
cmp_ok(dt_start(unit), "==", 0, "dthreads: start with NULL runnable");
/* Test 20: resize to negative value. */
cmp_ok(dt_resize(unit, -19),
"<", 0, "dthreads: resize to negative size");
/* Test 21: resize to zero value. */
cmp_ok(dt_resize(unit, 0), "<", 0, "dthreads: resize to NULL size");
dt_join(unit);
dt_delete(&unit);
/* Test 22: NULL operations crashing. */
/* Test 17: NULL operations crashing. */
int op_count = 14;
int expected_min = op_count * -1;
// All functions must return -1 at least
......@@ -372,7 +269,6 @@ static int dt_tests_run(int argc, char *argv[])
ret += dt_is_cancelled(0); // 0
ret += dt_join(0); // -1
ret += dt_repurpose(0, 0, 0); // -1
ret += dt_resize(0, 0); // -1
ret += dt_signalize(0, SIGALRM); // -1
ret += dt_start(0); // -1
ret += dt_start_id(0); // -1
......@@ -382,7 +278,7 @@ static int dt_tests_run(int argc, char *argv[])
ret += dt_unit_unlock(0); // -1
}, "dthreads: not crashed while executing functions on NULL context");
/* Test 23: expected results. */
/* Test 18: expected results. */
cmp_ok(ret, "<=", expected_min,
"dthreads: correct values when passed NULL context "
"(%d, min: %d)", ret, expected_min);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment