Skip to content

Commit

Permalink
Attempting to fix problems in barrier causing hangs
Browse files Browse the repository at this point in the history
  • Loading branch information
hkaiser committed May 22, 2023
1 parent a3ae0d9 commit f4ae7e9
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 17 deletions.
73 changes: 61 additions & 12 deletions libs/core/synchronization/include/hpx/synchronization/barrier.hpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2022 Hartmut Kaiser
// Copyright (c) 2007-2023 Hartmut Kaiser
// Copyright (c) 2016 Thomas Heller
//
// SPDX-License-Identifier: BSL-1.0
Expand All @@ -11,13 +11,16 @@

#include <hpx/config.hpp>
#include <hpx/assert.hpp>
#include <hpx/modules/memory.hpp>
#include <hpx/synchronization/detail/condition_variable.hpp>
#include <hpx/synchronization/spinlock.hpp>
#include <hpx/thread_support/assert_owns_lock.hpp>
#include <hpx/thread_support/atomic_count.hpp>

#include <climits>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <utility>

#include <hpx/config/warnings_prefix.hpp>
Expand All @@ -30,7 +33,39 @@ namespace hpx {

struct empty_oncompletion
{
inline constexpr void operator()() const noexcept {}
constexpr void operator()() const noexcept {}
};

struct barrier_data;

HPX_CORE_EXPORT void intrusive_ptr_add_ref(barrier_data* p) noexcept;
HPX_CORE_EXPORT void intrusive_ptr_release(barrier_data* p) noexcept;

struct barrier_data
{
using mutex_type = hpx::spinlock;

barrier_data() noexcept
: count_(1)
{
}

barrier_data(barrier_data const&) = delete;
barrier_data(barrier_data&&) = delete;
barrier_data& operator=(barrier_data const&) = delete;
barrier_data& operator=(barrier_data&&) = delete;

~barrier_data() = default;

mutable mutex_type mtx_;

private:
friend HPX_CORE_EXPORT void intrusive_ptr_add_ref(
barrier_data*) noexcept;
friend HPX_CORE_EXPORT void intrusive_ptr_release(
barrier_data*) noexcept;

hpx::util::atomic_count count_;
};
} // namespace detail
/// \endcond
Expand Down Expand Up @@ -98,7 +133,10 @@ namespace hpx {
{
public:
/// \cond NOINTERNAL
HPX_NON_COPYABLE(barrier);
barrier(barrier const&) = delete;
barrier(barrier&&) = delete;
barrier& operator=(barrier const&) = delete;
barrier& operator=(barrier&&) = delete;
/// \endcond

private:
Expand Down Expand Up @@ -127,7 +165,8 @@ namespace hpx {
/// constructor.
constexpr explicit barrier(
std::ptrdiff_t expected, OnCompletion completion = OnCompletion())
: expected_(expected)
: mtx_(new detail::barrier_data(), false)
, expected_(expected)
, arrived_(expected)
, completion_(HPX_MOVE(completion))
, phase_(false)
Expand All @@ -138,6 +177,8 @@ namespace hpx {
// clang-format on
}

~barrier() = default;

private:
/// \cond NOINTERNAL
[[nodiscard]] arrival_token arrive_locked(
Expand Down Expand Up @@ -184,7 +225,8 @@ namespace hpx {
/// to start.- end note]
[[nodiscard]] arrival_token arrive(std::ptrdiff_t update = 1)
{
std::unique_lock<mutex_type> l(mtx_);
auto const mtx = mtx_; // keep alive
std::unique_lock<mutex_type> l(mtx->mtx_);
return arrive_locked(l, update);
}

Expand All @@ -205,18 +247,19 @@ namespace hpx {
/// types ([thread.mutex.requirements.mutex]).
void wait(arrival_token&& old_phase) const
{
std::unique_lock<mutex_type> l(mtx_);
auto const mtx = mtx_; // keep alive
std::unique_lock<mutex_type> l(mtx->mtx_);
if (phase_ == old_phase)
{
cond_.wait(l, "barrier::wait");
HPX_ASSERT_LOCKED(l, phase_ != old_phase);
}
}

/// Effects: Equivalent to: wait(arrive()).
void arrive_and_wait()
{
std::unique_lock<mutex_type> l(mtx_);
auto const mtx = mtx_; // keep alive
std::unique_lock<mutex_type> l(mtx->mtx_);
arrival_token const old_phase = arrive_locked(l, 1);
if (phase_ == old_phase)
{
Expand All @@ -243,14 +286,15 @@ namespace hpx {
/// step for the current phase to start.- end note]
void arrive_and_drop()
{
std::unique_lock<mutex_type> l(mtx_);
auto const mtx = mtx_; // keep alive
std::unique_lock<mutex_type> l(mtx->mtx_);
HPX_ASSERT_LOCKED(l, expected_ > 0);
--expected_;
[[maybe_unused]] bool result = arrive_locked(l, 1);
}

private:
mutable mutex_type mtx_;
hpx::intrusive_ptr<detail::barrier_data> mtx_;
mutable hpx::lcos::local::detail::condition_variable cond_;

std::ptrdiff_t expected_;
Expand All @@ -273,8 +317,7 @@ namespace hpx {
// allowing to synchronize a given number of \a threads.
class HPX_CORE_EXPORT barrier
{
private:
typedef hpx::spinlock mutex_type;
using mutex_type = hpx::spinlock;

static constexpr std::size_t barrier_flag =
static_cast<std::size_t>(1)
Expand All @@ -286,6 +329,12 @@ namespace hpx {
// \param expected The number of participating threads
//
explicit barrier(std::size_t expected);

barrier(barrier const&) = delete;
barrier(barrier&&) = delete;
barrier& operator=(barrier const&) = delete;
barrier& operator=(barrier&&) = delete;

~barrier();

// The function \a wait will block the number of entering \a threads
Expand Down
28 changes: 23 additions & 5 deletions libs/core/synchronization/src/local_barrier.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2013 Hartmut Kaiser
// Copyright (c) 2007-2023 Hartmut Kaiser
// Copyright (c) 2016 Thomas Heller
//
// SPDX-License-Identifier: BSL-1.0
Expand All @@ -12,9 +12,27 @@
#include <utility>

///////////////////////////////////////////////////////////////////////////////
namespace hpx { namespace lcos { namespace local {
barrier::barrier(std::size_t number_of_threads)
: number_of_threads_(number_of_threads)
namespace hpx::detail {

void intrusive_ptr_add_ref(barrier_data* p) noexcept
{
++p->count_;
}

void intrusive_ptr_release(barrier_data* p) noexcept
{
if (0 == --p->count_)
{
delete p;
}
}
} // namespace hpx::detail

///////////////////////////////////////////////////////////////////////////////
namespace hpx::lcos::local {

barrier::barrier(std::size_t expected)
: number_of_threads_(expected)
, total_(barrier_flag)
, mtx_()
, cond_()
Expand Down Expand Up @@ -82,4 +100,4 @@ namespace hpx { namespace lcos { namespace local {
this->number_of_threads_ = number_of_threads;
}

}}} // namespace hpx::lcos::local
} // namespace hpx::lcos::local

0 comments on commit f4ae7e9

Please sign in to comment.