Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add blocking_fifo pool type #46

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/include/abt.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ enum ABT_sched_type {
};

enum ABT_pool_kind {
ABT_POOL_FIFO
ABT_POOL_FIFO,
ABT_POOL_BLOCKING_FIFO /* pop() will block unless/until work available */
};

enum ABT_pool_access {
Expand Down Expand Up @@ -318,6 +319,7 @@ typedef int (*ABT_pool_free_fn)(ABT_pool);

typedef struct {
ABT_pool_access access; /* Access type */
ABT_pool_kind kind;

/* Functions to manage units */
ABT_unit_get_type_fn u_get_type;
Expand Down
3 changes: 3 additions & 0 deletions src/include/abti.h
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ struct ABTI_sched {

struct ABTI_pool {
ABT_pool_access access; /* Access mode */
ABT_pool_kind kind; /* Kind of pool */
ABT_bool automatic; /* To know if automatic data free */
int32_t num_scheds; /* Number of associated schedulers */
/* NOTE: int32_t to check if still positive */
Expand Down Expand Up @@ -540,6 +541,7 @@ int ABTI_sched_config_read_global(ABT_sched_config config,

/* Pool */
int ABTI_pool_get_fifo_def(ABT_pool_access access, ABT_pool_def *p_def);
int ABTI_pool_get_blocking_fifo_def(ABT_pool_access access, ABT_pool_def *p_def);
#ifndef ABT_CONFIG_DISABLE_POOL_CONSUMER_CHECK
int ABTI_pool_set_consumer(ABTI_pool *p_pool, ABTI_xstream *p_xstream);
#endif
Expand All @@ -549,6 +551,7 @@ int ABTI_pool_set_producer(ABTI_pool *p_pool, ABTI_xstream *p_xstream);
int ABTI_pool_accept_migration(ABTI_pool *p_pool, ABTI_pool *source);
void ABTI_pool_print(ABTI_pool *p_pool, FILE *p_os, int indent);
void ABTI_pool_reset_id(void);
ABT_pool_kind ABTI_pool_get_kind(ABT_pool_def *def);

/* User-level Thread (ULT) */
int ABTI_thread_migrate_to_pool(ABTI_thread *p_thread, ABTI_pool *p_pool);
Expand Down
122 changes: 103 additions & 19 deletions src/pool/fifo.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,21 @@
* See COPYRIGHT in top-level directory.
*/

#include <errno.h>
#include "abti.h"

/* FIFO pool implementation */

static int pool_init(ABT_pool pool, ABT_pool_config config);
static int pool_init_blocking(ABT_pool pool, ABT_pool_config config);
static int pool_free(ABT_pool pool);
static int pool_free_blocking(ABT_pool pool);
static size_t pool_get_size(ABT_pool pool);
static void pool_push_shared(ABT_pool pool, ABT_unit unit);
static void pool_push_shared_blocking(ABT_pool pool, ABT_unit unit);
static void pool_push_private(ABT_pool pool, ABT_unit unit);
static ABT_unit pool_pop_shared(ABT_pool pool);
static ABT_unit pool_pop_shared_blocking(ABT_pool pool);
static ABT_unit pool_pop_private(ABT_pool pool);
static int pool_remove_shared(ABT_pool pool, ABT_unit unit);
static int pool_remove_private(ABT_pool pool, ABT_unit unit);
Expand All @@ -26,30 +31,13 @@ static ABT_unit unit_create_from_thread(ABT_thread thread);
static ABT_unit unit_create_from_task(ABT_task task);
static void unit_free(ABT_unit *unit);


/* FIXME: do we need this? */
ABT_pool_def ABTI_pool_fifo = {
.access = ABT_POOL_ACCESS_MPSC,
.p_init = pool_init,
.p_free = pool_free,
.p_get_size = pool_get_size,
.p_push = pool_push_shared,
.p_pop = pool_pop_shared,
.p_remove = pool_remove_shared,
.u_get_type = unit_get_type,
.u_get_thread = unit_get_thread,
.u_get_task = unit_get_task,
.u_is_in_pool = unit_is_in_pool,
.u_create_from_thread = unit_create_from_thread,
.u_create_from_task = unit_create_from_task,
.u_free = unit_free,
};

struct data {
ABTI_spinlock mutex;
size_t num_units;
unit_t *p_head;
unit_t *p_tail;
pthread_mutex_t blocking_mutex;
pthread_cond_t blocking_cond;
};
typedef struct data data_t;

Expand Down Expand Up @@ -87,6 +75,7 @@ int ABTI_pool_get_fifo_def(ABT_pool_access access, ABT_pool_def *p_def)
}

/* Common definitions regardless of the access type */
p_def->kind = ABT_POOL_FIFO;
p_def->access = access;
p_def->p_init = pool_init;
p_def->p_free = pool_free;
Expand All @@ -107,6 +96,26 @@ int ABTI_pool_get_fifo_def(ABT_pool_access access, ABT_pool_def *p_def)
goto fn_exit;
}

int ABTI_pool_get_blocking_fifo_def(ABT_pool_access access, ABT_pool_def *p_def)
{
int ret;

/* a blocking fifo pool is a normal fifo pool with some function
* pointers replaced to add additional blocking capability atop the normal
* functions.
*/
ret = ABTI_pool_get_fifo_def(access, p_def);
if(ret == ABT_SUCCESS) {
p_def->kind = ABT_POOL_BLOCKING_FIFO;
p_def->p_remove = pool_remove_shared;
p_def->p_pop = pool_pop_shared_blocking;
p_def->p_push = pool_push_shared_blocking;
p_def->p_init = pool_init_blocking;
p_def->p_free = pool_free_blocking;
}

return ret;
}

/* Pool functions */

Expand Down Expand Up @@ -134,6 +143,22 @@ int pool_init(ABT_pool pool, ABT_pool_config config)
return abt_errno;
}

int pool_init_blocking(ABT_pool pool, ABT_pool_config config)
{
int ret;

ret = pool_init(pool, config);
if(ret == ABT_SUCCESS) {
ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
void *data = ABTI_pool_get_data(p_pool);
data_t *p_data = pool_get_data_ptr(data);
pthread_mutex_init(&p_data->blocking_mutex, NULL);
pthread_cond_init(&p_data->blocking_cond, NULL);
}

return ret;
}

static int pool_free(ABT_pool pool)
{
int abt_errno = ABT_SUCCESS;
Expand All @@ -152,6 +177,18 @@ static int pool_free(ABT_pool pool)
return abt_errno;
}

static int pool_free_blocking(ABT_pool pool)
{
ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
void *data = ABTI_pool_get_data(p_pool);
data_t *p_data = pool_get_data_ptr(data);

pthread_mutex_destroy(&p_data->blocking_mutex);
pthread_cond_destroy(&p_data->blocking_cond);

return(pool_free(pool));
}

static size_t pool_get_size(ABT_pool pool)
{
ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
Expand Down Expand Up @@ -188,6 +225,21 @@ static void pool_push_shared(ABT_pool pool, ABT_unit unit)
ABTI_spinlock_release(&p_data->mutex);
}

static void pool_push_shared_blocking(ABT_pool pool, ABT_unit unit)
{
ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
void *data = ABTI_pool_get_data(p_pool);
data_t *p_data = pool_get_data_ptr(data);

/* hold additional pthread mutex and signal anyone who might be blocking
* on pop()
*/
pthread_mutex_lock(&p_data->blocking_mutex);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think acquiring the lock here is necessary. The pool push operation should be thread safe and cond_signal does not need a lock

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree cond_signal() itself doesn't need a lock, but the lock in this specific example is to prevent a race between the waiter checking for the pool being empty and this function adding something to the pool. Without the mutex the you could theoretically get this order of operations, with thread a calling pop() and thread b calling push():

thread a: lock
thread a: check if pool is empty
thread b: insert into pool queue
thread b: cond_signal
thread a: cond_wait

Thread a will block in this case even though there is work in the pool when it hits the cond_wait() call.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the timeout != infinite avoids deadlocks in this PR, so I think it's still correct. Performance wise, my suggestion allows push() to be on a fast path and pop() on empty (including false positives) on a slow path. But I agree that if we change the semantic of the pool to include infinite wait on pop() (which I think is a likely API extension in the future), then keeping the lock for correctness is a must

pool_push_shared(pool, unit);
pthread_cond_signal(&p_data->blocking_cond);
pthread_mutex_unlock(&p_data->blocking_mutex);
}

static void pool_push_private(ABT_pool pool, ABT_unit unit)
{
ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
Expand Down Expand Up @@ -246,6 +298,38 @@ static ABT_unit pool_pop_shared(ABT_pool pool)
return h_unit;
}

static ABT_unit pool_pop_shared_blocking(ABT_pool pool)
{
ABT_unit unit;
int ret;

ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
void *data = ABTI_pool_get_data(p_pool);
data_t *p_data = pool_get_data_ptr(data);

pthread_mutex_lock(&p_data->blocking_mutex);
unit = pool_pop_shared(pool);
if(unit == ABT_UNIT_NULL) {
/* timedwait with 100 ms timeout to give the scheduler a chance to
* wake up and check for events even if no work units arrive. We
* intentionally do not loop here.
*/
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_nsec += 1e8;
if(ts.tv_nsec > 1e9) {
ts.tv_nsec -= 1e9;
ts.tv_sec++;
}
ret = pthread_cond_timedwait(&p_data->blocking_cond, &p_data->blocking_mutex, &ts);
if(ret != ETIMEDOUT)
unit = pool_pop_shared(pool);
}
pthread_mutex_unlock(&p_data->blocking_mutex);

return(unit);
}

static ABT_unit pool_pop_private(ABT_pool pool)
{
ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
Expand Down
4 changes: 4 additions & 0 deletions src/pool/pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ int ABT_pool_create(ABT_pool_def *def, ABT_pool_config config,

p_pool = (ABTI_pool *)ABTU_malloc(sizeof(ABTI_pool));
p_pool->access = def->access;
p_pool->kind = def->kind;
p_pool->automatic = ABT_FALSE;
p_pool->num_scheds = 0;
#ifndef ABT_CONFIG_DISABLE_POOL_CONSUMER_CHECK
Expand Down Expand Up @@ -109,6 +110,9 @@ int ABT_pool_create_basic(ABT_pool_kind kind, ABT_pool_access access,
case ABT_POOL_FIFO:
abt_errno = ABTI_pool_get_fifo_def(access, &def);
break;
case ABT_POOL_BLOCKING_FIFO:
abt_errno = ABTI_pool_get_blocking_fifo_def(access, &def);
break;
default:
abt_errno = ABT_ERR_INV_POOL_KIND;
break;
Expand Down
35 changes: 32 additions & 3 deletions src/sched/basic.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ static void sched_run(ABT_sched sched)
ABT_pool *pools;
int i;
CNT_DECL(run_cnt);
int n_blocking_pools = 0;
ABTI_pool *p_pool;

ABTI_xstream *p_xstream = ABTI_local_get_xstream();
ABTI_sched *p_sched = ABTI_sched_get_ptr(sched);
Expand All @@ -105,15 +107,42 @@ static void sched_run(ABT_sched sched)
num_pools = p_data->num_pools;
pools = p_data->pools;

/* check for the presence of blocking pools */
for(i=0; i < num_pools; i++) {
ABT_pool pool = pools[i];
p_pool = ABTI_pool_get_ptr(pool);
if(p_pool->kind == ABT_POOL_BLOCKING_FIFO)
n_blocking_pools++;
}
/* check configuration; if a blocking pools is in use then
* this scheduler can only be associated with one pool. Otherwise other
* pools could be starved.
*/
ABTI_ASSERT(!n_blocking_pools || num_pools < 2);
if(n_blocking_pools) {
/* set event_freq to 1 to avoid long delays in event processing if
* the pop() function blocks (note that it uses a timer to avoid
* indefinite sleeps; events will eventually be processed).
*/
p_data->event_freq = 1;
event_freq = 1;
#ifdef ABT_CONFIG_USE_SCHED_SLEEP
/* set sleep_time to 0 to prevent superflous sleeps; the pool will
* handle that if idle.
*/
p_data->sleep_time.tv_sec = 0;
p_data->sleep_time.tv_nsec = 0;
#endif
}

while (1) {
CNT_INIT(run_cnt, 0);

/* Execute one work unit from the scheduler's pool */
for (i = 0; i < num_pools; i++) {
ABT_pool pool = pools[i];
ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
size_t size = ABTI_pool_get_size(p_pool);
if (size > 0) {
p_pool = ABTI_pool_get_ptr(pool);
if (num_pools == 1 || ABTI_pool_get_size(p_pool) > 0 ) {
/* Pop one work unit */
ABT_unit unit = ABTI_pool_pop(p_pool);
if (unit != ABT_UNIT_NULL) {
Expand Down
3 changes: 1 addition & 2 deletions src/sched/prio.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,7 @@ static void sched_run(ABT_sched sched)
for (i = 0; i < num_pools; i++) {
ABT_pool pool = p_pools[i];
ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
size_t size = ABTI_pool_get_size(p_pool);
if (size > 0) {
if (num_pools == 1 || ABTI_pool_get_size(p_pool) > 0) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment as above

ABT_unit unit = ABTI_pool_pop(p_pool);
if (unit != ABT_UNIT_NULL) {
ABTI_xstream_run_unit(p_xstream, unit, p_pool);
Expand Down
5 changes: 2 additions & 3 deletions src/sched/randws.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ static void sched_run(ABT_sched sched)
/* Execute one work unit from the scheduler's pool */
ABT_pool pool = p_pools[0];
ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
size_t size = ABTI_pool_get_size(p_pool);
if (size > 0) {
if (num_pools == 1 || ABTI_pool_get_size(p_pool) > 0) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment as above with a slight variation.
We pop a unit, and if that unit is NULL then work-steal.

unit = ABTI_pool_pop(p_pool);
if (unit != ABT_UNIT_NULL) {
ABTI_xstream_run_unit(p_xstream, unit, p_pool);
Expand All @@ -94,7 +93,7 @@ static void sched_run(ABT_sched sched)
target = (num_pools == 2) ? 1 : (rand_r(&seed) % (num_pools-1) + 1);
pool = p_pools[target];
p_pool = ABTI_pool_get_ptr(pool);
size = ABTI_pool_get_size(p_pool);
size_t size = ABTI_pool_get_size(p_pool);
if (size > 0) {
unit = ABTI_pool_pop(p_pool);
LOG_EVENT_POOL_POP(p_pool, unit);
Expand Down
3 changes: 3 additions & 0 deletions test/basic/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ TESTS = \
sched_prio \
sched_randws \
sched_set_main \
pool_blocking_fifo \
sched_stack \
sched_config \
sched_user_ws \
Expand Down Expand Up @@ -96,6 +97,7 @@ sched_basic_SOURCES = sched_basic.c
sched_prio_SOURCES = sched_prio.c
sched_randws_SOURCES = sched_randws.c
sched_set_main_SOURCES = sched_set_main.c
pool_blocking_fifo_SOURCES = pool_blocking_fifo.c
sched_stack_SOURCES = sched_stack.c
sched_config_SOURCES = sched_config.c
sched_user_ws_SOURCES = sched_user_ws.c
Expand Down Expand Up @@ -149,6 +151,7 @@ testing:
./sched_prio
./sched_randws
./sched_set_main
./pool_blocking_fifo
./sched_stack
./sched_config
./sched_user_ws
Expand Down
Loading