diff --git a/hist/histv7/benchmark/hist_benchmark_atomic.cxx b/hist/histv7/benchmark/hist_benchmark_atomic.cxx index e18fa0016630e..0196e663836fe 100644 --- a/hist/histv7/benchmark/hist_benchmark_atomic.cxx +++ b/hist/histv7/benchmark/hist_benchmark_atomic.cxx @@ -38,6 +38,45 @@ BENCHMARK_DEFINE_F(RHistAtomic_int, AtomicAdd)(benchmark::State &state) } BENCHMARK_REGISTER_F(RHistAtomic_int, AtomicAdd); +BENCHMARK_DEFINE_F(RHistAtomic_int, AtomicAddRelease)(benchmark::State &state) +{ + for (auto _ : state) { + ROOT::Experimental::Internal::AtomicAddRelease(&fAtomic, 1); + benchmark::ClobberMemory(); + } +} +BENCHMARK_REGISTER_F(RHistAtomic_int, AtomicAddRelease); + +BENCHMARK_DEFINE_F(RHistAtomic_int, AtomicLoad)(benchmark::State &state) +{ + int load; + for (auto _ : state) { + ROOT::Experimental::Internal::AtomicLoad(&fAtomic, &load); + benchmark::ClobberMemory(); + } +} +BENCHMARK_REGISTER_F(RHistAtomic_int, AtomicLoad); + +BENCHMARK_DEFINE_F(RHistAtomic_int, AtomicLoadAcquire)(benchmark::State &state) +{ + int load; + for (auto _ : state) { + ROOT::Experimental::Internal::AtomicLoadAcquire(&fAtomic, &load); + benchmark::ClobberMemory(); + } +} +BENCHMARK_REGISTER_F(RHistAtomic_int, AtomicLoadAcquire); + +BENCHMARK_DEFINE_F(RHistAtomic_int, AtomicStoreRelease)(benchmark::State &state) +{ + int store = 1; + for (auto _ : state) { + ROOT::Experimental::Internal::AtomicStoreRelease(&fAtomic, &store); + benchmark::ClobberMemory(); + } +} +BENCHMARK_REGISTER_F(RHistAtomic_int, AtomicStoreRelease); + struct RHistAtomic_float : public benchmark::Fixture { float fAtomic = 0; }; @@ -60,6 +99,45 @@ BENCHMARK_DEFINE_F(RHistAtomic_float, AtomicAdd)(benchmark::State &state) } BENCHMARK_REGISTER_F(RHistAtomic_float, AtomicAdd); +BENCHMARK_DEFINE_F(RHistAtomic_float, AtomicAddRelease)(benchmark::State &state) +{ + for (auto _ : state) { + ROOT::Experimental::Internal::AtomicAddRelease(&fAtomic, 1.0f); + benchmark::ClobberMemory(); + } +} +BENCHMARK_REGISTER_F(RHistAtomic_float, 
AtomicAddRelease); + +BENCHMARK_DEFINE_F(RHistAtomic_float, AtomicLoad)(benchmark::State &state) +{ + float load; + for (auto _ : state) { + ROOT::Experimental::Internal::AtomicLoad(&fAtomic, &load); + benchmark::ClobberMemory(); + } +} +BENCHMARK_REGISTER_F(RHistAtomic_float, AtomicLoad); + +BENCHMARK_DEFINE_F(RHistAtomic_float, AtomicLoadAcquire)(benchmark::State &state) +{ + float load; + for (auto _ : state) { + ROOT::Experimental::Internal::AtomicLoadAcquire(&fAtomic, &load); + benchmark::ClobberMemory(); + } +} +BENCHMARK_REGISTER_F(RHistAtomic_float, AtomicLoadAcquire); + +BENCHMARK_DEFINE_F(RHistAtomic_float, AtomicStoreRelease)(benchmark::State &state) +{ + float store = 1.0f; + for (auto _ : state) { + ROOT::Experimental::Internal::AtomicStoreRelease(&fAtomic, &store); + benchmark::ClobberMemory(); + } +} +BENCHMARK_REGISTER_F(RHistAtomic_float, AtomicStoreRelease); + struct RHistAtomic_double : public benchmark::Fixture { double fAtomic = 0; }; @@ -82,6 +160,45 @@ BENCHMARK_DEFINE_F(RHistAtomic_double, AtomicAdd)(benchmark::State &state) } BENCHMARK_REGISTER_F(RHistAtomic_double, AtomicAdd); +BENCHMARK_DEFINE_F(RHistAtomic_double, AtomicAddRelease)(benchmark::State &state) +{ + for (auto _ : state) { + ROOT::Experimental::Internal::AtomicAddRelease(&fAtomic, 1.0); + benchmark::ClobberMemory(); + } +} +BENCHMARK_REGISTER_F(RHistAtomic_double, AtomicAddRelease); + +BENCHMARK_DEFINE_F(RHistAtomic_double, AtomicLoad)(benchmark::State &state) +{ + double load; + for (auto _ : state) { + ROOT::Experimental::Internal::AtomicLoad(&fAtomic, &load); + benchmark::ClobberMemory(); + } +} +BENCHMARK_REGISTER_F(RHistAtomic_double, AtomicLoad); + +BENCHMARK_DEFINE_F(RHistAtomic_double, AtomicLoadAcquire)(benchmark::State &state) +{ + double load; + for (auto _ : state) { + ROOT::Experimental::Internal::AtomicLoadAcquire(&fAtomic, &load); + benchmark::ClobberMemory(); + } +} +BENCHMARK_REGISTER_F(RHistAtomic_double, AtomicLoadAcquire); + 
+BENCHMARK_DEFINE_F(RHistAtomic_double, AtomicStoreRelease)(benchmark::State &state) +{ + double store = 1.0; + for (auto _ : state) { + ROOT::Experimental::Internal::AtomicStoreRelease(&fAtomic, &store); + benchmark::ClobberMemory(); + } +} +BENCHMARK_REGISTER_F(RHistAtomic_double, AtomicStoreRelease); + struct RBinWithError : public benchmark::Fixture { ROOT::Experimental::RBinWithError fBin; }; @@ -95,14 +212,14 @@ BENCHMARK_DEFINE_F(RBinWithError, Inc)(benchmark::State &state) } BENCHMARK_REGISTER_F(RBinWithError, Inc); -BENCHMARK_DEFINE_F(RBinWithError, AtomicInc)(benchmark::State &state) +BENCHMARK_DEFINE_F(RBinWithError, AtomicIncRelease)(benchmark::State &state) { for (auto _ : state) { - fBin.AtomicInc(); + fBin.AtomicIncRelease(); benchmark::ClobberMemory(); } } -BENCHMARK_REGISTER_F(RBinWithError, AtomicInc); +BENCHMARK_REGISTER_F(RBinWithError, AtomicIncRelease); BENCHMARK_DEFINE_F(RBinWithError, Add)(benchmark::State &state) { @@ -113,13 +230,23 @@ BENCHMARK_DEFINE_F(RBinWithError, Add)(benchmark::State &state) } BENCHMARK_REGISTER_F(RBinWithError, Add); -BENCHMARK_DEFINE_F(RBinWithError, AtomicAdd)(benchmark::State &state) +BENCHMARK_DEFINE_F(RBinWithError, AtomicAddRelease)(benchmark::State &state) { for (auto _ : state) { - fBin.AtomicAdd(1.0); + fBin.AtomicAddRelease(1.0); benchmark::ClobberMemory(); } } -BENCHMARK_REGISTER_F(RBinWithError, AtomicAdd); +BENCHMARK_REGISTER_F(RBinWithError, AtomicAddRelease); + +BENCHMARK_DEFINE_F(RBinWithError, AtomicLoad)(benchmark::State &state) +{ + ROOT::Experimental::RBinWithError load; + for (auto _ : state) { + fBin.AtomicLoad(&load); + benchmark::DoNotOptimize(load); + } +} +BENCHMARK_REGISTER_F(RBinWithError, AtomicLoad); BENCHMARK_MAIN(); diff --git a/hist/histv7/inc/ROOT/RBinWithError.hxx b/hist/histv7/inc/ROOT/RBinWithError.hxx index 44ccd2652348d..de95950577d59 100644 --- a/hist/histv7/inc/ROOT/RBinWithError.hxx +++ b/hist/histv7/inc/ROOT/RBinWithError.hxx @@ -8,6 +8,7 @@ #include "RHistUtils.hxx" 
#include +#include namespace ROOT { namespace Experimental { @@ -74,29 +75,71 @@ private: Internal::AtomicLoad(&fSum2, &origSum2); } - // The variable appears to be unlocked, confirm with a compare-exchange. + // The variable appears to be unlocked, confirm with a compare-exchange. It uses acquire memory order in case + // of success, so the release store synchronizes with this load. This ensures that the previous write of fSum + // becomes a visible side-effect in this thread and the access below will see it. double negated = std::copysign(origSum2, -1.0); if (Internal::AtomicCompareExchangeAcquire(&fSum2, &origSum2, &negated)) { break; } } - // By using a spin lock, we do not need atomic operations for fSum. - fSum += a; + // By using a spin lock, we would not need atomic operations for fSum. However, we must use a release store to + // guarantee correctness of AtomicLoad. + double sum; + Internal::AtomicLoad(&fSum, &sum); + sum += a; + Internal::AtomicStoreRelease(&fSum, &sum); double sum2 = origSum2 + a2; + // This store synchronizes with the compare-exchange, see above. Internal::AtomicStoreRelease(&fSum2, &sum2); } public: - void AtomicInc() { AtomicAdd(1.0, 1.0); } + void AtomicIncRelease() { AtomicAdd(1.0, 1.0); } - void AtomicAdd(double w) { AtomicAdd(w, w * w); } + void AtomicAddRelease(double w) { AtomicAdd(w, w * w); } /// Add another bin content using atomic instructions. /// /// \param[in] rhs another bin content that must not be modified during the operation void AtomicAdd(const RBinWithError &rhs) { AtomicAdd(rhs.fSum, rhs.fSum2); } + + void AtomicLoad(RBinWithError *ret) const + { + double origSum2; + Internal::AtomicLoadAcquire(&fSum2, &origSum2); + + while (true) { + // Repeat loads from memory until we see a non-negative value. + // NB: do not use origSum2 < 0, it does not work for -0.0! 
+ while (std::signbit(origSum2)) { + Internal::AtomicLoadAcquire(&fSum2, &origSum2); + } + + // The release store to fSum2 at the end of AtomicAdd synchronizes with the last acquire load above. This + // ensures that the previous write of fSum becomes a visible side-effect in this thread and the atomic load + // below will see it. + + Internal::AtomicLoadAcquire(&fSum, &ret->fSum); + + // In the reverse direction, the release store of fSum synchronizes with the acquire load above. This ensures + // that the previous write of fSum2 becomes a visible side-effect and we would notice if the spinlock is taken. + + // NB: an acquire load is required here in case the comparison fails below and we repeat the outer loop. + Internal::AtomicLoadAcquire(&fSum2, &ret->fSum2); + + // Check if the value of fSum2 is still identical to the first load. + // NB: do not use origSum2 == ret->fSum2, it is problematic because 0.0 == -0.0! + if (!std::memcmp(&origSum2, &ret->fSum2, sizeof(origSum2))) { + return; + } + + // The comparison failed, an update happened or is in progress. 
+ origSum2 = ret->fSum2; + } + } }; } // namespace Experimental diff --git a/hist/histv7/inc/ROOT/RHistEngine.hxx b/hist/histv7/inc/ROOT/RHistEngine.hxx index f438e60b2c789..6b77023a71eb1 100644 --- a/hist/histv7/inc/ROOT/RHistEngine.hxx +++ b/hist/histv7/inc/ROOT/RHistEngine.hxx @@ -17,9 +17,11 @@ #include "RWeight.hxx" #include +#include #include #include #include +#include #include #include #include @@ -500,7 +502,7 @@ public: RLinearizedIndex index = fAxes.ComputeGlobalIndexImpl(args); if (index.fValid) { assert(index.fIndex < fBinContents.size()); - Internal::AtomicInc(&fBinContents[index.fIndex]); + Internal::AtomicIncRelease(&fBinContents[index.fIndex]); } } @@ -524,7 +526,7 @@ public: RLinearizedIndex index = fAxes.ComputeGlobalIndexImpl(args); if (index.fValid) { assert(index.fIndex < fBinContents.size()); - Internal::AtomicAdd(&fBinContents[index.fIndex], weight.fValue); + Internal::AtomicAddRelease(&fBinContents[index.fIndex], weight.fValue); } } @@ -549,7 +551,7 @@ public: RLinearizedIndex index = fAxes.ComputeGlobalIndexImpl(args); if (index.fValid) { assert(index.fIndex < fBinContents.size()); - Internal::AtomicAdd(&fBinContents[index.fIndex], weight); + Internal::AtomicAddRelease(&fBinContents[index.fIndex], weight); } } @@ -573,7 +575,7 @@ public: RLinearizedIndex index = fAxes.ComputeGlobalIndexImpl(t); if (index.fValid) { assert(index.fIndex < fBinContents.size()); - Internal::AtomicAdd(&fBinContents[index.fIndex], weight.fValue); + Internal::AtomicAddRelease(&fBinContents[index.fIndex], weight.fValue); } } else { FillAtomic(t); @@ -812,6 +814,47 @@ public: return Slice(sliceSpecs); } + /// Create an atomic snapshot of this histogram engine. + /// + /// A snapshot is a consistent copy of the histogram, during concurrent filling. It is guaranteed that the returned + /// copy represents a state between the begin and end of the snapshot operation. + /// + /// Snapshotting a histogram engine with many bins can be an expensive operation. 
+ /// + /// \return the atomic snapshot + RHistEngine SnapshotAtomic() const + { + static_assert(std::is_trivially_copyable_v, + "snapshotting requires a trivially copyable bin content type"); + + RHistEngine snapshot(fAxes.Get()); + // Do a first collect. + for (std::size_t i = 0; i < fBinContents.size(); i++) { + Internal::AtomicLoad(&fBinContents[i], &snapshot.fBinContents[i]); + } + + // Now do another collect. If no change is detected, the snapshot is consistent. Otherwise update the bin contents + // and try again. + BinContentType tmp; + bool changed; + do { + // To guarantee correctness, we let the release operation(s) in FillAtomic synchronize with this acquire fence. + // This ensures that all previous writes become visible side-effects and the atomic loads will see them. + std::atomic_thread_fence(std::memory_order_acquire); + + changed = false; + for (std::size_t i = 0; i < fBinContents.size(); i++) { + Internal::AtomicLoad(&fBinContents[i], &tmp); + if (std::memcmp(&tmp, &snapshot.fBinContents[i], sizeof(BinContentType))) { + std::memcpy(&snapshot.fBinContents[i], &tmp, sizeof(BinContentType)); + changed = true; + } + } + } while (changed); + + return snapshot; + } + /// \} /// %ROOT Streamer function to throw when trying to store an object of this class. 
diff --git a/hist/histv7/inc/ROOT/RHistUtils.hxx b/hist/histv7/inc/ROOT/RHistUtils.hxx index 452819ab4739b..f045f58208843 100644 --- a/hist/histv7/inc/ROOT/RHistUtils.hxx +++ b/hist/histv7/inc/ROOT/RHistUtils.hxx @@ -142,7 +142,7 @@ struct AtomicOps<8> { #endif template -void AtomicLoad(const T *ptr, T *ret) +std::enable_if_t> AtomicLoad(const T *ptr, T *ret) { #ifndef _MSC_VER __atomic_load(ptr, ret, __ATOMIC_RELAXED); @@ -152,7 +152,25 @@ void AtomicLoad(const T *ptr, T *ret) } template -void AtomicStoreRelease(T *ptr, T *val) +auto AtomicLoad(const T *ptr, T *ret) -> decltype(ptr->AtomicLoad(ret)) +{ + return ptr->AtomicLoad(ret); +} + +template +std::enable_if_t> AtomicLoadAcquire(const T *ptr, T *ret) +{ +#ifndef _MSC_VER + __atomic_load(ptr, ret, __ATOMIC_ACQUIRE); +#else + MSVC::AtomicOps::Load(ptr, ret); + // Cannot specify the memory order directly, use a fence. + std::atomic_thread_fence(std::memory_order_acquire); +#endif +} + +template +std::enable_if_t> AtomicStoreRelease(T *ptr, T *val) { #ifndef _MSC_VER __atomic_store(ptr, val, __ATOMIC_RELEASE); @@ -164,7 +182,7 @@ void AtomicStoreRelease(T *ptr, T *val) } template -bool AtomicCompareExchange(T *ptr, T *expected, T *desired) +std::enable_if_t, bool> AtomicCompareExchange(T *ptr, T *expected, T *desired) { #ifndef _MSC_VER return __atomic_compare_exchange(ptr, expected, desired, /*weak=*/false, __ATOMIC_RELAXED, __ATOMIC_RELAXED); @@ -174,7 +192,7 @@ bool AtomicCompareExchange(T *ptr, T *expected, T *desired) } template -bool AtomicCompareExchangeAcquire(T *ptr, T *expected, T *desired) +std::enable_if_t, bool> AtomicCompareExchangeAcquire(T *ptr, T *expected, T *desired) { #ifndef _MSC_VER return __atomic_compare_exchange(ptr, expected, desired, /*weak=*/false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED); @@ -187,7 +205,19 @@ bool AtomicCompareExchangeAcquire(T *ptr, T *expected, T *desired) } template -void AtomicAddCompareExchangeLoop(T *ptr, T val) +std::enable_if_t, bool> AtomicCompareExchangeRelease(T 
*ptr, T *expected, T *desired) +{ +#ifndef _MSC_VER + return __atomic_compare_exchange(ptr, expected, desired, /*weak=*/false, __ATOMIC_RELEASE, __ATOMIC_RELAXED); +#else + // Cannot specify the memory order directly, use an unconditional fence to avoid branching code. + std::atomic_thread_fence(std::memory_order_release); + return MSVC::AtomicOps::CompareExchange(ptr, expected, desired); +#endif +} + +template +std::enable_if_t> AtomicAddCompareExchangeLoop(T *ptr, T val) { T expected; AtomicLoad(ptr, &expected); @@ -198,6 +228,18 @@ void AtomicAddCompareExchangeLoop(T *ptr, T val) } } +template +std::enable_if_t> AtomicAddCompareExchangeReleaseLoop(T *ptr, T val) +{ + T expected; + AtomicLoad(ptr, &expected); + T desired = expected + val; + while (!AtomicCompareExchangeRelease(ptr, &expected, &desired)) { + // expected holds the new value; try again. + desired = expected + val; + } +} + #ifdef _MSC_VER namespace MSVC { inline void AtomicOps<8>::Add(void *ptr, const void *val) @@ -227,17 +269,35 @@ std::enable_if_t> AtomicAdd(T *ptr, T val) AtomicAddCompareExchangeLoop(ptr, val); } +template +std::enable_if_t> AtomicAddRelease(T *ptr, T val) +{ +#ifndef _MSC_VER + __atomic_fetch_add(ptr, val, __ATOMIC_RELEASE); +#else + // Cannot specify the memory order directly, use a fence. + std::atomic_thread_fence(std::memory_order_release); + MSVC::AtomicOps::Add(ptr, &val); +#endif +} + +template +std::enable_if_t> AtomicAddRelease(T *ptr, T val) +{ + AtomicAddCompareExchangeReleaseLoop(ptr, val); +} + // For adding a double-precision weight to a single-precision bin content type, cast the argument once before the // compare-exchange loop. 
-static inline void AtomicAdd(float *ptr, double val) +static inline void AtomicAddRelease(float *ptr, double val) { - AtomicAdd(ptr, static_cast(val)); + AtomicAddRelease(ptr, static_cast(val)); } template -std::enable_if_t> AtomicInc(T *ptr) +std::enable_if_t> AtomicIncRelease(T *ptr) { - AtomicAdd(ptr, static_cast(1)); + AtomicAddRelease(ptr, static_cast(1)); } template @@ -246,10 +306,16 @@ auto AtomicAdd(T *ptr, const U &add) -> decltype(ptr->AtomicAdd(add)) return ptr->AtomicAdd(add); } +template +auto AtomicAddRelease(T *ptr, const U &add) -> decltype(ptr->AtomicAddRelease(add)) +{ + return ptr->AtomicAddRelease(add); +} + template -auto AtomicInc(T *ptr) -> decltype(ptr->AtomicInc()) +auto AtomicIncRelease(T *ptr) -> decltype(ptr->AtomicIncRelease()) { - return ptr->AtomicInc(); + return ptr->AtomicIncRelease(); } } // namespace Internal diff --git a/hist/histv7/test/hist_atomic.cxx b/hist/histv7/test/hist_atomic.cxx index b31c55c19a0d9..c1502d283b664 100644 --- a/hist/histv7/test/hist_atomic.cxx +++ b/hist/histv7/test/hist_atomic.cxx @@ -1,5 +1,6 @@ #include "hist_test.hxx" +#include #include #ifndef TYPED_TEST_SUITE @@ -12,10 +13,10 @@ class RHistAtomic : public testing::Test {}; using AtomicTypes = testing::Types; TYPED_TEST_SUITE(RHistAtomic, AtomicTypes); -TYPED_TEST(RHistAtomic, AtomicInc) +TYPED_TEST(RHistAtomic, AtomicIncRelease) { TypeParam a = 1; - ROOT::Experimental::Internal::AtomicInc(&a); + ROOT::Experimental::Internal::AtomicIncRelease(&a); EXPECT_EQ(a, 2); } @@ -27,7 +28,15 @@ TYPED_TEST(RHistAtomic, AtomicAdd) EXPECT_EQ(a, 3); } -// AtomicInc is implemented in terms of AtomicAdd, so it's sufficient to stress one of them. +TYPED_TEST(RHistAtomic, AtomicAddRelease) +{ + TypeParam a = 1; + const TypeParam b = 2; + ROOT::Experimental::Internal::AtomicAddRelease(&a, b); + EXPECT_EQ(a, 3); +} + +// AtomicInc* is implemented in terms of AtomicAdd*, so it's sufficient to stress one of them. 
TYPED_TEST(RHistAtomic, StressAtomicAdd) { static constexpr TypeParam Addend = 1; @@ -46,25 +55,43 @@ TYPED_TEST(RHistAtomic, StressAtomicAdd) EXPECT_EQ(a, NAdds * Addend); } -TEST(AtomicAdd, FloatDouble) +TYPED_TEST(RHistAtomic, StressAtomicAddRelease) +{ + static constexpr TypeParam Addend = 1; + static constexpr std::size_t NThreads = 4; + // Reduce number of additions for char to avoid overflow. + static constexpr std::size_t NAddsPerThread = sizeof(TypeParam) == 1 ? 20 : 8000; + static constexpr std::size_t NAdds = NThreads * NAddsPerThread; + + TypeParam a = 0; + StressInParallel(NThreads, [&] { + for (std::size_t i = 0; i < NAddsPerThread; i++) { + ROOT::Experimental::Internal::AtomicAddRelease(&a, Addend); + } + }); + + EXPECT_EQ(a, NAdds * Addend); +} + +TEST(AtomicAddRelease, FloatDouble) { float a = 1; const double b = 2; - ROOT::Experimental::Internal::AtomicAdd(&a, b); + ROOT::Experimental::Internal::AtomicAddRelease(&a, b); EXPECT_EQ(a, 3); } -TEST(RBinWithError, AtomicAdd) +TEST(RBinWithError, AtomicAddRelease) { RBinWithError bin; bin.fSum = 1; bin.fSum2 = 2; - bin.AtomicAdd(1.5); + bin.AtomicAddRelease(1.5); EXPECT_EQ(bin.fSum, 2.5); EXPECT_EQ(bin.fSum2, 4.25); } -TEST(RBinWithError, StressAtomicAdd) +TEST(RBinWithError, StressAtomicAddRelease) { static constexpr double Addend = 2.0; static constexpr std::size_t NThreads = 4; @@ -74,10 +101,55 @@ TEST(RBinWithError, StressAtomicAdd) RBinWithError bin; StressInParallel(NThreads, [&] { for (std::size_t i = 0; i < NAddsPerThread; i++) { - bin.AtomicAdd(Addend); + bin.AtomicAddRelease(Addend); } }); EXPECT_EQ(bin.fSum, NAdds * Addend); EXPECT_EQ(bin.fSum2, NAdds * Addend * Addend); } + +TEST(RBinWithError, AtomicLoad) +{ + RBinWithError bin; + bin.fSum = 1; + bin.fSum2 = 2; + + RBinWithError load; + bin.AtomicLoad(&load); + EXPECT_EQ(load.fSum, 1); + EXPECT_EQ(load.fSum2, 2); +} + +TEST(RBinWithError, StressAtomicLoad) +{ + static constexpr double Addend = 2.0; + static constexpr std::size_t NThreads = 
4; + static constexpr std::size_t NAddsPerThread = 8000; + static constexpr std::size_t NLoads = 8000; + + RBinWithError bin; + double sum = 0, sum2 = 0; + + std::atomic_flag loader = ATOMIC_FLAG_INIT; // default-init is unspecified before C++20 + StressInParallel(NThreads, [&] { + if (!loader.test_and_set()) { + RBinWithError load; + for (std::size_t i = 0; i < NLoads; i++) { + bin.AtomicLoad(&load); + if (load.fSum * Addend != load.fSum2) { + // Failure! Store the values and stop the loader. + sum = load.fSum; + sum2 = load.fSum2; + return; + } + } + } else { + for (std::size_t i = 0; i < NAddsPerThread; i++) { + bin.AtomicAddRelease(Addend); + } + } + }); + + EXPECT_EQ(sum * Addend, sum2); +} diff --git a/hist/histv7/test/hist_engine_atomic.cxx b/hist/histv7/test/hist_engine_atomic.cxx index 3d141ae7c1107..41e110740a587 100644 --- a/hist/histv7/test/hist_engine_atomic.cxx +++ b/hist/histv7/test/hist_engine_atomic.cxx @@ -267,6 +267,64 @@ TEST(RHistEngine, StressFillAddAtomicWeight) EXPECT_EQ(engineA.GetBinContent(0), NOps * Weight); } +TEST(RHistEngine, SnapshotAtomic) +{ + static constexpr std::size_t Bins = 20; + const RRegularAxis axis(Bins, {0, Bins}); + RHistEngine engineA({axis}); + + engineA.Fill(-100); + for (std::size_t i = 0; i < Bins; i++) { + engineA.Fill(i + 0.5); + } + engineA.Fill(100); + + RHistEngine engineB = engineA.SnapshotAtomic(); + ASSERT_EQ(engineB.GetNDimensions(), 1); + ASSERT_EQ(engineB.GetTotalNBins(), Bins + 2); + + EXPECT_EQ(engineB.GetBinContent(RBinIndex::Underflow()), 1); + for (auto index : axis.GetNormalRange()) { + EXPECT_EQ(engineB.GetBinContent(index), 1); + } + EXPECT_EQ(engineB.GetBinContent(RBinIndex::Overflow()), 1); +} + +TEST(RHistEngine, StressSnapshotAtomic) +{ + static constexpr std::size_t Bins = 20; + static constexpr std::size_t NThreads = 4; + static constexpr std::size_t NFillsPerThread = 100000; + static constexpr std::size_t NSnapshots = 10000; + + // Create a histogram with some bins that takes a bit of time to snapshot. 
The idea of this stress test is then to + // fill the first and last bin from multiple threads. If the snapshot is consistent, it must never have a bigger bin + // content in the last bin. + RHistEngine engine(Bins, {0, Bins}); + int first = 0, last = 0; + + std::atomic_flag snapshotter = ATOMIC_FLAG_INIT; // default-init is unspecified before C++20 + StressInParallel(NThreads, [&] { + if (!snapshotter.test_and_set()) { + for (std::size_t i = 0; i < NSnapshots; i++) { + RHistEngine snapshot = engine.SnapshotAtomic(); + first = snapshot.GetBinContent(0); + last = snapshot.GetBinContent(Bins - 1); + if (last > first) { + return; + } + } + } else { + for (std::size_t i = 0; i < NFillsPerThread; i++) { + engine.FillAtomic(0.5); + engine.FillAtomic(Bins - 0.5); + } + } + }); + + EXPECT_GE(first, last); +} + TEST(RHistEngine_RBinWithError, AddAtomic) { static constexpr std::size_t Bins = 20; @@ -383,3 +441,61 @@ TEST(RHistEngine_RBinWithError, StressFillAtomicWeight) EXPECT_EQ(engine.GetBinContent(0).fSum, NFills * Weight); EXPECT_EQ(engine.GetBinContent(0).fSum2, NFills * Weight * Weight); } + +TEST(RHistEngine_RBinWithError, SnapshotAtomic) +{ + static constexpr std::size_t Bins = 20; + const RRegularAxis axis(Bins, {0, Bins}); + RHistEngine engineA({axis}); + + engineA.Fill(-100); + for (std::size_t i = 0; i < Bins; i++) { + engineA.Fill(i + 0.5); + } + engineA.Fill(100); + + RHistEngine engineB = engineA.SnapshotAtomic(); + ASSERT_EQ(engineB.GetNDimensions(), 1); + ASSERT_EQ(engineB.GetTotalNBins(), Bins + 2); + + EXPECT_EQ(engineB.GetBinContent(RBinIndex::Underflow()).fSum, 1); + for (auto index : axis.GetNormalRange()) { + EXPECT_EQ(engineB.GetBinContent(index).fSum, 1); + } + EXPECT_EQ(engineB.GetBinContent(RBinIndex::Overflow()).fSum, 1); +} + +TEST(RHistEngine_RBinWithError, StressSnapshotAtomic) +{ + static constexpr std::size_t Bins = 20; + static constexpr std::size_t NThreads = 4; + static constexpr std::size_t NFillsPerThread = 100000; + static constexpr std::size_t NSnapshots = 10000; + + // Create a histogram with 
some bins that takes a bit of time to snapshot. The idea of this stress test is then to + // fill the first and last bin from multiple threads. If the snapshot is consistent, it must never have a bigger bin + // content in the last bin. + RHistEngine engine(Bins, {0, Bins}); + double first = 0, last = 0; + + std::atomic_flag snapshotter = ATOMIC_FLAG_INIT; // default-init is unspecified before C++20 + StressInParallel(NThreads, [&] { + if (!snapshotter.test_and_set()) { + for (std::size_t i = 0; i < NSnapshots; i++) { + RHistEngine snapshot = engine.SnapshotAtomic(); + first = snapshot.GetBinContent(0).fSum; + last = snapshot.GetBinContent(Bins - 1).fSum; + if (last > first) { + return; + } + } + } else { + for (std::size_t i = 0; i < NFillsPerThread; i++) { + engine.FillAtomic(0.5); + engine.FillAtomic(Bins - 0.5); + } + } + }); + + EXPECT_GE(first, last); +} diff --git a/hist/histv7/test/hist_user.cxx b/hist/histv7/test/hist_user.cxx index b5d348da0cdd9..fea882bb4df25 100644 --- a/hist/histv7/test/hist_user.cxx +++ b/hist/histv7/test/hist_user.cxx @@ -54,13 +54,15 @@ struct User { return *this; } - void AtomicInc() { ROOT::Experimental::Internal::AtomicInc(&fValue); } + void AtomicIncRelease() { ROOT::Experimental::Internal::AtomicIncRelease(&fValue); } - void AtomicAdd(double w) { ROOT::Experimental::Internal::AtomicAdd(&fValue, w); } + void AtomicAddRelease(double w) { ROOT::Experimental::Internal::AtomicAddRelease(&fValue, w); } - void AtomicAdd(const UserWeight &w) { ROOT::Experimental::Internal::AtomicAdd(&fValue, w.fWeight); } + void AtomicAddRelease(const UserWeight &w) { ROOT::Experimental::Internal::AtomicAddRelease(&fValue, w.fWeight); } void AtomicAdd(const User &rhs) { ROOT::Experimental::Internal::AtomicAdd(&fValue, rhs.fValue); } + + void AtomicLoad(User *ret) const { ROOT::Experimental::Internal::AtomicLoad(&fValue, &ret->fValue); } }; static_assert(std::is_nothrow_move_constructible_v>); @@ -192,7 +194,7 @@ TEST(RHistEngineUser, FillUserWeightInvalidNumberOfArguments) TEST(RHistEngineUser, FillAtomic) { - // 
Unweighted filling with atomic instructions uses AtomicInc + // Unweighted filling with atomic instructions uses AtomicIncRelease static constexpr std::size_t Bins = 20; const RRegularAxis axis(Bins, {0, Bins}); RHistEngine engine({axis}); @@ -207,7 +209,7 @@ TEST(RHistEngineUser, FillAtomic) TEST(RHistEngineUser, FillAtomicWeight) { - // Weighted filling with atomic instructions uses AtomicAdd(double) + // Weighted filling with atomic instructions uses AtomicAddRelease(double) static constexpr std::size_t Bins = 20; const RRegularAxis axis(Bins, {0, Bins}); RHistEngine engine({axis}); @@ -222,7 +224,7 @@ TEST(RHistEngineUser, FillAtomicWeight) TEST(RHistEngineUser, FillAtomicUserWeight) { - // Weighted filling with user-defined weight and atomic instructions uses AtomicAdd(const UserWeight &) + // Weighted filling with user-defined weight and atomic instructions uses AtomicAddRelease(const UserWeight &) static constexpr std::size_t Bins = 20; const RRegularAxis axis(Bins, {0, Bins}); RHistEngine engine({axis}); @@ -274,3 +276,16 @@ TEST(RHistEngineUser, SetBinContent) engine.SetBinContent(indices, 43); EXPECT_EQ(engine.GetBinContent(indices).fValue, 43); } + +TEST(RHistEngineUser, SnapshotAtomic) +{ + // Snapshotting uses AtomicLoad. + static constexpr std::size_t Bins = 20; + const RRegularAxis axis(Bins, {0, Bins}); + RHistEngine engineA({axis}); + + engineA.Fill(8.5); + + RHistEngine engineB = engineA.SnapshotAtomic(); + EXPECT_EQ(engineB.GetBinContent(8).fValue, 1); +}