diff --git a/src/pool/fifo.c b/src/pool/fifo.c index 0f85fcda..7f852123 100644 --- a/src/pool/fifo.c +++ b/src/pool/fifo.c @@ -32,6 +32,9 @@ struct data { size_t num_threads; ABTI_thread *p_head; ABTI_thread *p_tail; + /* If the pool is empty, pop() accesses only is_empty so that pop() does not + * slow down a push operation. */ + ABTD_atomic_int is_empty; /* Whether the pool is empty or not. */ }; typedef struct data data_t; @@ -40,6 +43,28 @@ static inline data_t *pool_get_data_ptr(void *p_data) return (data_t *)p_data; } +ABTU_ret_err static inline int spinlock_acquire_if_not_empty(data_t *p_data) +{ + if (ABTD_atomic_acquire_load_int(&p_data->is_empty)) { + /* The pool is empty. Lock is not taken. */ + return 1; + } + while (ABTD_spinlock_try_acquire(&p_data->mutex)) { + /* Lock acquisition failed. Check the size. */ + while (1) { + if (ABTD_atomic_acquire_load_int(&p_data->is_empty)) { + /* The pool becomes empty. Lock is not taken. */ + return 1; + } else if (!ABTD_spinlock_is_locked(&p_data->mutex)) { + /* Lock seems released. Let's try to take a lock again. */ + break; + } + } + } + /* Lock is acquired. */ + return 0; +} + /* Obtain the FIFO pool definition according to the access type */ ABTU_ret_err int ABTI_pool_get_fifo_def(ABT_pool_access access, ABTI_pool_def *p_def) @@ -105,6 +130,7 @@ static int pool_init(ABT_pool pool, ABT_pool_config config) p_data->num_threads = 0; p_data->p_head = NULL; p_data->p_tail = NULL; + ABTD_atomic_relaxed_store_int(&p_data->is_empty, 1); p_pool->data = p_data; @@ -141,6 +167,8 @@ static void pool_push_shared(ABT_pool pool, ABT_unit unit) p_thread->p_next = p_thread; p_data->p_head = p_thread; p_data->p_tail = p_thread; + p_data->num_threads = 1; + ABTD_atomic_release_store_int(&p_data->is_empty, 0); } else { ABTI_thread *p_head = p_data->p_head; ABTI_thread *p_tail = p_data->p_tail; @@ -149,8 +177,8 @@ static void pool_push_shared(ABT_pool pool, ABT_unit unit) p_thread->p_prev = p_tail; p_thread->p_next = p_head; p_data->p_tail = p_thread; + p_data->num_threads++; } - p_data->num_threads++; ABTD_atomic_release_store_int(&p_thread->is_in_pool, 1); ABTD_spinlock_release(&p_data->mutex); @@ -167,6 +195,8 @@ static void pool_push_private(ABT_pool pool, ABT_unit unit) p_thread->p_next = p_thread; p_data->p_head = p_thread; p_data->p_tail = p_thread; + p_data->num_threads = 1; + ABTD_atomic_release_store_int(&p_data->is_empty, 0); } else { ABTI_thread *p_head = p_data->p_head; ABTI_thread *p_tail = p_data->p_tail; @@ -175,8 +205,8 @@ static void pool_push_private(ABT_pool pool, ABT_unit unit) p_thread->p_prev = p_tail; p_thread->p_next = p_head; p_data->p_tail = p_thread; + p_data->num_threads++; } - p_data->num_threads++; ABTD_atomic_release_store_int(&p_thread->is_in_pool, 1); } @@ -186,120 +216,125 @@ static ABT_unit pool_pop_wait(ABT_pool pool, double time_secs) ABTI_pool *p_pool = ABTI_pool_get_ptr(pool); data_t *p_data = pool_get_data_ptr(p_pool->data); ABTI_thread *p_thread = NULL; - ABT_unit h_unit = ABT_UNIT_NULL; double time_start = 0.0; - do { - ABTD_spinlock_acquire(&p_data->mutex); - if (p_data->num_threads > 0) { - p_thread = p_data->p_head; - if (p_data->num_threads == 1) { - p_data->p_head = NULL; - p_data->p_tail = NULL; - } else { - p_thread->p_prev->p_next = p_thread->p_next; - p_thread->p_next->p_prev = p_thread->p_prev; - p_data->p_head = p_thread->p_next; + while (1) { + if (spinlock_acquire_if_not_empty(p_data) == 0) { + ABT_unit h_unit = ABT_UNIT_NULL; + if (p_data->num_threads > 0) { + p_thread = p_data->p_head; + if (p_data->num_threads == 1) { + p_data->p_head = NULL; + p_data->p_tail = NULL; + p_data->num_threads = 0; + ABTD_atomic_release_store_int(&p_data->is_empty, 1); + } else { + p_thread->p_prev->p_next = p_thread->p_next; + p_thread->p_next->p_prev = p_thread->p_prev; + p_data->p_head = p_thread->p_next; + p_data->num_threads--; + } + + p_thread->p_prev = NULL; + p_thread->p_next = NULL; + ABTD_atomic_release_store_int(&p_thread->is_in_pool, 0); + + h_unit = (ABT_unit)p_thread; } - p_data->num_threads--; - - p_thread->p_prev = NULL; - p_thread->p_next = NULL; - ABTD_atomic_release_store_int(&p_thread->is_in_pool, 0); - - h_unit = (ABT_unit)p_thread; ABTD_spinlock_release(&p_data->mutex); + if (h_unit != ABT_UNIT_NULL) + return h_unit; + } + if (time_start == 0.0) { + time_start = ABTI_get_wtime(); } else { - ABTD_spinlock_release(&p_data->mutex); - if (time_start == 0.0) { - time_start = ABTI_get_wtime(); - } else { - double elapsed = ABTI_get_wtime() - time_start; - if (elapsed > time_secs) - break; + double elapsed = ABTI_get_wtime() - time_start; + if (elapsed > time_secs) + return ABT_UNIT_NULL; + } + /* Sleep. */ + const int sleep_nsecs = 100; + struct timespec ts = { 0, sleep_nsecs }; + nanosleep(&ts, NULL); + } +} + +static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs) +{ + ABTI_pool *p_pool = ABTI_pool_get_ptr(pool); + data_t *p_data = pool_get_data_ptr(p_pool->data); + ABTI_thread *p_thread = NULL; + + while (1) { + if (spinlock_acquire_if_not_empty(p_data) == 0) { + ABT_unit h_unit = ABT_UNIT_NULL; + if (p_data->num_threads > 0) { + p_thread = p_data->p_head; + if (p_data->num_threads == 1) { + p_data->p_head = NULL; + p_data->p_tail = NULL; + p_data->num_threads = 0; + ABTD_atomic_release_store_int(&p_data->is_empty, 1); + } else { + p_thread->p_prev->p_next = p_thread->p_next; + p_thread->p_next->p_prev = p_thread->p_prev; + p_data->p_head = p_thread->p_next; + p_data->num_threads--; + } + + p_thread->p_prev = NULL; + p_thread->p_next = NULL; + ABTD_atomic_release_store_int(&p_thread->is_in_pool, 0); + + h_unit = (ABT_unit)p_thread; } - /* Sleep. */ - const int sleep_nsecs = 100; - struct timespec ts = { 0, sleep_nsecs }; - nanosleep(&ts, NULL); + ABTD_spinlock_release(&p_data->mutex); + if (h_unit != ABT_UNIT_NULL) + return h_unit; } - } while (h_unit == ABT_UNIT_NULL); + const int sleep_nsecs = 100; + struct timespec ts = { 0, sleep_nsecs }; + nanosleep(&ts, NULL); - return h_unit; + if (ABTI_get_wtime() > abstime_secs) + return ABT_UNIT_NULL; + } } -static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs) +static ABT_unit pool_pop_shared(ABT_pool pool) { ABTI_pool *p_pool = ABTI_pool_get_ptr(pool); data_t *p_data = pool_get_data_ptr(p_pool->data); ABTI_thread *p_thread = NULL; - ABT_unit h_unit = ABT_UNIT_NULL; - do { - ABTD_spinlock_acquire(&p_data->mutex); + if (spinlock_acquire_if_not_empty(p_data) == 0) { + ABT_unit h_unit = ABT_UNIT_NULL; if (p_data->num_threads > 0) { p_thread = p_data->p_head; if (p_data->num_threads == 1) { p_data->p_head = NULL; p_data->p_tail = NULL; + p_data->num_threads = 0; + ABTD_atomic_release_store_int(&p_data->is_empty, 1); } else { p_thread->p_prev->p_next = p_thread->p_next; p_thread->p_next->p_prev = p_thread->p_prev; p_data->p_head = p_thread->p_next; + p_data->num_threads--; } - p_data->num_threads--; p_thread->p_prev = NULL; p_thread->p_next = NULL; ABTD_atomic_release_store_int(&p_thread->is_in_pool, 0); h_unit = (ABT_unit)p_thread; - ABTD_spinlock_release(&p_data->mutex); - } else { - ABTD_spinlock_release(&p_data->mutex); - /* Sleep. */ - const int sleep_nsecs = 100; - struct timespec ts = { 0, sleep_nsecs }; - nanosleep(&ts, NULL); - - if (ABTI_get_wtime() > abstime_secs) - break; - } - } while (h_unit == ABT_UNIT_NULL); - - return h_unit; -} - -static ABT_unit pool_pop_shared(ABT_pool pool) -{ - ABTI_pool *p_pool = ABTI_pool_get_ptr(pool); - data_t *p_data = pool_get_data_ptr(p_pool->data); - ABTI_thread *p_thread = NULL; - ABT_unit h_unit = ABT_UNIT_NULL; - - ABTD_spinlock_acquire(&p_data->mutex); - if (p_data->num_threads > 0) { - p_thread = p_data->p_head; - if (p_data->num_threads == 1) { - p_data->p_head = NULL; - p_data->p_tail = NULL; - } else { - p_thread->p_prev->p_next = p_thread->p_next; - p_thread->p_next->p_prev = p_thread->p_prev; - p_data->p_head = p_thread->p_next; } - p_data->num_threads--; - - p_thread->p_prev = NULL; - p_thread->p_next = NULL; - ABTD_atomic_release_store_int(&p_thread->is_in_pool, 0); - - h_unit = (ABT_unit)p_thread; + ABTD_spinlock_release(&p_data->mutex); + return h_unit; + } else { + return ABT_UNIT_NULL; } - ABTD_spinlock_release(&p_data->mutex); - - return h_unit; } static ABT_unit pool_pop_private(ABT_pool pool) @@ -307,19 +342,21 @@ static ABT_unit pool_pop_private(ABT_pool pool) ABTI_pool *p_pool = ABTI_pool_get_ptr(pool); data_t *p_data = pool_get_data_ptr(p_pool->data); ABTI_thread *p_thread = NULL; - ABT_unit h_unit = ABT_UNIT_NULL; + ABT_unit h_unit = ABT_UNIT_NULL; if (p_data->num_threads > 0) { p_thread = p_data->p_head; if (p_data->num_threads == 1) { p_data->p_head = NULL; p_data->p_tail = NULL; + p_data->num_threads = 0; + ABTD_atomic_relaxed_store_int(&p_data->is_empty, 1); } else { p_thread->p_prev->p_next = p_thread->p_next; p_thread->p_next->p_prev = p_thread->p_prev; p_data->p_head = p_thread->p_next; + p_data->num_threads--; } - p_data->num_threads--; p_thread->p_prev = NULL; p_thread->p_next = NULL; @@ -327,7 +364,6 @@ static ABT_unit pool_pop_private(ABT_pool pool) h_unit = (ABT_unit)p_thread; } - return h_unit; } @@ -345,6 +381,8 @@ static int pool_remove_shared(ABT_pool pool, ABT_unit unit) if (p_data->num_threads == 1) { p_data->p_head = NULL; p_data->p_tail = NULL; + p_data->num_threads = 0; + ABTD_atomic_release_store_int(&p_data->is_empty, 1); } else { p_thread->p_prev->p_next = p_thread->p_next; p_thread->p_next->p_prev = p_thread->p_prev; @@ -353,8 +391,8 @@ static int pool_remove_shared(ABT_pool pool, ABT_unit unit) } else if (p_thread == p_data->p_tail) { p_data->p_tail = p_thread->p_prev; } + p_data->num_threads--; } - p_data->num_threads--; ABTD_atomic_release_store_int(&p_thread->is_in_pool, 0); ABTD_spinlock_release(&p_data->mutex); @@ -378,6 +416,8 @@ static int pool_remove_private(ABT_pool pool, ABT_unit unit) if (p_data->num_threads == 1) { p_data->p_head = NULL; p_data->p_tail = NULL; + p_data->num_threads = 0; + ABTD_atomic_relaxed_store_int(&p_data->is_empty, 1); } else { p_thread->p_prev->p_next = p_thread->p_next; p_thread->p_next->p_prev = p_thread->p_prev; @@ -386,8 +426,8 @@ static int pool_remove_private(ABT_pool pool, ABT_unit unit) } else if (p_thread == p_data->p_tail) { p_data->p_tail = p_thread->p_prev; } + p_data->num_threads--; } - p_data->num_threads--; ABTD_atomic_release_store_int(&p_thread->is_in_pool, 0); p_thread->p_prev = NULL; diff --git a/src/pool/fifo_wait.c b/src/pool/fifo_wait.c index 7fd62c65..99b4acff 100644 --- a/src/pool/fifo_wait.c +++ b/src/pool/fifo_wait.c @@ -28,6 +28,7 @@ struct data { size_t num_threads; ABTI_thread *p_head; ABTI_thread *p_tail; + ABTD_atomic_int is_empty; /* Whether the pool is empty or not. */ }; typedef struct data data_t; @@ -84,6 +85,7 @@ static int pool_init(ABT_pool pool, ABT_pool_config config) p_data->num_threads = 0; p_data->p_head = NULL; p_data->p_tail = NULL; + ABTD_atomic_relaxed_store_int(&p_data->is_empty, 1); p_pool->data = p_data; @@ -122,6 +124,8 @@ static void pool_push(ABT_pool pool, ABT_unit unit) p_thread->p_next = p_thread; p_data->p_head = p_thread; p_data->p_tail = p_thread; + p_data->num_threads = 1; + ABTD_atomic_release_store_int(&p_data->is_empty, 0); } else { ABTI_thread *p_head = p_data->p_head; ABTI_thread *p_tail = p_data->p_tail; @@ -130,8 +134,8 @@ static void pool_push(ABT_pool pool, ABT_unit unit) p_thread->p_prev = p_tail; p_thread->p_next = p_head; p_data->p_tail = p_thread; + p_data->num_threads++; } - p_data->num_threads++; ABTD_atomic_release_store_int(&p_thread->is_in_pool, 1); pthread_cond_signal(&p_data->cond); @@ -179,12 +183,14 @@ static ABT_unit pool_pop_wait(ABT_pool pool, double time_secs) if (p_data->num_threads == 1) { p_data->p_head = NULL; p_data->p_tail = NULL; + p_data->num_threads = 0; + ABTD_atomic_release_store_int(&p_data->is_empty, 1); } else { p_thread->p_prev->p_next = p_thread->p_next; p_thread->p_next->p_prev = p_thread->p_prev; p_data->p_head = p_thread->p_next; + p_data->num_threads--; } - p_data->num_threads--; p_thread->p_prev = NULL; p_thread->p_next = NULL; @@ -224,12 +230,14 @@ static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs) if (p_data->num_threads == 1) { p_data->p_head = NULL; p_data->p_tail = NULL; + p_data->num_threads = 0; + ABTD_atomic_release_store_int(&p_data->is_empty, 1); } else { p_thread->p_prev->p_next = p_thread->p_next; p_thread->p_next->p_prev = p_thread->p_prev; p_data->p_head = p_thread->p_next; + p_data->num_threads--; } - p_data->num_threads--; p_thread->p_prev = NULL; p_thread->p_next = NULL; @@ -247,30 +255,35 @@ static ABT_unit pool_pop(ABT_pool pool) ABTI_pool *p_pool = ABTI_pool_get_ptr(pool); data_t *p_data = pool_get_data_ptr(p_pool->data); ABTI_thread *p_thread = NULL; - ABT_unit h_unit = ABT_UNIT_NULL; - pthread_mutex_lock(&p_data->mutex); - if (p_data->num_threads > 0) { - p_thread = p_data->p_head; - if (p_data->num_threads == 1) { - p_data->p_head = NULL; - p_data->p_tail = NULL; - } else { - p_thread->p_prev->p_next = p_thread->p_next; - p_thread->p_next->p_prev = p_thread->p_prev; - p_data->p_head = p_thread->p_next; + if (ABTD_atomic_acquire_load_int(&p_data->is_empty) == 0) { + ABT_unit h_unit = ABT_UNIT_NULL; + pthread_mutex_lock(&p_data->mutex); + if (p_data->num_threads > 0) { + p_thread = p_data->p_head; + if (p_data->num_threads == 1) { + p_data->p_head = NULL; + p_data->p_tail = NULL; + p_data->num_threads = 0; + ABTD_atomic_release_store_int(&p_data->is_empty, 1); + } else { + p_thread->p_prev->p_next = p_thread->p_next; + p_thread->p_next->p_prev = p_thread->p_prev; + p_data->p_head = p_thread->p_next; + p_data->num_threads--; + } + + p_thread->p_prev = NULL; + p_thread->p_next = NULL; + ABTD_atomic_release_store_int(&p_thread->is_in_pool, 0); + + h_unit = (ABT_unit)p_thread; } - p_data->num_threads--; - - p_thread->p_prev = NULL; - p_thread->p_next = NULL; - ABTD_atomic_release_store_int(&p_thread->is_in_pool, 0); - - h_unit = (ABT_unit)p_thread; + pthread_mutex_unlock(&p_data->mutex); + return h_unit; + } else { + return ABT_UNIT_NULL; } - pthread_mutex_unlock(&p_data->mutex); - - return h_unit; } static int pool_remove(ABT_pool pool, ABT_unit unit) @@ -287,6 +300,8 @@ static int pool_remove(ABT_pool pool, ABT_unit unit) if (p_data->num_threads == 1) { p_data->p_head = NULL; p_data->p_tail = NULL; + p_data->num_threads = 0; + ABTD_atomic_release_store_int(&p_data->is_empty, 1); } else { p_thread->p_prev->p_next = p_thread->p_next; p_thread->p_next->p_prev = p_thread->p_prev; @@ -295,8 +310,8 @@ static int pool_remove(ABT_pool pool, ABT_unit unit) } else if (p_thread == p_data->p_tail) { p_data->p_tail = p_thread->p_prev; } + p_data->num_threads--; } - p_data->num_threads--; ABTD_atomic_release_store_int(&p_thread->is_in_pool, 0); pthread_mutex_unlock(&p_data->mutex);