From c205a13231f4af20fa33eae35c89852e5029c1c6 Mon Sep 17 00:00:00 2001 From: Shreyas Atre Date: Wed, 1 Jan 2025 21:36:17 +0530 Subject: [PATCH] Fix parallel deterministic reduce and add benchmarks Signed-off-by: Shreyas Atre --- .../detail/reduce_deterministic.hpp | 79 +++++++++++++++++++ .../algorithms/reduce_deterministic.hpp | 24 +++--- .../benchmark_reduce_deterministic.cpp | 21 +++-- 3 files changed, 108 insertions(+), 16 deletions(-) diff --git a/libs/core/algorithms/include/hpx/parallel/algorithms/detail/reduce_deterministic.hpp b/libs/core/algorithms/include/hpx/parallel/algorithms/detail/reduce_deterministic.hpp index b3773088917..ffce8febaae 100644 --- a/libs/core/algorithms/include/hpx/parallel/algorithms/detail/reduce_deterministic.hpp +++ b/libs/core/algorithms/include/hpx/parallel/algorithms/detail/reduce_deterministic.hpp @@ -65,6 +65,71 @@ namespace hpx::parallel::detail { } }; + template + struct sequential_reduce_deterministic_rfa_t final + : hpx::functional::detail::tag_fallback< + sequential_reduce_deterministic_rfa_t> + { + private: + template + friend constexpr hpx::parallel::detail::rfa:: + ReproducibleFloatingAccumulator + tag_fallback_invoke(sequential_reduce_deterministic_rfa_t, + ExPolicy&&, InIterB first, std::size_t partition_size, T init, + std::true_type&&) + { + hpx::parallel::detail::rfa::RFA_bins bins; + bins.initialize_bins(); + std::memcpy(rfa::__rfa_bin_host_buffer__, &bins, sizeof(bins)); + + hpx::parallel::detail::rfa::ReproducibleFloatingAccumulator rfa; + rfa.set_max_abs_val(init); + rfa.unsafe_add(init); + rfa.renorm(); + size_t count = 0; + T max_val = std::abs(*first); + std::size_t partition_size_lim = 0; + for (auto e = first; partition_size_lim <= partition_size; + partition_size_lim++, e++) + { + T temp_max_val = std::abs(static_cast(*e)); + if (max_val < temp_max_val) + { + rfa.set_max_abs_val(temp_max_val); + max_val = temp_max_val; + } + rfa.unsafe_add(*e); + count++; + if (count == rfa.endurance()) + { + rfa.renorm(); + count = 0; + } + } + return rfa; + } + + template + friend constexpr T tag_fallback_invoke( + sequential_reduce_deterministic_rfa_t, ExPolicy&&, InIterB first, + std::size_t partition_size, T init, std::false_type&&) + { + hpx::parallel::detail::rfa::RFA_bins bins; + bins.initialize_bins(); + std::memcpy(rfa::__rfa_bin_host_buffer__, &bins, sizeof(bins)); + + T rfa; + rfa += init; + std::size_t partition_size_lim = 0; + for (auto e = first; partition_size_lim <= partition_size; + partition_size_lim++, e++) + { + rfa += (*e); + } + return rfa; + } + }; + #if !defined(HPX_COMPUTE_DEVICE_CODE) template inline constexpr sequential_reduce_deterministic_t @@ -80,4 +145,18 @@ namespace hpx::parallel::detail { } #endif +#if !defined(HPX_COMPUTE_DEVICE_CODE) + template + inline constexpr sequential_reduce_deterministic_rfa_t + sequential_reduce_deterministic_rfa = + sequential_reduce_deterministic_rfa_t{}; +#else + template + HPX_HOST_DEVICE HPX_FORCEINLINE auto sequential_reduce_deterministic_rfa( + Args&&... args) + { + return sequential_reduce_deterministic_rfa_t{}( + std::forward(args)...); + } +#endif } // namespace hpx::parallel::detail diff --git a/libs/core/algorithms/include/hpx/parallel/algorithms/reduce_deterministic.hpp b/libs/core/algorithms/include/hpx/parallel/algorithms/reduce_deterministic.hpp index 996865d519c..1e59ddba735 100644 --- a/libs/core/algorithms/include/hpx/parallel/algorithms/reduce_deterministic.hpp +++ b/libs/core/algorithms/include/hpx/parallel/algorithms/reduce_deterministic.hpp @@ -407,35 +407,37 @@ namespace hpx::parallel { ExPolicy&& policy, FwdIterB first, FwdIterE last, T_&& init, Reduce&& r) { + (void)r; if (first == last) { return util::detail::algorithm_result::get( HPX_FORWARD(T_, init)); } - auto f1 = [r, policy]( - FwdIterB part_begin, std::size_t part_size) + auto f1 = [policy](FwdIterB part_begin, std::size_t part_size) -> hpx::parallel::detail::rfa:: ReproducibleFloatingAccumulator { - T val = *part_begin; + T_ val = *part_begin; return hpx::parallel::detail:: - sequential_reduce_deterministic( + sequential_reduce_deterministic_rfa( HPX_FORWARD(ExPolicy, policy), ++part_begin, - --part_size, HPX_MOVE(val), r); + --part_size, HPX_MOVE(val), + std::true_type{}); }; - return util::partitioner>::call(HPX_FORWARD(ExPolicy, policy), first, detail::distance(first, last), HPX_MOVE(f1), - hpx::unwrapping([init = HPX_FORWARD(T_, init), - r = HPX_FORWARD(Reduce, r), - policy](auto&& results) -> T { + hpx::unwrapping([policy](auto&& results) -> T_ { return hpx::parallel::detail:: - sequential_reduce_deterministic( + sequential_reduce_deterministic_rfa( HPX_FORWARD(ExPolicy, policy), hpx::util::begin(results), - hpx::util::size(results), init, r) + hpx::util::size(results), + hpx::parallel::detail::rfa:: + ReproducibleFloatingAccumulator{}, + std::false_type{}) .conv(); })); } diff --git a/libs/core/algorithms/tests/performance/benchmark_reduce_deterministic.cpp b/libs/core/algorithms/tests/performance/benchmark_reduce_deterministic.cpp index daaee2b1269..5a267dd6a63 100644 --- a/libs/core/algorithms/tests/performance/benchmark_reduce_deterministic.cpp +++ b/libs/core/algorithms/tests/performance/benchmark_reduce_deterministic.cpp @@ -5,6 +5,7 @@ // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) #include +#include #if !defined(HPX_COMPUTE_DEVICE_CODE) #include @@ -33,15 +34,15 @@ T get_rand(T LO = (std::numeric_limits::min)(), /////////////////////////////////////////////////////////////////////////////// -void bench_reduce_deterministic( +void bench_reduce_deterministic(const auto& policy, const auto& deterministic_shuffled, const auto& val_det, const auto& op) { // check if different type for deterministic and nondeeterministic // and same result auto r1_shuffled = - hpx::reduce_deterministic((std::begin(deterministic_shuffled)), - (std::end(deterministic_shuffled)), val_det, op); + hpx::reduce_deterministic(policy, std::begin(deterministic_shuffled), + std::end(deterministic_shuffled), val_det, op); HPX_UNUSED(r1_shuffled); } @@ -61,6 +62,7 @@ int hpx_main(hpx::program_options::variables_map& vm) std::srand(seed); auto test_count = vm["test_count"].as(); + std::size_t vector_size = vm["vector-size"].as(); hpx::util::perftests_init(vm); @@ -74,7 +76,7 @@ int hpx_main(hpx::program_options::variables_map& vm) { using FloatTypeDeterministic = float; - std::size_t LEN = 10000; + std::size_t LEN = vector_size; constexpr FloatTypeDeterministic num_bounds_det = std::is_same_v ? 1000.0 : 1000000.0; @@ -113,7 +115,14 @@ int hpx_main(hpx::program_options::variables_map& vm) { hpx::util::perftests_report( "reduce deterministic", "seq", test_count, [&]() { - bench_reduce_deterministic( + bench_reduce_deterministic(hpx::execution::seq, + deterministic_shuffled, val_det, op); + }); + } + { + hpx::util::perftests_report( + "reduce deterministic", "par", test_count, [&]() { + bench_reduce_deterministic(hpx::execution::par, deterministic_shuffled, val_det, op); }); } @@ -135,6 +144,8 @@ int main(int argc, char* argv[]) cmdline.add_options() ("test_count", value()->default_value(100), "number of tests to be averaged") + ("vector-size", value()->default_value(1000000), + "number of elements to be reduced") ; // clang-format on