Skip to content

Commit b415983

Browse files
pool: add thread_queue_t to remove duplication in FIFO and FIFO_WAIT
This patch removes duplication of FIFO implementations shared by FIFO and FIFO_WAIT.
1 parent be80564 commit b415983

File tree

4 files changed

+225
-255
lines changed

4 files changed

+225
-255
lines changed

src/pool/Makefile.mk

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,5 @@ abt_sources += \
88
pool/fifo_wait.c \
99
pool/pool.c \
1010
pool/pool_config.c \
11-
pool/pool_user_def.c
11+
pool/pool_user_def.c \
12+
pool/thread_queue.h

src/pool/fifo.c

Lines changed: 38 additions & 148 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
*/
55

66
#include "abti.h"
7+
#include "thread_queue.h"
78
#include <time.h>
89

910
/* FIFO pool implementation */
@@ -43,12 +44,7 @@ static ABT_bool pool_unit_is_in_pool(ABT_unit unit);
4344

4445
struct data {
4546
ABTD_spinlock mutex;
46-
size_t num_threads;
47-
ABTI_thread *p_head;
48-
ABTI_thread *p_tail;
49-
/* If the pool is empty, pop() accesses only is_empty so that pop() does not
50-
* slow down a push operation. */
51-
ABTD_atomic_int is_empty; /* Whether the pool is empty or not. */
47+
thread_queue_t queue;
5248
};
5349
typedef struct data data_t;
5450

@@ -57,28 +53,6 @@ static inline data_t *pool_get_data_ptr(void *p_data)
5753
return (data_t *)p_data;
5854
}
5955

60-
ABTU_ret_err static inline int spinlock_acquire_if_not_empty(data_t *p_data)
61-
{
62-
if (ABTD_atomic_acquire_load_int(&p_data->is_empty)) {
63-
/* The pool is empty. Lock is not taken. */
64-
return 1;
65-
}
66-
while (ABTD_spinlock_try_acquire(&p_data->mutex)) {
67-
/* Lock acquisition failed. Check the size. */
68-
while (1) {
69-
if (ABTD_atomic_acquire_load_int(&p_data->is_empty)) {
70-
/* The pool becomes empty. Lock is not taken. */
71-
return 1;
72-
} else if (!ABTD_spinlock_is_locked(&p_data->mutex)) {
73-
/* Lock seems released. Let's try to take a lock again. */
74-
break;
75-
}
76-
}
77-
}
78-
/* Lock is acquired. */
79-
return 0;
80-
}
81-
8256
/* Obtain the FIFO pool definition according to the access type */
8357
ABTU_ret_err int
8458
ABTI_pool_get_fifo_def(ABT_pool_access access,
@@ -141,65 +115,36 @@ static int pool_init(ABT_pool pool, ABT_pool_config config)
141115
ABTI_CHECK_ERROR(abt_errno);
142116

143117
access = p_pool->access;
144-
145118
if (access != ABT_POOL_ACCESS_PRIV) {
146119
/* Initialize the mutex */
147120
ABTD_spinlock_clear(&p_data->mutex);
148121
}
149-
150-
p_data->num_threads = 0;
151-
p_data->p_head = NULL;
152-
p_data->p_tail = NULL;
153-
ABTD_atomic_relaxed_store_int(&p_data->is_empty, 1);
122+
thread_queue_init(&p_data->queue);
154123

155124
p_pool->data = p_data;
156-
157125
return abt_errno;
158126
}
159127

160128
static void pool_free(ABT_pool pool)
161129
{
162130
ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
163131
data_t *p_data = pool_get_data_ptr(p_pool->data);
164-
132+
thread_queue_free(&p_data->queue);
165133
ABTU_free(p_data);
166134
}
167135

168136
static ABT_bool pool_is_empty(ABT_pool pool)
169137
{
170138
ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
171139
data_t *p_data = pool_get_data_ptr(p_pool->data);
172-
return ABTD_atomic_acquire_load_int(&p_data->is_empty) ? ABT_TRUE
173-
: ABT_FALSE;
140+
return thread_queue_is_empty(&p_data->queue);
174141
}
175142

176143
static size_t pool_get_size(ABT_pool pool)
177144
{
178145
ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
179146
data_t *p_data = pool_get_data_ptr(p_pool->data);
180-
return p_data->num_threads;
181-
}
182-
183-
static inline void pool_push_unsafe(data_t *p_data, ABTI_thread *p_thread)
184-
{
185-
if (p_data->num_threads == 0) {
186-
p_thread->p_prev = p_thread;
187-
p_thread->p_next = p_thread;
188-
p_data->p_head = p_thread;
189-
p_data->p_tail = p_thread;
190-
p_data->num_threads = 1;
191-
ABTD_atomic_release_store_int(&p_data->is_empty, 0);
192-
} else {
193-
ABTI_thread *p_head = p_data->p_head;
194-
ABTI_thread *p_tail = p_data->p_tail;
195-
p_tail->p_next = p_thread;
196-
p_head->p_prev = p_thread;
197-
p_thread->p_prev = p_tail;
198-
p_thread->p_next = p_head;
199-
p_data->p_tail = p_thread;
200-
p_data->num_threads++;
201-
}
202-
ABTD_atomic_release_store_int(&p_thread->is_in_pool, 1);
147+
return thread_queue_get_size(&p_data->queue);
203148
}
204149

205150
static void pool_push_shared(ABT_pool pool, ABT_unit unit,
@@ -210,7 +155,7 @@ static void pool_push_shared(ABT_pool pool, ABT_unit unit,
210155
data_t *p_data = pool_get_data_ptr(p_pool->data);
211156
ABTI_thread *p_thread = ABTI_unit_get_thread_from_builtin_unit(unit);
212157
ABTD_spinlock_acquire(&p_data->mutex);
213-
pool_push_unsafe(p_data, p_thread);
158+
thread_queue_push_tail(&p_data->queue, p_thread);
214159
ABTD_spinlock_release(&p_data->mutex);
215160
}
216161

@@ -221,7 +166,7 @@ static void pool_push_private(ABT_pool pool, ABT_unit unit,
221166
ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
222167
data_t *p_data = pool_get_data_ptr(p_pool->data);
223168
ABTI_thread *p_thread = ABTI_unit_get_thread_from_builtin_unit(unit);
224-
pool_push_unsafe(p_data, p_thread);
169+
thread_queue_push_tail(&p_data->queue, p_thread);
225170
}
226171

227172
static void pool_push_many_shared(ABT_pool pool, const ABT_unit *units,
@@ -236,7 +181,7 @@ static void pool_push_many_shared(ABT_pool pool, const ABT_unit *units,
236181
for (i = 0; i < num_units; i++) {
237182
ABTI_thread *p_thread =
238183
ABTI_unit_get_thread_from_builtin_unit(units[i]);
239-
pool_push_unsafe(p_data, p_thread);
184+
thread_queue_push_tail(&p_data->queue, p_thread);
240185
}
241186
ABTD_spinlock_release(&p_data->mutex);
242187
}
@@ -252,32 +197,7 @@ static void pool_push_many_private(ABT_pool pool, const ABT_unit *units,
252197
for (i = 0; i < num_units; i++) {
253198
ABTI_thread *p_thread =
254199
ABTI_unit_get_thread_from_builtin_unit(units[i]);
255-
pool_push_unsafe(p_data, p_thread);
256-
}
257-
}
258-
259-
static inline ABT_thread pool_pop_unsafe(data_t *p_data)
260-
{
261-
if (p_data->num_threads > 0) {
262-
ABTI_thread *p_thread = p_data->p_head;
263-
if (p_data->num_threads == 1) {
264-
p_data->p_head = NULL;
265-
p_data->p_tail = NULL;
266-
p_data->num_threads = 0;
267-
ABTD_atomic_release_store_int(&p_data->is_empty, 1);
268-
} else {
269-
p_thread->p_prev->p_next = p_thread->p_next;
270-
p_thread->p_next->p_prev = p_thread->p_prev;
271-
p_data->p_head = p_thread->p_next;
272-
p_data->num_threads--;
273-
}
274-
275-
p_thread->p_prev = NULL;
276-
p_thread->p_next = NULL;
277-
ABTD_atomic_release_store_int(&p_thread->is_in_pool, 0);
278-
return ABTI_thread_get_handle(p_thread);
279-
} else {
280-
return ABT_THREAD_NULL;
200+
thread_queue_push_tail(&p_data->queue, p_thread);
281201
}
282202
}
283203

@@ -289,11 +209,12 @@ static ABT_thread pool_pop_wait(ABT_pool pool, double time_secs,
289209
data_t *p_data = pool_get_data_ptr(p_pool->data);
290210
double time_start = 0.0;
291211
while (1) {
292-
if (spinlock_acquire_if_not_empty(p_data) == 0) {
293-
ABT_thread thread = pool_pop_unsafe(p_data);
212+
if (thread_queue_acquire_spinlock_if_not_empty(&p_data->queue,
213+
&p_data->mutex) == 0) {
214+
ABTI_thread *p_thread = thread_queue_pop_head(&p_data->queue);
294215
ABTD_spinlock_release(&p_data->mutex);
295-
if (thread != ABT_THREAD_NULL)
296-
return thread;
216+
if (p_thread)
217+
return ABTI_thread_get_handle(p_thread);
297218
}
298219
if (time_start == 0.0) {
299220
time_start = ABTI_get_wtime();
@@ -314,11 +235,12 @@ static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs)
314235
ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
315236
data_t *p_data = pool_get_data_ptr(p_pool->data);
316237
while (1) {
317-
if (spinlock_acquire_if_not_empty(p_data) == 0) {
318-
ABT_thread thread = pool_pop_unsafe(p_data);
238+
if (thread_queue_acquire_spinlock_if_not_empty(&p_data->queue,
239+
&p_data->mutex) == 0) {
240+
ABTI_thread *p_thread = thread_queue_pop_head(&p_data->queue);
319241
ABTD_spinlock_release(&p_data->mutex);
320-
if (thread != ABT_THREAD_NULL) {
321-
return ABTI_unit_get_builtin_unit(ABTI_thread_get_ptr(thread));
242+
if (p_thread) {
243+
return ABTI_unit_get_builtin_unit(p_thread);
322244
}
323245
}
324246
const int sleep_nsecs = 100;
@@ -335,10 +257,11 @@ static ABT_thread pool_pop_shared(ABT_pool pool, ABT_pool_context context)
335257
(void)context;
336258
ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
337259
data_t *p_data = pool_get_data_ptr(p_pool->data);
338-
if (spinlock_acquire_if_not_empty(p_data) == 0) {
339-
ABT_thread thread = pool_pop_unsafe(p_data);
260+
if (thread_queue_acquire_spinlock_if_not_empty(&p_data->queue,
261+
&p_data->mutex) == 0) {
262+
ABTI_thread *p_thread = thread_queue_pop_head(&p_data->queue);
340263
ABTD_spinlock_release(&p_data->mutex);
341-
return thread;
264+
return ABTI_thread_get_handle(p_thread);
342265
} else {
343266
return ABT_THREAD_NULL;
344267
}
@@ -349,7 +272,8 @@ static ABT_thread pool_pop_private(ABT_pool pool, ABT_pool_context context)
349272
(void)context;
350273
ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
351274
data_t *p_data = pool_get_data_ptr(p_pool->data);
352-
return pool_pop_unsafe(p_data);
275+
ABTI_thread *p_thread = thread_queue_pop_head(&p_data->queue);
276+
return ABTI_thread_get_handle(p_thread);
353277
}
354278

355279
static void pool_pop_many_shared(ABT_pool pool, ABT_thread *threads,
@@ -359,13 +283,15 @@ static void pool_pop_many_shared(ABT_pool pool, ABT_thread *threads,
359283
(void)context;
360284
ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
361285
data_t *p_data = pool_get_data_ptr(p_pool->data);
362-
if (max_threads != 0 && spinlock_acquire_if_not_empty(p_data) == 0) {
286+
if (max_threads != 0 &&
287+
thread_queue_acquire_spinlock_if_not_empty(&p_data->queue,
288+
&p_data->mutex) == 0) {
363289
size_t i;
364290
for (i = 0; i < max_threads; i++) {
365-
ABT_thread thread = pool_pop_unsafe(p_data);
366-
if (thread == ABT_THREAD_NULL)
291+
ABTI_thread *p_thread = thread_queue_pop_head(&p_data->queue);
292+
if (!p_thread)
367293
break;
368-
threads[i] = thread;
294+
threads[i] = ABTI_thread_get_handle(p_thread);
369295
}
370296
*num_popped = i;
371297
ABTD_spinlock_release(&p_data->mutex);
@@ -383,48 +309,21 @@ static void pool_pop_many_private(ABT_pool pool, ABT_thread *threads,
383309
data_t *p_data = pool_get_data_ptr(p_pool->data);
384310
size_t i;
385311
for (i = 0; i < max_threads; i++) {
386-
ABT_thread thread = pool_pop_unsafe(p_data);
387-
if (thread == ABT_THREAD_NULL)
312+
ABTI_thread *p_thread = thread_queue_pop_head(&p_data->queue);
313+
if (!p_thread)
388314
break;
389-
threads[i] = thread;
315+
threads[i] = ABTI_thread_get_handle(p_thread);
390316
}
391317
*num_popped = i;
392318
}
393319

394-
static inline int pool_remove_unsafe(data_t *p_data, ABTI_thread *p_thread)
395-
{
396-
ABTI_CHECK_TRUE(p_data->num_threads != 0, ABT_ERR_POOL);
397-
ABTI_CHECK_TRUE(ABTD_atomic_acquire_load_int(&p_thread->is_in_pool) == 1,
398-
ABT_ERR_POOL);
399-
400-
if (p_data->num_threads == 1) {
401-
p_data->p_head = NULL;
402-
p_data->p_tail = NULL;
403-
p_data->num_threads = 0;
404-
ABTD_atomic_release_store_int(&p_data->is_empty, 1);
405-
} else {
406-
p_thread->p_prev->p_next = p_thread->p_next;
407-
p_thread->p_next->p_prev = p_thread->p_prev;
408-
if (p_thread == p_data->p_head) {
409-
p_data->p_head = p_thread->p_next;
410-
} else if (p_thread == p_data->p_tail) {
411-
p_data->p_tail = p_thread->p_prev;
412-
}
413-
p_data->num_threads--;
414-
}
415-
ABTD_atomic_release_store_int(&p_thread->is_in_pool, 0);
416-
p_thread->p_prev = NULL;
417-
p_thread->p_next = NULL;
418-
return ABT_SUCCESS;
419-
}
420-
421320
static int pool_remove_shared(ABT_pool pool, ABT_unit unit)
422321
{
423322
ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
424323
data_t *p_data = pool_get_data_ptr(p_pool->data);
425324
ABTI_thread *p_thread = ABTI_unit_get_thread_from_builtin_unit(unit);
426325
ABTD_spinlock_acquire(&p_data->mutex);
427-
int abt_errno = pool_remove_unsafe(p_data, p_thread);
326+
int abt_errno = thread_queue_remove(&p_data->queue, p_thread);
428327
ABTD_spinlock_release(&p_data->mutex);
429328
return abt_errno;
430329
}
@@ -434,7 +333,7 @@ static int pool_remove_private(ABT_pool pool, ABT_unit unit)
434333
ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
435334
data_t *p_data = pool_get_data_ptr(p_pool->data);
436335
ABTI_thread *p_thread = ABTI_unit_get_thread_from_builtin_unit(unit);
437-
return pool_remove_unsafe(p_data, p_thread);
336+
return thread_queue_remove(&p_data->queue, p_thread);
438337
}
439338

440339
static void pool_print_all(ABT_pool pool, void *arg,
@@ -448,16 +347,7 @@ static void pool_print_all(ABT_pool pool, void *arg,
448347
if (access != ABT_POOL_ACCESS_PRIV) {
449348
ABTD_spinlock_acquire(&p_data->mutex);
450349
}
451-
452-
size_t num_threads = p_data->num_threads;
453-
ABTI_thread *p_thread = p_data->p_head;
454-
while (num_threads--) {
455-
ABTI_ASSERT(p_thread);
456-
ABT_thread thread = ABTI_thread_get_handle(p_thread);
457-
print_fn(arg, thread);
458-
p_thread = p_thread->p_next;
459-
}
460-
350+
thread_queue_print_all(&p_data->queue, arg, print_fn);
461351
if (access != ABT_POOL_ACCESS_PRIV) {
462352
ABTD_spinlock_release(&p_data->mutex);
463353
}

0 commit comments

Comments
 (0)