Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rfa parallel #6595

Draft
wants to merge 10 commits into
base: master
Choose a base branch
from
1 change: 0 additions & 1 deletion .github/workflows/macos_debug_fetch_hwloc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ jobs:
run: |
brew install --overwrite python-tk && \
brew install --overwrite boost gperftools ninja autoconf automake && \
autoreconf -f -i \
brew upgrade cmake
- name: Configure
shell: bash
Expand Down
3 changes: 3 additions & 0 deletions libs/core/algorithms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ set(algorithms_headers
hpx/parallel/algorithms/detail/parallel_stable_sort.hpp
hpx/parallel/algorithms/detail/pivot.hpp
hpx/parallel/algorithms/detail/reduce.hpp
hpx/parallel/algorithms/detail/reduce_deterministic.hpp
hpx/parallel/algorithms/detail/replace.hpp
hpx/parallel/algorithms/detail/rfa.hpp
hpx/parallel/algorithms/detail/rotate.hpp
hpx/parallel/algorithms/detail/sample_sort.hpp
hpx/parallel/algorithms/detail/search.hpp
Expand Down Expand Up @@ -72,6 +74,7 @@ set(algorithms_headers
hpx/parallel/algorithms/partition.hpp
hpx/parallel/algorithms/reduce_by_key.hpp
hpx/parallel/algorithms/reduce.hpp
hpx/parallel/algorithms/reduce_deterministic.hpp
hpx/parallel/algorithms/remove_copy.hpp
hpx/parallel/algorithms/remove.hpp
hpx/parallel/algorithms/replace.hpp
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// Copyright (c) 2024 Shreyas Atre
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#pragma once

#include <hpx/config.hpp>
#include <hpx/functional/detail/tag_fallback_invoke.hpp>
#include <hpx/functional/invoke.hpp>
#include <hpx/parallel/algorithms/detail/rfa.hpp>
#include <hpx/parallel/util/loop.hpp>

#include <cstddef>
#include <cstring>
#include <limits>
#include <type_traits>
#include <utility>
#include "rfa.hpp"

namespace hpx::parallel::detail {

template <typename ExPolicy>
struct sequential_reduce_deterministic_t final
: hpx::functional::detail::tag_fallback<
sequential_reduce_deterministic_t<ExPolicy>>
{
private:
template <typename InIterB, typename InIterE, typename T,
typename Reduce>
friend constexpr T tag_fallback_invoke(
sequential_reduce_deterministic_t, ExPolicy&&, InIterB first,
InIterE last, T init, Reduce&& r)
{
/// TODO: Put constraint on Reduce to be a binary plus operator
(void) r;
hpx::parallel::detail::rfa::RFA_bins<T> bins;
bins.initialize_bins();
std::memcpy(rfa::__rfa_bin_host_buffer__, &bins, sizeof(bins));

hpx::parallel::detail::rfa::ReproducibleFloatingAccumulator<T> rfa;
rfa.set_max_abs_val(init);
rfa.unsafe_add(init);
rfa.renorm();
size_t count = 0;
T max_val = std::abs(*first);
for (auto e = first; e != last; ++e)
{
T temp_max_val = std::abs(static_cast<T>(*e));
if (max_val < temp_max_val)
{
rfa.set_max_abs_val(temp_max_val);
max_val = temp_max_val;
}
rfa.unsafe_add(*e);
count++;
if (count == rfa.endurance())
{
rfa.renorm();
count = 0;
}
}
return rfa.conv();
}
};

template <typename ExPolicy>
struct sequential_reduce_deterministic_rfa_t final
: hpx::functional::detail::tag_fallback<
sequential_reduce_deterministic_rfa_t<ExPolicy>>
{
private:
template <typename InIterB, typename T>
friend constexpr hpx::parallel::detail::rfa::
ReproducibleFloatingAccumulator<T>
tag_fallback_invoke(sequential_reduce_deterministic_rfa_t,
ExPolicy&&, InIterB first, std::size_t partition_size, T init,
std::true_type&&)
{
hpx::parallel::detail::rfa::RFA_bins<T> bins;
bins.initialize_bins();
std::memcpy(rfa::__rfa_bin_host_buffer__, &bins, sizeof(bins));

hpx::parallel::detail::rfa::ReproducibleFloatingAccumulator<T> rfa;
rfa.set_max_abs_val(init);
rfa.unsafe_add(init);
rfa.renorm();
size_t count = 0;
T max_val = std::abs(*first);
std::size_t partition_size_lim = 0;
for (auto e = first; partition_size_lim <= partition_size;
partition_size_lim++, e++)
{
T temp_max_val = std::abs(static_cast<T>(*e));
if (max_val < temp_max_val)
{
rfa.set_max_abs_val(temp_max_val);
max_val = temp_max_val;
}
rfa.unsafe_add(*e);
count++;
if (count == rfa.endurance())
{
rfa.renorm();
count = 0;
}
}
return rfa;
}

template <typename InIterB, typename T>
friend constexpr T tag_fallback_invoke(
sequential_reduce_deterministic_rfa_t, ExPolicy&&, InIterB first,
std::size_t partition_size, T init, std::false_type&&)
{
hpx::parallel::detail::rfa::RFA_bins<typename T::ftype> bins;
bins.initialize_bins();
std::memcpy(rfa::__rfa_bin_host_buffer__, &bins, sizeof(bins));

T rfa;
rfa += init;
std::size_t partition_size_lim = 0;
for (auto e = first; partition_size_lim <= partition_size;
partition_size_lim++, e++)
{
rfa += (*e);
}
return rfa;
}
};

#if !defined(HPX_COMPUTE_DEVICE_CODE)
template <typename ExPolicy>
inline constexpr sequential_reduce_deterministic_t<ExPolicy>
sequential_reduce_deterministic =
sequential_reduce_deterministic_t<ExPolicy>{};
#else
template <typename ExPolicy, typename... Args>
HPX_HOST_DEVICE HPX_FORCEINLINE auto sequential_reduce_deterministic(
Args&&... args)
{
return sequential_reduce_deterministic_t<ExPolicy>{}(
std::forward<Args>(args)...);
}
#endif

#if !defined(HPX_COMPUTE_DEVICE_CODE)
template <typename ExPolicy>
inline constexpr sequential_reduce_deterministic_rfa_t<ExPolicy>
sequential_reduce_deterministic_rfa =
sequential_reduce_deterministic_rfa_t<ExPolicy>{};
#else
template <typename ExPolicy, typename... Args>
HPX_HOST_DEVICE HPX_FORCEINLINE auto sequential_reduce_deterministic_rfa(
Args&&... args)
{
return sequential_reduce_deterministic_rfa_t<ExPolicy>{}(
std::forward<Args>(args)...);
}
#endif
} // namespace hpx::parallel::detail
Loading
Loading