diff --git a/src/include/abt.h.in b/src/include/abt.h.in index 1793641dc..76fb31455 100644 --- a/src/include/abt.h.in +++ b/src/include/abt.h.in @@ -156,7 +156,8 @@ enum ABT_sched_type { }; enum ABT_pool_kind { - ABT_POOL_FIFO + ABT_POOL_FIFO, + ABT_POOL_BLOCKING_FIFO /* pop() will block unless/until work available */ }; enum ABT_pool_access { @@ -318,6 +319,7 @@ typedef int (*ABT_pool_free_fn)(ABT_pool); typedef struct { ABT_pool_access access; /* Access type */ + ABT_pool_kind kind; /* Functions to manage units */ ABT_unit_get_type_fn u_get_type; diff --git a/src/include/abti.h b/src/include/abti.h index df68fcb34..baaeaa0e1 100644 --- a/src/include/abti.h +++ b/src/include/abti.h @@ -275,6 +275,7 @@ struct ABTI_sched { struct ABTI_pool { ABT_pool_access access; /* Access mode */ + ABT_pool_kind kind; /* Kind of pool */ ABT_bool automatic; /* To know if automatic data free */ int32_t num_scheds; /* Number of associated schedulers */ /* NOTE: int32_t to check if still positive */ @@ -540,6 +541,7 @@ int ABTI_sched_config_read_global(ABT_sched_config config, /* Pool */ int ABTI_pool_get_fifo_def(ABT_pool_access access, ABT_pool_def *p_def); +int ABTI_pool_get_blocking_fifo_def(ABT_pool_access access, ABT_pool_def *p_def); #ifndef ABT_CONFIG_DISABLE_POOL_CONSUMER_CHECK int ABTI_pool_set_consumer(ABTI_pool *p_pool, ABTI_xstream *p_xstream); #endif @@ -549,6 +551,7 @@ int ABTI_pool_set_producer(ABTI_pool *p_pool, ABTI_xstream *p_xstream); int ABTI_pool_accept_migration(ABTI_pool *p_pool, ABTI_pool *source); void ABTI_pool_print(ABTI_pool *p_pool, FILE *p_os, int indent); void ABTI_pool_reset_id(void); +ABT_pool_kind ABTI_pool_get_kind(ABT_pool_def *def); /* User-level Thread (ULT) */ int ABTI_thread_migrate_to_pool(ABTI_thread *p_thread, ABTI_pool *p_pool); diff --git a/src/pool/fifo.c b/src/pool/fifo.c index bec71016b..cd658a665 100644 --- a/src/pool/fifo.c +++ b/src/pool/fifo.c @@ -3,16 +3,21 @@ * See COPYRIGHT in top-level directory. */ +#include #include "abti.h" /* FIFO pool implementation */ static int pool_init(ABT_pool pool, ABT_pool_config config); +static int pool_init_blocking(ABT_pool pool, ABT_pool_config config); static int pool_free(ABT_pool pool); +static int pool_free_blocking(ABT_pool pool); static size_t pool_get_size(ABT_pool pool); static void pool_push_shared(ABT_pool pool, ABT_unit unit); +static void pool_push_shared_blocking(ABT_pool pool, ABT_unit unit); static void pool_push_private(ABT_pool pool, ABT_unit unit); static ABT_unit pool_pop_shared(ABT_pool pool); +static ABT_unit pool_pop_shared_blocking(ABT_pool pool); static ABT_unit pool_pop_private(ABT_pool pool); static int pool_remove_shared(ABT_pool pool, ABT_unit unit); static int pool_remove_private(ABT_pool pool, ABT_unit unit); @@ -26,30 +31,13 @@ static ABT_unit unit_create_from_thread(ABT_thread thread); static ABT_unit unit_create_from_task(ABT_task task); static void unit_free(ABT_unit *unit); - -/* FIXME: do we need this? */ -ABT_pool_def ABTI_pool_fifo = { - .access = ABT_POOL_ACCESS_MPSC, - .p_init = pool_init, - .p_free = pool_free, - .p_get_size = pool_get_size, - .p_push = pool_push_shared, - .p_pop = pool_pop_shared, - .p_remove = pool_remove_shared, - .u_get_type = unit_get_type, - .u_get_thread = unit_get_thread, - .u_get_task = unit_get_task, - .u_is_in_pool = unit_is_in_pool, - .u_create_from_thread = unit_create_from_thread, - .u_create_from_task = unit_create_from_task, - .u_free = unit_free, -}; - struct data { ABTI_spinlock mutex; size_t num_units; unit_t *p_head; unit_t *p_tail; + pthread_mutex_t blocking_mutex; + pthread_cond_t blocking_cond; }; typedef struct data data_t; @@ -87,6 +75,7 @@ int ABTI_pool_get_fifo_def(ABT_pool_access access, ABT_pool_def *p_def) } /* Common definitions regardless of the access type */ + p_def->kind = ABT_POOL_FIFO; p_def->access = access; p_def->p_init = pool_init; p_def->p_free = pool_free; @@ -107,6 +96,26 @@ int ABTI_pool_get_fifo_def(ABT_pool_access access, ABT_pool_def *p_def) goto fn_exit; } +int ABTI_pool_get_blocking_fifo_def(ABT_pool_access access, ABT_pool_def *p_def) +{ + int ret; + + /* a blocking fifo pool is a normal fifo pool with some function + * pointers replaced to add additional blocking capability atop the normal + * functions. + */ + ret = ABTI_pool_get_fifo_def(access, p_def); + if(ret == ABT_SUCCESS) { + p_def->kind = ABT_POOL_BLOCKING_FIFO; + p_def->p_remove = pool_remove_shared; + p_def->p_pop = pool_pop_shared_blocking; + p_def->p_push = pool_push_shared_blocking; + p_def->p_init = pool_init_blocking; + p_def->p_free = pool_free_blocking; + } + + return ret; +} /* Pool functions */ @@ -134,6 +143,22 @@ int pool_init(ABT_pool pool, ABT_pool_config config) return abt_errno; } +int pool_init_blocking(ABT_pool pool, ABT_pool_config config) +{ + int ret; + + ret = pool_init(pool, config); + if(ret == ABT_SUCCESS) { + ABTI_pool *p_pool = ABTI_pool_get_ptr(pool); + void *data = ABTI_pool_get_data(p_pool); + data_t *p_data = pool_get_data_ptr(data); + pthread_mutex_init(&p_data->blocking_mutex, NULL); + pthread_cond_init(&p_data->blocking_cond, NULL); + } + + return ret; +} + static int pool_free(ABT_pool pool) { int abt_errno = ABT_SUCCESS; @@ -152,6 +177,18 @@ static int pool_free(ABT_pool pool) return abt_errno; } +static int pool_free_blocking(ABT_pool pool) +{ + ABTI_pool *p_pool = ABTI_pool_get_ptr(pool); + void *data = ABTI_pool_get_data(p_pool); + data_t *p_data = pool_get_data_ptr(data); + + pthread_mutex_destroy(&p_data->blocking_mutex); + pthread_cond_destroy(&p_data->blocking_cond); + + return(pool_free(pool)); +} + static size_t pool_get_size(ABT_pool pool) { ABTI_pool *p_pool = ABTI_pool_get_ptr(pool); @@ -188,6 +225,21 @@ static void pool_push_shared(ABT_pool pool, ABT_unit unit) ABTI_spinlock_release(&p_data->mutex); } +static void pool_push_shared_blocking(ABT_pool pool, ABT_unit unit) +{ + ABTI_pool *p_pool = ABTI_pool_get_ptr(pool); + void *data = ABTI_pool_get_data(p_pool); + data_t *p_data = pool_get_data_ptr(data); + + /* hold additional pthread mutex and signal anyone who might be blocking + * on pop() + */ + pthread_mutex_lock(&p_data->blocking_mutex); + pool_push_shared(pool, unit); + pthread_cond_signal(&p_data->blocking_cond); + pthread_mutex_unlock(&p_data->blocking_mutex); +} + static void pool_push_private(ABT_pool pool, ABT_unit unit) { ABTI_pool *p_pool = ABTI_pool_get_ptr(pool); @@ -246,6 +298,38 @@ static ABT_unit pool_pop_shared(ABT_pool pool) return h_unit; } +static ABT_unit pool_pop_shared_blocking(ABT_pool pool) +{ + ABT_unit unit; + int ret; + + ABTI_pool *p_pool = ABTI_pool_get_ptr(pool); + void *data = ABTI_pool_get_data(p_pool); + data_t *p_data = pool_get_data_ptr(data); + + pthread_mutex_lock(&p_data->blocking_mutex); + unit = pool_pop_shared(pool); + if(unit == ABT_UNIT_NULL) { + /* timedwait with 100 ms timeout to give the scheduler a chance to + * wake up and check for events even if no work units arrive. We + * intentionally do not loop here. + */ + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_nsec += 1e8; + if(ts.tv_nsec > 1e9) { + ts.tv_nsec -= 1e9; + ts.tv_sec++; + } + ret = pthread_cond_timedwait(&p_data->blocking_cond, &p_data->blocking_mutex, &ts); + if(ret != ETIMEDOUT) + unit = pool_pop_shared(pool); + } + pthread_mutex_unlock(&p_data->blocking_mutex); + + return(unit); +} + static ABT_unit pool_pop_private(ABT_pool pool) { ABTI_pool *p_pool = ABTI_pool_get_ptr(pool); diff --git a/src/pool/pool.c b/src/pool/pool.c index 7b02931a7..46f9c5f52 100644 --- a/src/pool/pool.c +++ b/src/pool/pool.c @@ -36,6 +36,7 @@ int ABT_pool_create(ABT_pool_def *def, ABT_pool_config config, p_pool = (ABTI_pool *)ABTU_malloc(sizeof(ABTI_pool)); p_pool->access = def->access; + p_pool->kind = def->kind; p_pool->automatic = ABT_FALSE; p_pool->num_scheds = 0; #ifndef ABT_CONFIG_DISABLE_POOL_CONSUMER_CHECK @@ -109,6 +110,9 @@ int ABT_pool_create_basic(ABT_pool_kind kind, ABT_pool_access access, case ABT_POOL_FIFO: abt_errno = ABTI_pool_get_fifo_def(access, &def); break; + case ABT_POOL_BLOCKING_FIFO: + abt_errno = ABTI_pool_get_blocking_fifo_def(access, &def); + break; default: abt_errno = ABT_ERR_INV_POOL_KIND; break; diff --git a/src/sched/basic.c b/src/sched/basic.c index 5bb1ba8cd..f859fc14e 100644 --- a/src/sched/basic.c +++ b/src/sched/basic.c @@ -95,6 +95,8 @@ static void sched_run(ABT_sched sched) ABT_pool *pools; int i; CNT_DECL(run_cnt); + int n_blocking_pools = 0; + ABTI_pool *p_pool; ABTI_xstream *p_xstream = ABTI_local_get_xstream(); ABTI_sched *p_sched = ABTI_sched_get_ptr(sched); @@ -105,15 +107,42 @@ static void sched_run(ABT_sched sched) num_pools = p_data->num_pools; pools = p_data->pools; + /* check for the presence of blocking pools */ + for(i=0; i < num_pools; i++) { + ABT_pool pool = pools[i]; + p_pool = ABTI_pool_get_ptr(pool); + if(p_pool->kind == ABT_POOL_BLOCKING_FIFO) + n_blocking_pools++; + } + /* check configuration; if a blocking pools is in use then + * this scheduler can only be associated with one pool. Otherwise other + * pools could be starved. + */ + ABTI_ASSERT(!n_blocking_pools || num_pools < 2); + if(n_blocking_pools) { + /* set event_freq to 1 to avoid long delays in event processing if + * the pop() function blocks (note that it uses a timer to avoid + * indefinite sleeps; events will eventually be processed). + */ + p_data->event_freq = 1; + event_freq = 1; +#ifdef ABT_CONFIG_USE_SCHED_SLEEP + /* set sleep_time to 0 to prevent superflous sleeps; the pool will + * handle that if idle. + */ + p_data->sleep_time.tv_sec = 0; + p_data->sleep_time.tv_nsec = 0; +#endif + } + while (1) { CNT_INIT(run_cnt, 0); /* Execute one work unit from the scheduler's pool */ for (i = 0; i < num_pools; i++) { ABT_pool pool = pools[i]; - ABTI_pool *p_pool = ABTI_pool_get_ptr(pool); - size_t size = ABTI_pool_get_size(p_pool); - if (size > 0) { + p_pool = ABTI_pool_get_ptr(pool); + if (num_pools == 1 || ABTI_pool_get_size(p_pool) > 0 ) { /* Pop one work unit */ ABT_unit unit = ABTI_pool_pop(p_pool); if (unit != ABT_UNIT_NULL) { diff --git a/src/sched/prio.c b/src/sched/prio.c index a5b17ee59..b66fdb7e4 100644 --- a/src/sched/prio.c +++ b/src/sched/prio.c @@ -95,8 +95,7 @@ static void sched_run(ABT_sched sched) for (i = 0; i < num_pools; i++) { ABT_pool pool = p_pools[i]; ABTI_pool *p_pool = ABTI_pool_get_ptr(pool); - size_t size = ABTI_pool_get_size(p_pool); - if (size > 0) { + if (num_pools == 1 || ABTI_pool_get_size(p_pool) > 0) { ABT_unit unit = ABTI_pool_pop(p_pool); if (unit != ABT_UNIT_NULL) { ABTI_xstream_run_unit(p_xstream, unit, p_pool); diff --git a/src/sched/randws.c b/src/sched/randws.c index 0714279ee..581021a8f 100644 --- a/src/sched/randws.c +++ b/src/sched/randws.c @@ -82,8 +82,7 @@ static void sched_run(ABT_sched sched) /* Execute one work unit from the scheduler's pool */ ABT_pool pool = p_pools[0]; ABTI_pool *p_pool = ABTI_pool_get_ptr(pool); - size_t size = ABTI_pool_get_size(p_pool); - if (size > 0) { + if (num_pools == 1 || ABTI_pool_get_size(p_pool) > 0) { unit = ABTI_pool_pop(p_pool); if (unit != ABT_UNIT_NULL) { ABTI_xstream_run_unit(p_xstream, unit, p_pool); @@ -94,7 +93,7 @@ static void sched_run(ABT_sched sched) target = (num_pools == 2) ? 1 : (rand_r(&seed) % (num_pools-1) + 1); pool = p_pools[target]; p_pool = ABTI_pool_get_ptr(pool); - size = ABTI_pool_get_size(p_pool); + size_t size = ABTI_pool_get_size(p_pool); if (size > 0) { unit = ABTI_pool_pop(p_pool); LOG_EVENT_POOL_POP(p_pool, unit); diff --git a/test/basic/Makefile.am b/test/basic/Makefile.am index b6e58e60d..84c2b59dd 100644 --- a/test/basic/Makefile.am +++ b/test/basic/Makefile.am @@ -31,6 +31,7 @@ TESTS = \ sched_prio \ sched_randws \ sched_set_main \ + pool_blocking_fifo \ sched_stack \ sched_config \ sched_user_ws \ @@ -96,6 +97,7 @@ sched_basic_SOURCES = sched_basic.c sched_prio_SOURCES = sched_prio.c sched_randws_SOURCES = sched_randws.c sched_set_main_SOURCES = sched_set_main.c +pool_blocking_fifo_SOURCES = pool_blocking_fifo.c sched_stack_SOURCES = sched_stack.c sched_config_SOURCES = sched_config.c sched_user_ws_SOURCES = sched_user_ws.c @@ -149,6 +151,7 @@ testing: ./sched_prio ./sched_randws ./sched_set_main + ./pool_blocking_fifo ./sched_stack ./sched_config ./sched_user_ws diff --git a/test/basic/pool_blocking_fifo.c b/test/basic/pool_blocking_fifo.c new file mode 100644 index 000000000..bc15fb785 --- /dev/null +++ b/test/basic/pool_blocking_fifo.c @@ -0,0 +1,153 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */ +/* + * See COPYRIGHT in top-level directory. + */ + +/* This test program is based on sched_set_main, but the objective is swap + * out the default fifo pool with a blocking fifo pool. + */ + +#include +#include +#include "abt.h" +#include "abttest.h" + +#define DEFAULT_NUM_XSTREAMS 4 +#define DEFAULT_NUM_THREADS 4 + +static int num_threads = DEFAULT_NUM_THREADS; + +static void thread_hello(void *arg) +{ + ATS_UNUSED(arg); + int rank; + ABT_thread self; + ABT_thread_id id; + + ABT_xstream_self_rank(&rank); + ABT_thread_self(&self); + ABT_thread_get_id(self, &id); + + ATS_printf(1, "[U%lu:E%d] Hello, world!\n", id, rank); +} + +static void thread_func(void *arg) +{ + int i = (int)(size_t)arg; + int rank, ret; + ABT_xstream xstream; + ABT_thread self; + ABT_thread_id id; + ABT_sched sched; + ABT_pool pool; + ABT_thread *threads; + + ret = ABT_xstream_self(&xstream); + ATS_ERROR(ret, "ABT_xstream_self"); + ABT_xstream_get_rank(xstream, &rank); + + ret = ABT_thread_self(&self); + ATS_ERROR(ret, "ABT_thread_self"); + ABT_thread_get_id(self, &id); + + ATS_printf(1, "[U%lu:E%d] change the main scheduler\n", id, rank); + + /* create a blocking fifo pool */ + ret = ABT_pool_create_basic(ABT_POOL_BLOCKING_FIFO, + ABT_POOL_ACCESS_MPMC, ABT_TRUE, &pool); + ATS_ERROR(ret, "ABT_pool_create_basic"); + + /* Create a new scheduler */ + /* NOTE: Since we use ABT_sched_create_basic, the new scheduler and its + * associated pools will be freed automatically when it is not used anymore + * or the associated ES is terminated. */ + ret = ABT_sched_create_basic(ABT_SCHED_BASIC, 1, &pool, ABT_SCHED_CONFIG_NULL, + &sched); + ATS_ERROR(ret, "ABT_sched_create_basic"); + + /* Change the main scheduler */ + ret = ABT_xstream_set_main_sched(xstream, sched); + ATS_ERROR(ret, "ABT_xstream_set_main_sched"); + + /* Create ULTs for the new scheduler */ + threads = (ABT_thread *)malloc(sizeof(ABT_thread) * num_threads); + for (i = 0; i < num_threads; i++) { + ret = ABT_thread_create(pool, thread_hello, NULL, ABT_THREAD_ATTR_NULL, + &threads[i]); + ATS_ERROR(ret, "ABT_thread_create"); + } + for (i = 0; i < num_threads; i++) { + ret = ABT_thread_free(&threads[i]); + ATS_ERROR(ret, "ABT_thread_free"); + } + free(threads); +} + +int main(int argc, char *argv[]) +{ + size_t i; + int ret; + int num_xstreams = DEFAULT_NUM_XSTREAMS; + ABT_xstream *xstreams; + ABT_pool *pools; + ABT_thread *threads; + + /* Initialize */ + ATS_init(argc, argv); + + if (argc > 1) { + num_xstreams = ATS_get_arg_val(ATS_ARG_N_ES); + num_threads = ATS_get_arg_val(ATS_ARG_N_ULT); + } + ATS_printf(1, "num_xstreams=%d num_threads=%d\n", + num_xstreams, num_threads); + + xstreams = (ABT_xstream *)malloc(sizeof(ABT_xstream) * num_xstreams); + pools = (ABT_pool *)malloc(sizeof(ABT_pool) * num_xstreams); + threads = (ABT_thread *)malloc(sizeof(ABT_thread) * num_xstreams); + + /* Create Execution Streams */ + ret = ABT_xstream_self(&xstreams[0]); + ATS_ERROR(ret, "ABT_xstream_self"); + for (i = 1; i < num_xstreams; i++) { + ret = ABT_xstream_create(ABT_SCHED_NULL, &xstreams[i]); + ATS_ERROR(ret, "ABT_xstream_create"); + + /* Get the first associated pool */ + ret = ABT_xstream_get_main_pools(xstreams[i], 1, pools+i); + ATS_ERROR(ret, "ABT_xstream_get_main_pools"); + } + + /* Create ULTs */ + for (i = 1; i < num_xstreams; i++) { + ret = ABT_thread_create(pools[i], thread_func, (void *)i, + ABT_THREAD_ATTR_NULL, &threads[i]); + ATS_ERROR(ret, "ABT_thread_create"); + } + + thread_func((void *)0); + + /* Join and free all ULTs */ + for (i = 1; i < num_xstreams; i++) { + ret = ABT_thread_free(&threads[i]); + ATS_ERROR(ret, "ABT_thread_free"); + } + + /* Join and free ESs */ + for (i = 1; i < num_xstreams; i++) { + ret = ABT_xstream_join(xstreams[i]); + ATS_ERROR(ret, "ABT_xstream_join"); + ret = ABT_xstream_free(&xstreams[i]); + ATS_ERROR(ret, "ABT_xstream_free"); + } + + /* Finalize */ + ret = ATS_finalize(0); + + free(threads); + free(pools); + free(xstreams); + + return ret; +} +