[feature request] ABT_pool_create_unit() and ABT_pool_free_unit() #378

Open
mdorier opened this issue Nov 15, 2022 · 8 comments

Comments

@mdorier
Contributor

mdorier commented Nov 15, 2022

I am trying to write an ABT_pool_user_def pool definition for a pool that redirects its calls to another ABT_pool (a kind of "wrapper" pool). I have multiple use cases for this (including a pool that logs the timings of the ULTs it sees being pushed and popped). The problem is that I can't wrap all the functionality of the inner pool without access to more information about it.

Example:

/* Function I can wrap with the currently provided API */
ABT_bool pool_wrapper_is_empty_fn(ABT_pool p)
{
     ABT_pool inner = ... /* get inner pool using ABT_pool_get_data on p */
     ABT_bool result;
     ABT_pool_is_empty(inner, &result);
     return result;
}

/* Function I cannot wrap with the currently provided API */
ABT_unit pool_wrapper_create_unit_fn(ABT_pool p, ABT_thread t)
{
    ABT_pool inner = ... /* get inner pool using ABT_pool_get_data on p */
    /* there is no ABT_pool_create_unit() */
}

I think all I need is for Argobots to provide public ABT_pool_create_unit() and ABT_pool_free_unit() that would call the underlying pool definition's function pointers.

Alternatively, this could be achieved with an ABT_pool_get_user_def() that would give access to the pool's ABT_pool_user_def, and some ABT_pool_user_def_get_*() function to retrieve the function pointers from the pool definition.

@mdorier
Contributor Author

mdorier commented Nov 15, 2022

Actually thinking a bit more about it, I think I could just have ABT_unit = ABT_thread in the context of the wrapper pool, since I don't have to attach any information to the thread. My pool_wrapper_create_unit_fn would simply do a cast, and my pool_wrapper_free_unit_fn would be empty...

@shintaro-iwasaki
Collaborator

This is a very good point. I don't think a nested ABT_pool (or an ABT_pool wrapper using ABT_pool_user_def) should be allowed within the current Argobots mechanism, because an ABT_unit (or ABT_thread) must be associated with only one ABT_pool ... as far as I remember.

  1. ABT_pool_push() that pushes thread1 to pool1, for example, internally changes thread1's associated pool to pool1 and calls its user-defined p_pool_push().
  2. The user-defined p_pool_push() internally calls ABT_pool_push() to push thread1 to a child pool, say pool2. Internally, thread1's associated pool is changed from pool1 to pool2.
  3. When ABT_pool_pop() tries to pop thread1 from pool1, thread1 is associated with pool2. This can cause some issues.

In practice, you may not see an issue, particularly if you don't enable extra assertions (e.g., the undefined-behavior asserts), but I wouldn't recommend it.

Note: fixing this would require changes to pool management, so allowing nested pools might not be trivial.
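
The three steps above can be illustrated with a small standalone model. This is only a sketch that does not use Argobots at all: `toy_pool`, `toy_thread`, `toy_pool_push` and `toy_pool_pop` are hypothetical names, and the only behavior modeled is the assumption stated in the comment, namely that a push records the target pool on the thread before running the pool's own push logic.

```c
#include <assert.h>
#include <stddef.h>

/* Toy model, NOT the Argobots implementation. All names are hypothetical. */
typedef struct toy_pool toy_pool;

typedef struct toy_thread {
    toy_pool *associated; /* pool the runtime believes the thread belongs to */
} toy_thread;

struct toy_pool {
    toy_pool *inner;   /* non-NULL when this pool wraps another pool */
    toy_thread *slot;  /* single-element storage, enough for the demo */
};

/* Step 1: the push records the target pool on the thread.
 * Step 2: a wrapper's push forwards to its inner pool, which records
 *         the inner pool on the thread and overwrites step 1. */
static void toy_pool_push(toy_pool *pool, toy_thread *t)
{
    t->associated = pool;
    if (pool->inner != NULL) {
        toy_pool_push(pool->inner, t);
    } else {
        pool->slot = t;
    }
}

/* Step 3: popping from the wrapper forwards to the inner pool and
 * returns a thread whose association is the inner pool. */
static toy_thread *toy_pool_pop(toy_pool *pool)
{
    if (pool->inner != NULL) {
        return toy_pool_pop(pool->inner);
    }
    toy_thread *t = pool->slot;
    pool->slot = NULL;
    return t;
}
```

In this model, pushing a thread to a wrapper `pool1` whose `inner` is `pool2` and then popping it from `pool1` yields a thread whose `associated` field points at `pool2`, which is the mismatch described in step 3.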

@mdorier
Contributor Author

mdorier commented Nov 16, 2022

I think I see your point about threads being associated with a pool, but I fail to pinpoint exactly where things would fall apart. Here is a sample of what I have so far.

typedef struct wrapper_pool_t {
    ABT_pool pool;
} wrapper_pool_t;

static ABT_unit wrapper_pool_create_unit(ABT_pool wrapper, ABT_thread thread)
{
    (void)wrapper;
    return (ABT_unit)thread;
}

static void wrapper_pool_free_unit(ABT_pool wrapper, ABT_unit unit)
{
    (void)wrapper;
    (void)unit;
}

static ABT_bool wrapper_pool_is_empty(ABT_pool wrapper)
{
    wrapper_pool_t* data = NULL;
    ABT_pool_get_data(wrapper, (void**)&data);
    ABT_bool is_empty = ABT_FALSE;
    ABT_pool_is_empty(data->pool, &is_empty);
    return is_empty;
}

static ABT_thread wrapper_pool_pop(ABT_pool wrapper, ABT_pool_context context)
{
    wrapper_pool_t* data = NULL;
    ABT_pool_get_data(wrapper, (void**)&data);
    ABT_thread thread = ABT_THREAD_NULL;
    ABT_pool_pop_thread_ex(data->pool, &thread, context);
    ABT_thread_set_associated_pool(thread, wrapper);
    return thread;
}

static void wrapper_pool_push(ABT_pool wrapper, ABT_unit unit, ABT_pool_context context)
{
    wrapper_pool_t* data = NULL;
    ABT_pool_get_data(wrapper, (void**)&data);
    /* The parameter is named `unit`; in this wrapper a unit is just a cast ABT_thread. */
    ABT_thread_set_associated_pool((ABT_thread)unit, data->pool);
    ABT_pool_push_thread_ex(data->pool, (ABT_thread)unit, context);
}

My pop function re-associates the thread with the wrapper pool, using ABT_thread_set_associated_pool. What I don't understand yet is the role of the create_unit and free_unit functions, and when exactly they are called. Do I understand correctly that the following happens:

  • When ABT_thread_create is called, an ABT_thread handle is created and the pool's create_unit function is called to create a unit to be associated with the thread, then the pool's push function is called with that unit as parameter;
  • When a scheduler pops from the pool, the pool's pop function is called, returning an ABT_thread, then the pool's free_unit function is called on the unit associated with the thread;
  • When the scheduler pushes the thread back in the pool, an ABT_unit has to be re-created, so the pool's create_unit is called again, followed by the pool's push.
  • I'm not sure how ABT_thread_set_associated_pool works, in particular whether it works on a thread that's in the pool, or only threads that are running, ...

EDIT: reading more of the code, it looks like (1) ABT_pool_pop_thread_ex does NOT free the ABT_unit associated with the ABT_thread, (2) ABT_pool_push_thread_ex will not create a unit, and (3) ABT_thread_set_associated_pool will call free_unit on the old unit associated with the pool and create_unit to create an ABT_unit for the new pool (assuming none of the pools are builtin). So what will happen in my code is that I'll have the allocation of an ABT_unit in Argobots every time I push into the child pool, and a deallocation every time I pop.

EDIT 2: if the child pool is a builtin pool, then it looks like there won't be any allocation/deallocation:

static inline ABTI_thread *ABTI_unit_get_thread_from_builtin_unit(ABT_unit unit)
{
    ABTI_ASSERT(ABTI_unit_is_builtin(unit));
    return (ABTI_thread *)(((uintptr_t)unit) & (~ABTI_UNIT_BUILTIN_POOL_BIT));
}

static inline ABTI_thread *ABTI_unit_get_thread(ABTI_global *p_global,
                                                ABT_unit unit)
{
    if (ABTU_likely(ABTI_unit_is_builtin(unit))) {
        /* This unit is associated with a built-in pool. */
        return ABTI_unit_get_thread_from_builtin_unit(unit);
    } else {
        return ABTI_unit_get_thread_from_user_defined_unit(p_global, unit);
    }
}
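
The bit trick in the snippet above can be shown in isolation. The following is a minimal sketch of pointer tagging, not Argobots code: the `toy_*` names are hypothetical, and the value of `TOY_BUILTIN_BIT` (the lowest bit) is an assumption made for illustration. It relies on the thread structure being aligned to at least 2 bytes, so that the lowest bit of its address is always zero and can carry a flag.

```c
#include <assert.h>
#include <stdint.h>

/* Assumed flag value for this sketch; the real constant may differ. */
#define TOY_BUILTIN_BIT ((uintptr_t)0x1)

typedef struct toy_thread {
    int id; /* int alignment keeps the lowest address bit free */
} toy_thread;

typedef void *toy_unit;

/* Encode: the unit is the thread pointer with the flag bit set,
 * so no separate allocation is needed for the unit. */
static toy_unit toy_unit_from_thread(toy_thread *t)
{
    return (toy_unit)((uintptr_t)t | TOY_BUILTIN_BIT);
}

/* Test whether a unit carries the flag. */
static int toy_unit_is_builtin(toy_unit u)
{
    return ((uintptr_t)u & TOY_BUILTIN_BIT) != 0;
}

/* Decode: mask the flag off to recover the original thread pointer. */
static toy_thread *toy_thread_from_unit(toy_unit u)
{
    return (toy_thread *)((uintptr_t)u & ~TOY_BUILTIN_BIT);
}
```

Because the unit is derived from the thread address by a bitwise operation, creating or discarding it costs no allocation, which matches the observation in EDIT 2.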

@mdorier
Contributor Author

mdorier commented Nov 16, 2022

I have tested my code and it seems to be working fine. How can I enable the assertions you were talking about?

Here is my full code example, let me know if you see a scenario where it could fail:

/*
 * (C) 2022 The University of Chicago
 *
 * See COPYRIGHT in top-level directory.
 */

#include <abt.h>
#include <stdint.h> /* uint64_t */
#include <stdio.h>  /* printf */
#include <stdlib.h>

typedef struct wrapper_pool_t {
    ABT_pool pool;
} wrapper_pool_t;

static ABT_unit wrapper_pool_create_unit(ABT_pool wrapper, ABT_thread thread)
{
    printf("in %s\n", __func__);
    (void)wrapper;
    return (ABT_unit)thread;
}

static void wrapper_pool_free_unit(ABT_pool wrapper, ABT_unit unit)
{
    printf("in %s\n", __func__);
    (void)wrapper;
    (void)unit;
}

static ABT_bool wrapper_pool_is_empty(ABT_pool wrapper)
{
    printf("in %s\n", __func__);
    wrapper_pool_t* data = NULL;
    ABT_pool_get_data(wrapper, (void**)&data);
    ABT_bool is_empty = ABT_FALSE;
    ABT_pool_is_empty(data->pool, &is_empty);
    return is_empty;
}

static ABT_thread wrapper_pool_pop(ABT_pool wrapper, ABT_pool_context context)
{
    printf("in %s\n", __func__);
    wrapper_pool_t* data = NULL;
    ABT_pool_get_data(wrapper, (void**)&data);
    ABT_thread thread = ABT_THREAD_NULL;
    ABT_pool_pop_thread_ex(data->pool, &thread, context);
    ABT_thread_set_associated_pool(thread, wrapper);
    return thread;
}

static void wrapper_pool_push(ABT_pool wrapper, ABT_unit unit, ABT_pool_context context)
{
    printf("in %s\n", __func__);
    wrapper_pool_t* data = NULL;
    ABT_pool_get_data(wrapper, (void**)&data);
    ABT_pool_push_thread_ex(data->pool, (ABT_thread)unit, context);
}

static int wrapper_pool_init(ABT_pool wrapper, ABT_pool_config config)
{
    printf("in %s\n", __func__);
    wrapper_pool_t* data = (wrapper_pool_t*)calloc(1, sizeof(*data));
    ABT_pool_set_data(wrapper, (void*)data);
    ABT_pool_create_basic(ABT_POOL_FIFO, ABT_POOL_ACCESS_MPMC, ABT_FALSE, &(data->pool));
    return ABT_SUCCESS;
}

static void wrapper_pool_free(ABT_pool wrapper)
{
    printf("in %s\n", __func__);
    wrapper_pool_t* data = NULL;
    ABT_pool_get_data(wrapper, (void**)&data);
    ABT_pool_free(&(data->pool));
    free(data);
}

static size_t wrapper_pool_get_size(ABT_pool wrapper)
{
    printf("in %s\n", __func__);
    wrapper_pool_t* data = NULL;
    ABT_pool_get_data(wrapper, (void**)&data);
    size_t size = 0;
    ABT_pool_get_size(data->pool, &size);
    return size;
}

static ABT_thread wrapper_pool_pop_wait(ABT_pool wrapper, double time_sec, ABT_pool_context context)
{
    printf("in %s\n", __func__);
    wrapper_pool_t* data = NULL;
    ABT_pool_get_data(wrapper, (void**)&data);
    ABT_thread thread = ABT_THREAD_NULL;
    ABT_pool_pop_wait_thread_ex(data->pool, &thread, time_sec, context);
    return thread;
}

static void wrapper_pool_pop_many(ABT_pool wrapper, ABT_thread* threads, size_t max, size_t* count, ABT_pool_context context)
{
    printf("in %s\n", __func__);
    wrapper_pool_t* data = NULL;
    ABT_pool_get_data(wrapper, (void**)&data);
    ABT_pool_pop_threads_ex(data->pool, threads, max, count, context);
}

static void wrapper_pool_push_many(ABT_pool wrapper, const ABT_unit* units, size_t num, ABT_pool_context context)
{
    printf("in %s\n", __func__);
    wrapper_pool_t* data = NULL;
    ABT_pool_get_data(wrapper, (void**)&data);
    ABT_pool_push_threads_ex(data->pool, (const ABT_thread*)units, num, context);
}

static void wrapper_pool_print_all(ABT_pool wrapper, void *arg, void(*print_fn)(void *, ABT_thread))
{
    printf("in %s\n", __func__);
    wrapper_pool_t* data = NULL;
    ABT_pool_get_data(wrapper, (void**)&data);
    ABT_pool_print_all(data->pool, arg, (void(*)(void *, ABT_unit))print_fn);
}

void create_wrapper_pool_def(ABT_pool_user_def* p_def)
{
    ABT_pool_user_def_create(
        wrapper_pool_create_unit,
        wrapper_pool_free_unit,
        wrapper_pool_is_empty,
        wrapper_pool_pop,
        wrapper_pool_push,
        p_def);
    ABT_pool_user_def_set_init(*p_def, wrapper_pool_init);
    ABT_pool_user_def_set_free(*p_def, wrapper_pool_free);
    ABT_pool_user_def_set_get_size(*p_def, wrapper_pool_get_size);
    ABT_pool_user_def_set_pop_wait(*p_def, wrapper_pool_pop_wait);
    ABT_pool_user_def_set_pop_many(*p_def, wrapper_pool_pop_many);
    ABT_pool_user_def_set_push_many(*p_def, wrapper_pool_push_many);
    ABT_pool_user_def_set_print_all(*p_def, wrapper_pool_print_all);
}

void free_wrapper_pool_def(ABT_pool_user_def def)
{
    ABT_pool_user_def_free(&def);
}

void f(void* args) {
    uint64_t x = (uint64_t)args;
    for(int i=0; i < 5; i++) {
        printf("f(%lu)\n", (unsigned long)x); /* cast so the argument matches %lu */
        ABT_thread_yield();
    }
}

int main(int argc, char** argv) {
    ABT_init(argc, argv);

    ABT_pool_user_def pool_def;
    create_wrapper_pool_def(&pool_def);

    ABT_pool wrapper;
    ABT_pool_create(pool_def, ABT_POOL_CONFIG_NULL, &wrapper);

    ABT_xstream xstream = ABT_XSTREAM_NULL;
    ABT_xstream_create_basic(ABT_SCHED_DEFAULT, 1, &wrapper, ABT_SCHED_CONFIG_NULL, &xstream);

    ABT_thread threads[5];

    for(uint64_t i=0; i < 5; i++) {
        ABT_thread_create(wrapper, f, (void*)i, ABT_THREAD_ATTR_NULL, threads+i);
    }

    for(uint64_t i=0; i < 5; i++) {
        ABT_thread_join(threads[i]);
        ABT_thread_free(threads+i);
    }

    ABT_xstream_join(xstream);
    ABT_xstream_free(&xstream);

    free_wrapper_pool_def(pool_def);
    ABT_finalize();
}

@shintaro-iwasaki
Collaborator

> What I don't understand yet is the role of the create_unit and free_unit functions, and when exactly they are called. Do I understand correctly that the following happens:

Yes, mostly correct. Argobots creates and frees units lazily, so these functions are called when a ULT's associated pool is changed (for example, by ABT_thread_set_associated_pool), not when a ULT is popped ... as far as I remember.

> EDIT 2: if the child pool is a builtin pool, then it looks like there won't be any allocation/deallocation:

Yes, this is part of the optimizations. Basically, ABT_unit == ABT_thread (though there are some bit tricks, as you can see) for built-in pool implementations.
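
The lazy unit handling described in the two answers above can be modeled with counters. This is a standalone sketch under the assumptions stated in the thread (units are freed and created when the associated pool changes, and built-in pools need no allocation). The `toy_*` names are hypothetical and nothing here calls Argobots.

```c
#include <assert.h>
#include <stddef.h>

/* Toy model, NOT the Argobots implementation. */
typedef struct toy_pool {
    int is_builtin;     /* built-in pools reuse the thread as the unit */
    int units_created;  /* number of simulated create_unit calls */
    int units_freed;    /* number of simulated free_unit calls */
} toy_pool;

typedef struct toy_thread {
    toy_pool *pool;     /* currently associated pool, NULL if none */
} toy_thread;

/* Changing the association is the only place where units are
 * freed (for the old pool) and created (for the new pool). */
static void toy_set_associated_pool(toy_thread *t, toy_pool *new_pool)
{
    if (t->pool != NULL && !t->pool->is_builtin) {
        t->pool->units_freed++;
    }
    if (!new_pool->is_builtin) {
        new_pool->units_created++;
    }
    t->pool = new_pool;
}
```

With a user-defined child pool, every round trip (associate with the child on push, associate with the wrapper on pop) adds one creation and one free on the child. With `is_builtin` set on the child, both counters stay at zero.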

> Here is my full code example, let me know if you see a scenario where it could fail

I cannot say exactly when it would fail, but more specifically:

  • It is undefined which pool ABT_thread_get_last_pool() or ABT_self_get_last_pool() would return (i.e., wrapper_pool or data->pool).
  • Similarly, it is undefined to which pool a ULT is pushed when ABT_self_yield() is called.

It is okay if it works in your specific use case, but I would not recommend it, since the behavior is undefined.

> How can I enable the assertions you were talking about?

To enable the UB asserts, please use ./configure --enable-debug=most (see #335), but this may not flag nested pools.

@mdorier
Contributor Author

mdorier commented Nov 16, 2022

I'm looking at the code, and I'm under the impression that those functions (ABT_thread_get_last_pool, ABT_self_get_last_pool, and ABT_self_yield) would actually have well-defined behavior. The key is that I'm calling ABT_thread_set_associated_pool on the threads on their way in and out of the pool to maintain correct information about which pool the thread is in. But maybe I'm missing something...

Note also: I simplified my code above a bit, but in practice my wrapper pool would have a mutex protecting every operation, so that I ensure correct behavior in an MPMC setting.

Definition of ABTI_thread:

argobots/src/include/abti.h

Lines 422 to 437 in 915ff28

struct ABTI_thread {
    ABTI_thread *p_prev;
    ABTI_thread *p_next;
    ABTD_atomic_int is_in_pool;   /* Whether this thread is in a pool. */
    ABTI_thread_type type;        /* Thread type */
    ABT_unit unit;                /* Unit enclosing this thread */
    ABTI_xstream *p_last_xstream; /* Last ES where it ran */
    ABTI_thread *p_parent;        /* Parent thread */
    void (*f_thread)(void *);     /* Thread function */
    void *p_arg;                  /* Thread function argument */
    ABTD_atomic_int state;        /* State (ABT_thread_state) */
    ABTD_atomic_uint32 request;   /* Request */
    ABTI_pool *p_pool;            /* Associated pool */
    ABTD_atomic_ptr p_keytable;   /* Thread-specific data (ABTI_ktable *) */
    ABT_unit_id id;               /* ID */
};

Definition of ABT_thread_get_last_pool():

argobots/src/thread.c

Lines 1109 to 1119 in 915ff28

int ABT_thread_get_last_pool(ABT_thread thread, ABT_pool *pool)
{
    ABTI_UB_ASSERT(ABTI_initialized());
    ABTI_UB_ASSERT(pool);
    ABTI_thread *p_thread = ABTI_thread_get_ptr(thread);
    ABTI_CHECK_NULL_THREAD_PTR(p_thread);
    *pool = ABTI_pool_get_handle(p_thread->p_pool);
    return ABT_SUCCESS;
}

Definition of ABT_self_get_last_pool():

argobots/src/self.c

Lines 468 to 479 in 915ff28

int ABT_self_get_last_pool(ABT_pool *pool)
{
    ABTI_UB_ASSERT(ABTI_initialized());
    ABTI_UB_ASSERT(pool);
    ABTI_xstream *p_local_xstream;
    ABTI_SETUP_LOCAL_XSTREAM(&p_local_xstream);
    ABTI_thread *p_self = p_local_xstream->p_thread;
    ABTI_ASSERT(p_self->p_pool);
    *pool = ABTI_pool_get_handle(p_self->p_pool);
    return ABT_SUCCESS;
}

Definition of ABT_self_yield():

argobots/src/self.c

Lines 626 to 638 in 915ff28

int ABT_self_yield(void)
{
    ABTI_UB_ASSERT(ABTI_initialized());
    ABTI_xstream *p_local_xstream;
    ABTI_ythread *p_ythread;
    ABTI_SETUP_LOCAL_YTHREAD(&p_local_xstream, &p_ythread);
    ABTI_ythread_yield(&p_local_xstream, p_ythread,
                       ABTI_YTHREAD_YIELD_KIND_USER, ABT_SYNC_EVENT_TYPE_USER,
                       NULL);
    return ABT_SUCCESS;
}

The ABT_self_* functions have to be called by the ULT when the ULT is running, i.e. when it's not in the pool, and because of the call to ABT_thread_set_associated_pool in my wrapper pool's pop function, the p_pool pointer in the ABTI_thread will necessarily point to the wrapper pool.

ABT_thread_get_last_pool could be called in any context, and the result would depend on whether the thread is running (in which case the wrapper pool will be returned) or not (in which case it will be inside the child pool and this is what the function will return). This seems like a well-defined behavior.

I think the only way I could get an undefined behavior is if someone calls ABT_thread_get_last_pool at the exact same time my wrapper calls ABT_thread_set_associated_pool, more specifically at this exact line:

p_thread->p_pool = p_pool;
because it's not an atomic assignment. That said, this raises the question: when is ABT_thread_set_associated_pool actually safe to call? A custom scheduler, or any ULT, could just as well call this function to change the pool a thread is associated with.
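
For readers unfamiliar with the concern about the non-atomic assignment, here is a minimal C11 sketch of what an atomic pointer field would look like. It is an illustration only: the `toy_*` names are hypothetical, and this is not how the `p_pool` field shown above is declared. In C, an unsynchronized concurrent read and write of a plain pointer is a data race, whereas the accessors below make the single load and store well defined. They would not make the rest of a pool change (such as unit creation and freeing) safe to run concurrently.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

/* Toy types for illustration; NOT the Argobots structures. */
typedef struct toy_pool {
    int id;
} toy_pool;

typedef struct toy_thread {
    _Atomic(toy_pool *) p_pool; /* associated pool, accessed atomically */
} toy_thread;

/* Publish a new associated pool; release ordering makes earlier writes
 * visible to a reader that observes the new pointer with acquire. */
static void toy_set_pool(toy_thread *t, toy_pool *p)
{
    atomic_store_explicit(&t->p_pool, p, memory_order_release);
}

/* Read the associated pool without a data race. */
static toy_pool *toy_get_pool(toy_thread *t)
{
    return atomic_load_explicit(&t->p_pool, memory_order_acquire);
}
```

A reader calling `toy_get_pool` concurrently with `toy_set_pool` sees either the old or the new pool, never a torn value.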

@shintaro-iwasaki
Collaborator

Sorry for the late reply. Maybe I used the term "undefined behavior" in a confusing way:

  • From the runtime developer's viewpoint, these cases are all classified as undefined behavior, since the API does not specify them and the behavior cannot be naturally deduced. Because it is undefined, the runtime might not work as intended (it is unclear what the natural intention would even be). At least, I did not consider this nested-pool situation when I developed Argobots, so anything can happen. This is why I don't recommend it.

    • My "undefined behavior" means:
      • The API does not define which pool ABT_thread_get_last_pool() or ABT_self_get_last_pool() would return (i.e., wrapper_pool or data->pool).
      • The API does not define to which pool a ULT is pushed when ABT_self_yield() is called.
  • If your "well-defined behavior" means "Argobots works without SEGV", perhaps it works if the user doesn't call ABT_thread_set_associated_pool() and doesn't care to which pool a ULT is pushed when ABT_self_yield() is called. Argobots neither tests nor supports this behavior, so please test whether it works in your workload.

> when is ABT_thread_set_associated_pool actually safe to call? Because a custom scheduler, or any ULT, could just as well call this function to change the pool a thread is associated with.

ABT_thread_set_associated_pool() should be called when a ULT is "not" in a pool. Since a ULT is not in a pool, this ULT's pool won't be changed by another scheduler.

// pseudo code, typical use case
thread = ABT_pool_pop(pool1);
// If this thread yields, this thread will be pushed to pool2, not pool1.
ABT_thread_set_associated_pool(thread, pool2);
ABT_self_schedule(thread);

@mdorier
Contributor Author

mdorier commented Nov 20, 2022

Oh, I see: the problem is that an Argobots developer could change something in the implementation that would change the behavior, because that behavior is undocumented. So my code works fine with the current implementation of Argobots but may not in the future.
