Skip to content

Commit

Permalink
Added parallel execution of rfa reduction summation
Browse files Browse the repository at this point in the history
Signed-off-by: Shreyas Atre <[email protected]>
  • Loading branch information
SAtacker committed Dec 10, 2024
1 parent 7f9a3c8 commit 6a99390
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <hpx/functional/invoke.hpp>
#include <hpx/parallel/algorithms/detail/rfa.hpp>
#include <hpx/parallel/util/loop.hpp>
#include <hpx/type_support/pack.hpp>

#include <cstddef>
#include <limits>
Expand Down Expand Up @@ -62,6 +63,84 @@ namespace hpx::parallel::detail {
}
};

/// Customization point object that sums a sequence into a
/// rfa::ReproducibleFloatingAccumulator<T> and returns the accumulator
/// itself (not the converted scalar), so that partial results can be
/// combined again by a later call.
///
/// Two overloads are provided: one for an iterator/sentinel pair and one
/// for an iterator plus an element count. In both, `init` is folded into
/// the accumulator before the elements are added. The reduction functor is
/// accepted for interface compatibility only; accumulation always uses the
/// accumulator's `operator+=`.
template <typename ExPolicy>
struct sequential_reduce_deterministic_rfa_t final
  : hpx::functional::detail::tag_fallback<
        sequential_reduce_deterministic_rfa_t<ExPolicy>>
{
private:
    /// Accumulates `init` followed by every element of [first, last).
    template <typename InIterB, typename InIterE, typename T,
        typename Reduce>
    friend constexpr hpx::parallel::detail::rfa::
        ReproducibleFloatingAccumulator<T>
        tag_fallback_invoke(sequential_reduce_deterministic_rfa_t,
            ExPolicy&&, InIterB first, InIterE last, T init,
            Reduce&& /* r */)
    {
        // NOTE(review): every call rewrites rfa::bin_host_buffer. When this
        // is invoked concurrently from several partitions this looks like
        // unsynchronized writes to shared state -- confirm it is intended.
        hpx::parallel::detail::rfa::RFA_bins<T> bins;
        bins.initialize_bins();
        std::memcpy(rfa::bin_host_buffer, &bins, sizeof(bins));

        hpx::parallel::detail::rfa::ReproducibleFloatingAccumulator<T> acc;

        // Callers may hand over an already consumed element (or the user
        // supplied start value) through `init`; it must not be dropped.
        acc += init;

        for (auto it = first; it != last; ++it)
        {
            acc += *it;
        }
        return acc;
    }

    /// Accumulates `init` followed by `size` elements starting at `first`.
    template <typename InIterB, typename T, typename Reduce>
    friend constexpr hpx::parallel::detail::rfa::
        ReproducibleFloatingAccumulator<T>
        tag_fallback_invoke(sequential_reduce_deterministic_rfa_t,
            ExPolicy&&, InIterB first, std::size_t size, T init,
            Reduce&& /* r */)
    {
        // NOTE(review): same shared-buffer concern as in the overload above.
        hpx::parallel::detail::rfa::RFA_bins<T> bins;
        bins.initialize_bins();
        std::memcpy(rfa::bin_host_buffer, &bins, sizeof(bins));

        hpx::parallel::detail::rfa::ReproducibleFloatingAccumulator<T> acc;

        // See the iterator/sentinel overload: `init` is part of the sum.
        acc += init;

        auto it = first;
        for (std::size_t i = 0; i < size; ++i, ++it)
        {
            acc += *it;
        }
        return acc;
    }
};

#if !defined(HPX_COMPUTE_DEVICE_CODE)
template <typename ExPolicy>
inline constexpr sequential_reduce_deterministic_t<ExPolicy>
Expand All @@ -77,4 +156,18 @@ namespace hpx::parallel::detail {
}
#endif

#if !defined(HPX_COMPUTE_DEVICE_CODE)
// Host compilation: expose the customization point as an inline constexpr
// variable template holding a value-initialized tag object.
template <typename ExPolicy>
inline constexpr sequential_reduce_deterministic_rfa_t<ExPolicy>
    sequential_reduce_deterministic_rfa{};
#else
// Device compilation: expose a forwarding function template instead, which
// creates a temporary tag object and invokes it with the given arguments.
template <typename ExPolicy, typename... Args>
HPX_HOST_DEVICE HPX_FORCEINLINE auto sequential_reduce_deterministic_rfa(
    Args&&... args)
{
    using cpo_type = sequential_reduce_deterministic_rfa_t<ExPolicy>;
    return cpo_type{}(std::forward<Args>(args)...);
}
#endif
} // namespace hpx::parallel::detail
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#pragma once

#include "detail/reduce_deterministic.hpp"
#if defined(DOXYGEN)

namespace hpx {
Expand Down Expand Up @@ -396,42 +397,49 @@ namespace hpx::parallel {
static constexpr T sequential(ExPolicy&& policy, InIterB first,
InIterE last, T_&& init, Reduce&& r)
{
// hpx::parallel::detail::sequential_reduce_deterministic_t<ExPolicy> seq;
// return seq(policy,first,last,0.0f,r);
return hpx::parallel::detail::sequential_reduce_deterministic<ExPolicy>(
HPX_FORWARD(ExPolicy, policy), first, last,
return hpx::parallel::detail::sequential_reduce_deterministic<
ExPolicy>(HPX_FORWARD(ExPolicy, policy), first, last,
HPX_FORWARD(T_, init), HPX_FORWARD(Reduce, r));
}

// template <typename ExPolicy, typename FwdIterB, typename FwdIterE,
// typename T_, typename Reduce>
// static util::detail::algorithm_result_t<ExPolicy, T> parallel(
// ExPolicy&& policy, FwdIterB first, FwdIterE last, T_&& init,
// Reduce&& r)
// {
// if (first == last)
// {
// return util::detail::algorithm_result<ExPolicy, T>::get(
// HPX_FORWARD(T_, init));
// }

// auto f1 = [r](FwdIterB part_begin, std::size_t part_size) -> T {
// T val = *part_begin;
// return detail::sequential_reduce<ExPolicy>(
// ++part_begin, --part_size, HPX_MOVE(val), r);
// };

// return util::partitioner<ExPolicy, T>::call(
// HPX_FORWARD(ExPolicy, policy), first,
// detail::distance(first, last), HPX_MOVE(f1),
// hpx::unwrapping(
// [init = HPX_FORWARD(T_, init),
// r = HPX_FORWARD(Reduce, r)](auto&& results) -> T {
// return detail::sequential_reduce<ExPolicy>(
// hpx::util::begin(results),
// hpx::util::size(results), init, r);
// }));
// }
/// Parallel deterministic reduction.
///
/// The input is split into partitions; each partition is summed into a
/// ReproducibleFloatingAccumulator<T>. The per-partition accumulators are
/// then combined, `init` is added, and the result is converted back to T
/// via `conv()`.
///
/// \param policy  execution policy forwarded to the partitioner
/// \param first   start of the input sequence
/// \param last    end of the input sequence
/// \param init    initial value of the reduction
/// \param r       reduction functor; only passed through to the sequential
///                helper
/// \returns       `init` unchanged for an empty range, otherwise the
///                converted accumulator, wrapped as algorithm_result_t
template <typename ExPolicy, typename FwdIterB, typename FwdIterE,
    typename T_, typename Reduce>
static util::detail::algorithm_result_t<ExPolicy, T> parallel(
    ExPolicy&& policy, FwdIterB first, FwdIterE last, T_&& init,
    Reduce&& r)
{
    // The intermediate type is keyed on T, not on the deduced forwarding
    // type T_, which may be a reference type or differ from T.
    using accumulator_type =
        hpx::parallel::detail::rfa::ReproducibleFloatingAccumulator<T>;

    if (first == last)
    {
        return util::detail::algorithm_result<ExPolicy, T>::get(
            HPX_FORWARD(T_, init));
    }

    // NOTE(review): `policy` is captured by copy and is const inside these
    // non-mutable lambdas; forwarding it as ExPolicy&& presumably only
    // compiles when ExPolicy is an lvalue reference -- confirm.
    auto f1 = [r, policy](FwdIterB part_begin,
                  std::size_t part_size) -> accumulator_type {
        // Sum the whole partition with a zero seed, so that no element
        // depends on how the sequential helper treats its init argument.
        return hpx::parallel::detail::sequential_reduce_deterministic_rfa<
            ExPolicy>(HPX_FORWARD(ExPolicy, policy), part_begin, part_size,
            T{}, r);
    };

    return util::partitioner<ExPolicy, T, accumulator_type>::call(
        HPX_FORWARD(ExPolicy, policy), first,
        detail::distance(first, last), HPX_MOVE(f1),
        hpx::unwrapping([init = HPX_FORWARD(T_, init),
                            r = HPX_FORWARD(Reduce, r),
                            policy](auto&& results) -> T {
            // Merge the per-partition accumulators, again zero seeded.
            auto total = hpx::parallel::detail::
                sequential_reduce_deterministic_rfa<ExPolicy>(
                    HPX_FORWARD(ExPolicy, policy),
                    hpx::util::begin(results), hpx::util::size(results),
                    T{}, r);

            // Add the caller's initial value exactly once.
            total += init;
            return total.conv();
        }));
}
};
/// \endcond
} // namespace detail
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,49 @@ void test_reduce1(IteratorTag)
HPX_TEST_EQ(r2, r3);
}

/// Checks that hpx::reduce_deterministic run with hpx::execution::par over
/// the values 0, 1, ..., LEN-1 (stored as FloatTypeDeterministic) yields the
/// same value as std::accumulate over the same values stored as
/// FloatTypeNonDeterministic.
///
/// \tparam IteratorTag                iterator category used to wrap the
///                                    input iterators
/// \tparam FloatTypeDeterministic     element type for the deterministic run
/// \tparam FloatTypeNonDeterministic  element type for the reference sum
/// \tparam LEN                        number of elements
template <typename IteratorTag, typename FloatTypeDeterministic,
    typename FloatTypeNonDeterministic, size_t LEN = 10007>
void test_reduce_parallel1(IteratorTag)
{
    // The deterministic and the non-deterministic element types may differ;
    // both computations are still expected to produce the same result.
    // `typename` is required here: the iterator is a dependent type.
    using base_iterator_det =
        typename std::vector<FloatTypeDeterministic>::iterator;
    using iterator_det = test::test_iterator<base_iterator_det, IteratorTag>;

    std::vector<FloatTypeDeterministic> deterministic(LEN);
    std::vector<FloatTypeNonDeterministic> nondeterministic(LEN);

    std::iota(
        deterministic.begin(), deterministic.end(), FloatTypeDeterministic(0));

    std::iota(nondeterministic.begin(), nondeterministic.end(),
        FloatTypeNonDeterministic(0));

    FloatTypeDeterministic val_det(0);
    FloatTypeNonDeterministic val_non_det(0);
    auto op = [](FloatTypeNonDeterministic v1, FloatTypeNonDeterministic v2) {
        return v1 + v2;
    };

    FloatTypeDeterministic r1 = hpx::reduce_deterministic(hpx::execution::par,
        iterator_det(std::begin(deterministic)),
        iterator_det(std::end(deterministic)), val_det, op);

    // Reference value computed sequentially.
    FloatTypeNonDeterministic r3 = std::accumulate(
        nondeterministic.begin(), nondeterministic.end(), val_non_det);

    HPX_TEST_EQ(r1, r3);
}

template <typename IteratorTag, typename FloatTypeDeterministic,
size_t LEN = 10007>
void test_reduce_determinism(IteratorTag)
Expand Down Expand Up @@ -178,6 +221,7 @@ void test_reduce1()
test_reduce1<IteratorTag, double, float, 1000>(IteratorTag());
test_reduce1<IteratorTag, float, double, 1000>(IteratorTag());
test_reduce1<IteratorTag, double, double, 1000>(IteratorTag());
test_reduce_parallel1<IteratorTag, float, float, 1000>(IteratorTag());
}

template <typename IteratorTag>
Expand Down

0 comments on commit 6a99390

Please sign in to comment.