Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 133 additions & 6 deletions hist/histv7/benchmark/hist_benchmark_atomic.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,45 @@ BENCHMARK_DEFINE_F(RHistAtomic_int, AtomicAdd)(benchmark::State &state)
}
BENCHMARK_REGISTER_F(RHistAtomic_int, AtomicAdd);

// Benchmark: Internal::AtomicAddRelease of the constant 1 onto the int member of the fixture.
BENCHMARK_DEFINE_F(RHistAtomic_int, AtomicAddRelease)(benchmark::State &state)
{
   namespace Internal = ROOT::Experimental::Internal;
   for (auto _ : state) {
      Internal::AtomicAddRelease(&fAtomic, 1);
      // Compiler-level memory barrier after every single update.
      benchmark::ClobberMemory();
   }
}
BENCHMARK_REGISTER_F(RHistAtomic_int, AtomicAddRelease);

// Benchmark: Internal::AtomicLoad of the int member of the fixture into a local variable.
BENCHMARK_DEFINE_F(RHistAtomic_int, AtomicLoad)(benchmark::State &state)
{
   int load;
   for (auto _ : state) {
      ROOT::Experimental::Internal::AtomicLoad(&fAtomic, &load);
      // Consume the loaded value. ClobberMemory() alone does not keep a block-scope object alive whose address has
      // not escaped, so without this the result of the load could be discarded by the optimizer.
      benchmark::DoNotOptimize(load);
      benchmark::ClobberMemory();
   }
}
BENCHMARK_REGISTER_F(RHistAtomic_int, AtomicLoad);

// Benchmark: Internal::AtomicLoadAcquire of the int member of the fixture into a local variable.
BENCHMARK_DEFINE_F(RHistAtomic_int, AtomicLoadAcquire)(benchmark::State &state)
{
   int load;
   for (auto _ : state) {
      ROOT::Experimental::Internal::AtomicLoadAcquire(&fAtomic, &load);
      // Consume the loaded value. ClobberMemory() alone does not keep a block-scope object alive whose address has
      // not escaped, so without this the result of the load could be discarded by the optimizer.
      benchmark::DoNotOptimize(load);
      benchmark::ClobberMemory();
   }
}
BENCHMARK_REGISTER_F(RHistAtomic_int, AtomicLoadAcquire);

// Benchmark: Internal::AtomicStoreRelease of a local int (value 1) into the int member of the fixture.
BENCHMARK_DEFINE_F(RHistAtomic_int, AtomicStoreRelease)(benchmark::State &state)
{
   namespace Internal = ROOT::Experimental::Internal;
   int value = 1;
   for (auto _ : state) {
      Internal::AtomicStoreRelease(&fAtomic, &value);
      // Compiler-level memory barrier after every single store.
      benchmark::ClobberMemory();
   }
}
BENCHMARK_REGISTER_F(RHistAtomic_int, AtomicStoreRelease);

/// Benchmark fixture holding the single float that the RHistAtomic_float benchmarks operate on.
struct RHistAtomic_float : benchmark::Fixture {
   float fAtomic = 0.0f;
};
Expand All @@ -60,6 +99,45 @@ BENCHMARK_DEFINE_F(RHistAtomic_float, AtomicAdd)(benchmark::State &state)
}
BENCHMARK_REGISTER_F(RHistAtomic_float, AtomicAdd);

// Benchmark: Internal::AtomicAddRelease of the constant 1.0f onto the float member of the fixture.
BENCHMARK_DEFINE_F(RHistAtomic_float, AtomicAddRelease)(benchmark::State &state)
{
   namespace Internal = ROOT::Experimental::Internal;
   for (auto _ : state) {
      Internal::AtomicAddRelease(&fAtomic, 1.0f);
      // Compiler-level memory barrier after every single update.
      benchmark::ClobberMemory();
   }
}
BENCHMARK_REGISTER_F(RHistAtomic_float, AtomicAddRelease);

// Benchmark: Internal::AtomicLoad of the float member of the fixture into a local variable.
BENCHMARK_DEFINE_F(RHistAtomic_float, AtomicLoad)(benchmark::State &state)
{
   float load;
   for (auto _ : state) {
      ROOT::Experimental::Internal::AtomicLoad(&fAtomic, &load);
      // Consume the loaded value. ClobberMemory() alone does not keep a block-scope object alive whose address has
      // not escaped, so without this the result of the load could be discarded by the optimizer.
      benchmark::DoNotOptimize(load);
      benchmark::ClobberMemory();
   }
}
BENCHMARK_REGISTER_F(RHistAtomic_float, AtomicLoad);

// Benchmark: Internal::AtomicLoadAcquire of the float member of the fixture into a local variable.
BENCHMARK_DEFINE_F(RHistAtomic_float, AtomicLoadAcquire)(benchmark::State &state)
{
   float load;
   for (auto _ : state) {
      ROOT::Experimental::Internal::AtomicLoadAcquire(&fAtomic, &load);
      // Consume the loaded value. ClobberMemory() alone does not keep a block-scope object alive whose address has
      // not escaped, so without this the result of the load could be discarded by the optimizer.
      benchmark::DoNotOptimize(load);
      benchmark::ClobberMemory();
   }
}
BENCHMARK_REGISTER_F(RHistAtomic_float, AtomicLoadAcquire);

// Benchmark: Internal::AtomicStoreRelease of a local float (value 1.0f) into the float member of the fixture.
BENCHMARK_DEFINE_F(RHistAtomic_float, AtomicStoreRelease)(benchmark::State &state)
{
   namespace Internal = ROOT::Experimental::Internal;
   float value = 1.0f;
   for (auto _ : state) {
      Internal::AtomicStoreRelease(&fAtomic, &value);
      // Compiler-level memory barrier after every single store.
      benchmark::ClobberMemory();
   }
}
BENCHMARK_REGISTER_F(RHistAtomic_float, AtomicStoreRelease);

/// Benchmark fixture holding the single double that the RHistAtomic_double benchmarks operate on.
struct RHistAtomic_double : benchmark::Fixture {
   double fAtomic = 0.0;
};
Expand All @@ -82,6 +160,45 @@ BENCHMARK_DEFINE_F(RHistAtomic_double, AtomicAdd)(benchmark::State &state)
}
BENCHMARK_REGISTER_F(RHistAtomic_double, AtomicAdd);

// Benchmark: Internal::AtomicAddRelease of the constant 1.0 onto the double member of the fixture.
BENCHMARK_DEFINE_F(RHistAtomic_double, AtomicAddRelease)(benchmark::State &state)
{
   namespace Internal = ROOT::Experimental::Internal;
   for (auto _ : state) {
      Internal::AtomicAddRelease(&fAtomic, 1.0);
      // Compiler-level memory barrier after every single update.
      benchmark::ClobberMemory();
   }
}
BENCHMARK_REGISTER_F(RHistAtomic_double, AtomicAddRelease);

// Benchmark: Internal::AtomicLoad of the double member of the fixture into a local variable.
BENCHMARK_DEFINE_F(RHistAtomic_double, AtomicLoad)(benchmark::State &state)
{
   double load;
   for (auto _ : state) {
      ROOT::Experimental::Internal::AtomicLoad(&fAtomic, &load);
      // Consume the loaded value. ClobberMemory() alone does not keep a block-scope object alive whose address has
      // not escaped, so without this the result of the load could be discarded by the optimizer.
      benchmark::DoNotOptimize(load);
      benchmark::ClobberMemory();
   }
}
BENCHMARK_REGISTER_F(RHistAtomic_double, AtomicLoad);

// Benchmark: Internal::AtomicLoadAcquire of the double member of the fixture into a local variable.
BENCHMARK_DEFINE_F(RHistAtomic_double, AtomicLoadAcquire)(benchmark::State &state)
{
   double load;
   for (auto _ : state) {
      ROOT::Experimental::Internal::AtomicLoadAcquire(&fAtomic, &load);
      // Consume the loaded value. ClobberMemory() alone does not keep a block-scope object alive whose address has
      // not escaped, so without this the result of the load could be discarded by the optimizer.
      benchmark::DoNotOptimize(load);
      benchmark::ClobberMemory();
   }
}
BENCHMARK_REGISTER_F(RHistAtomic_double, AtomicLoadAcquire);

// Benchmark: Internal::AtomicStoreRelease of a local double (value 1.0) into the double member of the fixture.
BENCHMARK_DEFINE_F(RHistAtomic_double, AtomicStoreRelease)(benchmark::State &state)
{
   namespace Internal = ROOT::Experimental::Internal;
   double value = 1.0;
   for (auto _ : state) {
      Internal::AtomicStoreRelease(&fAtomic, &value);
      // Compiler-level memory barrier after every single store.
      benchmark::ClobberMemory();
   }
}
BENCHMARK_REGISTER_F(RHistAtomic_double, AtomicStoreRelease);

/// Benchmark fixture holding one bin of type ROOT::Experimental::RBinWithError.
/// The fixture has the same unqualified name as the library class, so the member type is spelled fully qualified.
struct RBinWithError : benchmark::Fixture {
   ROOT::Experimental::RBinWithError fBin;
};
Expand All @@ -95,14 +212,14 @@ BENCHMARK_DEFINE_F(RBinWithError, Inc)(benchmark::State &state)
}
BENCHMARK_REGISTER_F(RBinWithError, Inc);

BENCHMARK_DEFINE_F(RBinWithError, AtomicInc)(benchmark::State &state)
BENCHMARK_DEFINE_F(RBinWithError, AtomicIncRelease)(benchmark::State &state)
{
for (auto _ : state) {
fBin.AtomicInc();
fBin.AtomicIncRelease();
benchmark::ClobberMemory();
}
}
BENCHMARK_REGISTER_F(RBinWithError, AtomicInc);
BENCHMARK_REGISTER_F(RBinWithError, AtomicIncRelease);

BENCHMARK_DEFINE_F(RBinWithError, Add)(benchmark::State &state)
{
Expand All @@ -113,13 +230,23 @@ BENCHMARK_DEFINE_F(RBinWithError, Add)(benchmark::State &state)
}
BENCHMARK_REGISTER_F(RBinWithError, Add);

BENCHMARK_DEFINE_F(RBinWithError, AtomicAdd)(benchmark::State &state)
BENCHMARK_DEFINE_F(RBinWithError, AtomicAddRelease)(benchmark::State &state)
{
for (auto _ : state) {
fBin.AtomicAdd(1.0);
fBin.AtomicAddRelease(1.0);
benchmark::ClobberMemory();
}
}
BENCHMARK_REGISTER_F(RBinWithError, AtomicAdd);
BENCHMARK_REGISTER_F(RBinWithError, AtomicAddRelease);

// Benchmark: RBinWithError::AtomicLoad of the fixture's bin into a local copy.
BENCHMARK_DEFINE_F(RBinWithError, AtomicLoad)(benchmark::State &state)
{
   // The fixture struct in this file is also called RBinWithError, hence the fully qualified type name.
   ROOT::Experimental::RBinWithError snapshot;
   for (auto _ : state) {
      fBin.AtomicLoad(&snapshot);
      // Mark the loaded copy as used so the load cannot be discarded.
      benchmark::DoNotOptimize(snapshot);
   }
}
BENCHMARK_REGISTER_F(RBinWithError, AtomicLoad);

// Entry point of the benchmark executable, provided by the Google Benchmark macro; runs the registered benchmarks.
BENCHMARK_MAIN();
53 changes: 48 additions & 5 deletions hist/histv7/inc/ROOT/RBinWithError.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "RHistUtils.hxx"

#include <cmath>
#include <cstring>

namespace ROOT {
namespace Experimental {
Expand Down Expand Up @@ -74,29 +75,71 @@ private:
Internal::AtomicLoad(&fSum2, &origSum2);
}

// The variable appears to be unlocked, confirm with a compare-exchange.
// The variable appears to be unlocked, confirm with a compare-exchange. It uses acquire memory order in case
// of success, so the release store synchronizes with this load. This ensures that the previous write of fSum
// becomes a visible side-effect in this thread and the access below will see it.
double negated = std::copysign(origSum2, -1.0);
if (Internal::AtomicCompareExchangeAcquire(&fSum2, &origSum2, &negated)) {
break;
}
}

// By using a spin lock, we do not need atomic operations for fSum.
fSum += a;
// By using a spin lock, we would not need atomic operations for fSum. However, we must use a release store to
// guarantee correctness of AtomicLoad.
double sum;
Internal::AtomicLoad(&fSum, &sum);
sum += a;
Internal::AtomicStoreRelease(&fSum, &sum);

double sum2 = origSum2 + a2;
// This store synchronizes with the compare-exchange, see above.
Internal::AtomicStoreRelease(&fSum2, &sum2);
}

public:
void AtomicInc() { AtomicAdd(1.0, 1.0); }
void AtomicIncRelease() { AtomicAdd(1.0, 1.0); }

void AtomicAdd(double w) { AtomicAdd(w, w * w); }
void AtomicAddRelease(double w) { AtomicAdd(w, w * w); }

/// Add the content of another bin to this one using atomic instructions.
///
/// \param[in] rhs the bin content to add; it must not be modified while this operation is running
void AtomicAdd(const RBinWithError &rhs)
{
   AtomicAdd(rhs.fSum, rhs.fSum2);
}

/// Load a consistent pair (fSum, fSum2) of this bin content while other threads may be adding to it atomically.
///
/// The two-argument AtomicAdd uses the sign bit of fSum2 as a spin lock: it stores the negated value while it updates
/// fSum and stores a non-negative value again when it is done. This function therefore waits until fSum2 is
/// non-negative, reads fSum, re-reads fSum2 and accepts the pair only if fSum2 is bitwise identical to the value seen
/// before reading fSum. Otherwise it starts over.
///
/// \param[out] ret receives fSum and fSum2; its members may be overwritten several times before the function returns
void AtomicLoad(RBinWithError *ret) const
{
double origSum2;
Internal::AtomicLoadAcquire(&fSum2, &origSum2);

while (true) {
// Repeat loads from memory until we see a non-negative value.
// NB: do not use origSum2 < 0, it does not work for -0.0!
while (std::signbit(origSum2)) {
Internal::AtomicLoadAcquire(&fSum2, &origSum2);
}

// The release store to fSum2 at the end of AtomicAdd synchronizes with the last acquire load above. This
// ensures that the previous write of fSum becomes a visible side-effect in this thread and the atomic load
// below will see it.

Internal::AtomicLoadAcquire(&fSum, &ret->fSum);

// In the reverse direction, the release store of fSum synchronizes with the acquire load above. This ensures
// that the previous write of fSum2 becomes a visible side-effect and we would notice if the spinlock is taken.

// NB: an acquire load is required here in case the comparison fails below and we repeat the outer loop.
Internal::AtomicLoadAcquire(&fSum2, &ret->fSum2);

// Check if the value of fSum2 is still identical to the first load.
// NB: do not use origSum2 == ret->fSum2, it is problematic because 0.0 == -0.0!
// A bitwise comparison also treats a negative (locked) value as different from origSum2, which is non-negative here.
if (!std::memcmp(&origSum2, &ret->fSum2, sizeof(origSum2))) {
return;
}

// The comparison failed, an update happened or is in progress.
// Continue with the value just read; if it is negative, the inner loop above spins until the lock is released.
origSum2 = ret->fSum2;
}
}
};

} // namespace Experimental
Expand Down
51 changes: 47 additions & 4 deletions hist/histv7/inc/ROOT/RHistEngine.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
#include "RWeight.hxx"

#include <array>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <initializer_list>
#include <stdexcept>
#include <tuple>
Expand Down Expand Up @@ -500,7 +502,7 @@ public:
RLinearizedIndex index = fAxes.ComputeGlobalIndexImpl<sizeof...(A)>(args);
if (index.fValid) {
assert(index.fIndex < fBinContents.size());
Internal::AtomicInc(&fBinContents[index.fIndex]);
Internal::AtomicIncRelease(&fBinContents[index.fIndex]);
}
}

Expand All @@ -524,7 +526,7 @@ public:
RLinearizedIndex index = fAxes.ComputeGlobalIndexImpl<sizeof...(A)>(args);
if (index.fValid) {
assert(index.fIndex < fBinContents.size());
Internal::AtomicAdd(&fBinContents[index.fIndex], weight.fValue);
Internal::AtomicAddRelease(&fBinContents[index.fIndex], weight.fValue);
}
}

Expand All @@ -549,7 +551,7 @@ public:
RLinearizedIndex index = fAxes.ComputeGlobalIndexImpl<sizeof...(A)>(args);
if (index.fValid) {
assert(index.fIndex < fBinContents.size());
Internal::AtomicAdd(&fBinContents[index.fIndex], weight);
Internal::AtomicAddRelease(&fBinContents[index.fIndex], weight);
}
}

Expand All @@ -573,7 +575,7 @@ public:
RLinearizedIndex index = fAxes.ComputeGlobalIndexImpl<N>(t);
if (index.fValid) {
assert(index.fIndex < fBinContents.size());
Internal::AtomicAdd(&fBinContents[index.fIndex], weight.fValue);
Internal::AtomicAddRelease(&fBinContents[index.fIndex], weight.fValue);
}
} else {
FillAtomic(t);
Expand Down Expand Up @@ -812,6 +814,47 @@ public:
return Slice(sliceSpecs);
}

/// Create an atomic snapshot of this histogram engine.
///
/// A snapshot is a consistent copy of the histogram that can be taken during concurrent filling. It is guaranteed
/// that the returned copy represents a state between the beginning and the end of the snapshot operation.
///
/// Snapshotting a histogram engine with many bins can be an expensive operation: every bin is read at least twice,
/// and the full pass over all bins is repeated for as long as any bin is observed to differ from the previous pass.
///
/// \return the atomic snapshot
RHistEngine SnapshotAtomic() const
{
// Bin contents are compared and copied as raw bytes below (memcmp / memcpy).
static_assert(std::is_trivially_copyable_v<BinContentType>,
"snapshotting requires a trivially copyable bin content type");

RHistEngine snapshot(fAxes.Get());
// Do a first collect.
for (std::size_t i = 0; i < fBinContents.size(); i++) {
Internal::AtomicLoad(&fBinContents[i], &snapshot.fBinContents[i]);
}

// Now do another collect. If no change is detected, the snapshot is consistent. Otherwise update the bin contents
// and try again.
BinContentType tmp;
bool changed;
do {
// To guarantee correctness, we let the release operation(s) in FillAtomic synchronize with this acquire fence.
// This ensures that all previous writes become visible side-effects and the atomic loads will see them.
// The fence is placed after the loads of the previous collect and before the loads of the next one.
std::atomic_thread_fence(std::memory_order_acquire);

changed = false;
for (std::size_t i = 0; i < fBinContents.size(); i++) {
Internal::AtomicLoad(&fBinContents[i], &tmp);
// Compare the object representations, so that any bit-level difference counts as a change.
if (std::memcmp(&tmp, &snapshot.fBinContents[i], sizeof(BinContentType))) {
std::memcpy(&snapshot.fBinContents[i], &tmp, sizeof(BinContentType));
changed = true;
}
}
// A pass that found a difference does not stop early: the remaining bins are still refreshed in the same pass.
} while (changed);

return snapshot;
}

/// \}

/// %ROOT Streamer function to throw when trying to store an object of this class.
Expand Down
Loading
Loading