diff --git a/libs/core/coroutines/include/hpx/coroutines/thread_enums.hpp b/libs/core/coroutines/include/hpx/coroutines/thread_enums.hpp index 7abf67d5c3d6..3e8ec091b69d 100644 --- a/libs/core/coroutines/include/hpx/coroutines/thread_enums.hpp +++ b/libs/core/coroutines/include/hpx/coroutines/thread_enums.hpp @@ -206,7 +206,7 @@ namespace hpx::threads { /// local thread number associated with this hint. Local thread numbers /// are indexed from zero. It is up to the scheduler to decide how to /// interpret thread numbers that are larger than the number of threads - /// available to the scheduler. Typically thread numbers will wrap + /// available to the scheduler. Typically, thread numbers will wrap /// around when too large. thread = 1, @@ -214,7 +214,7 @@ namespace hpx::threads { /// NUMA domain associated with this hint. NUMA domains are indexed from /// zero. It is up to the scheduler to decide how to interpret NUMA /// domain indices that are larger than the number of available NUMA - /// domains to the scheduler. Typically indices will wrap around when + /// domains to the scheduler. Typically, indices will wrap around when /// too large. numa = 2, }; @@ -295,7 +295,7 @@ namespace hpx::threads { } /////////////////////////////////////////////////////////////////////////// - /// \enum thread_placement_hint + /// \enum thread_execution_hint /// /// The type of hint given to the scheduler related running a thread as a /// child directly in the context of the parent thread diff --git a/libs/core/executors/include/hpx/executors/detail/index_queue_spawning.hpp b/libs/core/executors/include/hpx/executors/detail/index_queue_spawning.hpp index c514f8abb227..1b37e340f974 100644 --- a/libs/core/executors/include/hpx/executors/detail/index_queue_spawning.hpp +++ b/libs/core/executors/include/hpx/executors/detail/index_queue_spawning.hpp @@ -65,7 +65,8 @@ namespace hpx::parallel::execution::detail { template void do_work_chunk(F&& f, Ts&& ts, std::uint32_t const index) const { -#if HPX_HAVE_ITTNOTIFY != 0 && !defined(HPX_HAVE_APEX) +#if defined(HPX_HAVE_ITTNOTIFY) && HPX_HAVE_ITTNOTIFY != 0 && \ + !defined(HPX_HAVE_APEX) static hpx::util::itt::event notify_event( "set_value_loop_visitor_static::do_work_chunk(chunking)"); @@ -150,7 +151,7 @@ namespace hpx::parallel::execution::detail { // Finish the work for one worker thread. If this is not the last worker // thread to finish, it will only decrement the counter. If it is the // last thread it will call set_exception if there is an exception. - // Otherwise it will call set_value on the shared state. + // Otherwise, it will call set_value on the shared state. void finish() const { if (--(state->tasks_remaining.data_) == 0) @@ -438,17 +439,21 @@ namespace hpx::parallel::execution::detail { // Initialize the queues for all worker threads so that worker // threads can start stealing immediately when they start. - for (std::uint32_t worker_thread = 0; worker_thread != num_threads; - ++worker_thread) + if (hint.placement_mode() == placement::breadth_first || + hint.placement_mode() == placement::breadth_first_reverse) { - if (hint.placement_mode() == placement::breadth_first || - hint.placement_mode() == placement::breadth_first_reverse) + for (std::uint32_t worker_thread = 0; + worker_thread != num_threads; ++worker_thread) { init_queue_breadth_first(worker_thread, num_chunks); } - else + } + else + { + // the default for this executor is depth-first placement + for (std::uint32_t worker_thread = 0; + worker_thread != num_threads; ++worker_thread) { - // the default for this executor is depth-first placement init_queue_depth_first(worker_thread, num_chunks); } } @@ -546,8 +551,8 @@ namespace hpx::parallel::execution::detail { auto launch_data = generate_launch_data(); std::size_t const size = launch_data.size(); - // Do straight spawning if hierarchical spawning was disabled or we - // have less chunks than our threshold. + // Do straight spawning if hierarchical spawning was disabled or if + // we have less chunks than our threshold. if (hierarchical_threshold == 0 || hierarchical_threshold >= size) { for (std::size_t i = 0; i != size; ++i) @@ -558,36 +563,50 @@ namespace hpx::parallel::execution::detail { return; } - auto task = [desc, pool, launch_data](auto b, auto e) { - for (std::size_t i = b; i != e - 1; ++i) + auto task = [desc, pool, launch_data = HPX_MOVE(launch_data)]( + auto b, auto e) { + HPX_ASSERT(b != e); + for (std::size_t i = b + 1; i != e; ++i) { auto state = launch_data[i].func.state; state->template do_work_task(desc, pool, - launch_data[i].bind_to_core, launch_data[i].func); + launch_data[i].bind_to_core, + HPX_MOVE(launch_data[i].func)); } - // directly execute last task - auto state = launch_data[e - 1].func.state; + // directly execute first task + auto state = launch_data[b].func.state; state->template do_work_task( - desc, pool, false, launch_data[e - 1].func); + desc, pool, false, HPX_MOVE(launch_data[b].func)); }; // run task on small stack auto post_policy = hpx::execution::experimental::with_stacksize( policy, threads::thread_stacksize::small_); + auto post_policy_hint = + hpx::execution::experimental::get_hint(post_policy); + post_policy_hint.mode = + hpx::threads::thread_schedule_hint_mode::thread; + std::size_t start = 0; - while (true) + while (start < size) { + // place the helper thread on the first core of the thread block + post_policy_hint.hint = + first_thread + static_cast(start); + auto core_policy = hpx::execution::experimental::with_hint( + post_policy, post_policy_hint); + auto const stop = start + hierarchical_threshold; if (stop > size) { hpx::detail::post_policy_dispatch::call( - post_policy, desc, pool, HPX_MOVE(task), start, size); + core_policy, desc, pool, HPX_MOVE(task), start, size); break; } hpx::detail::post_policy_dispatch::call( - post_policy, desc, pool, task, start, stop); + core_policy, desc, pool, task, start, stop); start = stop; } } diff --git a/libs/core/executors/include/hpx/executors/thread_pool_scheduler_bulk.hpp b/libs/core/executors/include/hpx/executors/thread_pool_scheduler_bulk.hpp index 63bc1c0efbf0..ad3effaf822d 100644 --- a/libs/core/executors/include/hpx/executors/thread_pool_scheduler_bulk.hpp +++ b/libs/core/executors/include/hpx/executors/thread_pool_scheduler_bulk.hpp @@ -131,7 +131,8 @@ namespace hpx::execution::experimental::detail { template void do_work_chunk(Ts& ts, std::uint32_t const index) const { -#if HPX_HAVE_ITTNOTIFY != 0 && !defined(HPX_HAVE_APEX) +#if defined(HPX_HAVE_ITTNOTIFY) && HPX_HAVE_ITTNOTIFY != 0 && \ + !defined(HPX_HAVE_APEX) static hpx::util::itt::event notify_event( "set_value_loop_visitor_static::do_work_chunk(chunking)"); @@ -145,7 +146,7 @@ namespace hpx::execution::experimental::detail { (std::min)(i_begin + task_f->chunk_size, task_f->size); auto it = std::next(hpx::util::begin(op_state->shape), i_begin); - for (std::uint32_t i = i_begin; i != i_end; (void) ++it, ++i) + for (std::size_t i = i_begin; i != i_end; (void) ++it, ++i) { bulk_scheduler_invoke_helper( index_pack_type{}, op_state->f, *it, ts); @@ -274,7 +275,7 @@ namespace hpx::execution::experimental::detail { // Finish the work for one worker thread. If this is not the last worker // thread to finish, it will only decrement the counter. If it is the // last thread it will call set_error if there is an exception. - // Otherwise it will call set_value on the connected receiver. + // Otherwise, it will call set_value on the connected receiver. void finish() const { if (--(op_state->tasks_remaining.data_) == 0)