8 changes: 6 additions & 2 deletions tree/ntuple/inc/ROOT/RPageSinkBuf.hxx
@@ -19,6 +19,8 @@
#include <ROOT/RNTupleMetrics.hxx>
#include <ROOT/RPageStorage.hxx>

#include <atomic>
#include <cstddef>
#include <deque>
#include <functional>
#include <iterator>
@@ -109,6 +111,8 @@ private:
/// The buffered page sink maintains a copy of the RNTupleModel for the inner sink.
/// For the unbuffered case, the RNTupleModel is instead managed by a RNTupleWriter.
std::unique_ptr<ROOT::RNTupleModel> fInnerModel;
/// The sum of uncompressed bytes in buffered pages. Used to heuristically reduce the memory usage.
std::atomic<std::size_t> fBufferedUncompressed = 0;
/// Vector of buffered column pages. Indexed by column id.
std::vector<RColumnBuf> fBufferedColumns;
/// Columns committed as suppressed are stored and passed to the inner sink at cluster commit
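For context on the new member: the counter is written both by the thread that commits pages and by the worker threads that consume them, so it must be atomic. A minimal sketch of the bookkeeping pattern, with hypothetical names that are not part of the RNTuple API:

```cpp
#include <atomic>
#include <cstddef>

// Sketch of the accounting done by fBufferedUncompressed: the committing
// thread adds a page's uncompressed size when it buffers a copy, a worker
// subtracts it when the compression task claims the page, and the producer
// polls the total to decide whether to throttle.
struct RBufferedBytes {
   std::atomic<std::size_t> fBytes{0};

   void OnPageBuffered(std::size_t nBytes) { fBytes += nBytes; }
   void OnPageClaimed(std::size_t nBytes) { fBytes -= nBytes; }
   bool AboveLimit(std::size_t limit) const { return fBytes.load() > limit; }
};
```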
@@ -123,8 +127,8 @@ public:
explicit RPageSinkBuf(std::unique_ptr<RPageSink> inner);
RPageSinkBuf(const RPageSinkBuf&) = delete;
RPageSinkBuf& operator=(const RPageSinkBuf&) = delete;
RPageSinkBuf(RPageSinkBuf&&) = default;
RPageSinkBuf& operator=(RPageSinkBuf&&) = default;
RPageSinkBuf(RPageSinkBuf &&) = delete;
RPageSinkBuf &operator=(RPageSinkBuf &&) = delete;
~RPageSinkBuf() override;

ColumnHandle_t AddColumn(ROOT::DescriptorId_t fieldId, RColumn &column) final;
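Deleting the move operations is consistent with the new member: `std::atomic` is itself neither copyable nor movable, so the previously defaulted move constructor and move assignment would now be defined as deleted in any case; spelling out `= delete` makes that explicit. A standalone illustration of the underlying language rule, assuming nothing about the real class:

```cpp
#include <atomic>
#include <cstddef>
#include <utility>

struct S {
   std::atomic<std::size_t> fCounter{0};
   S() = default;
   S(S &&) = default; // defined as deleted: std::atomic has no move constructor
};

int main()
{
   S a;
   // S b = std::move(a); // error: use of deleted move constructor
}
```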
18 changes: 17 additions & 1 deletion tree/ntuple/src/RPageSinkBuf.cxx
@@ -175,7 +175,13 @@ void ROOT::Internal::RPageSinkBuf::CommitPage(ColumnHandle_t columnHandle, const
}
};

if (!fTaskScheduler) {
// If we already buffer more uncompressed bytes than the approximate zipped cluster size, we assume there is enough
// work for other threads to pick up. This limits the buffer usage when sealing / compression tasks are not processed
// fast enough, and heuristically reduces the memory usage, especially for big compression factors.
std::size_t bufferedUncompressed = fBufferedUncompressed.load();
bool enoughWork = bufferedUncompressed > GetWriteOptions().GetApproxZippedClusterSize();

if (!fTaskScheduler || enoughWork) {
allocateBuf();
// Seal the page right now, avoiding the allocation and copy, but making sure that the page buffer is not aliased.
RSealPageConfig config;
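The new branch implements a simple back-pressure rule: once the backlog of buffered uncompressed bytes exceeds the approximate zipped cluster size, the committing thread seals the page itself instead of queuing yet another task. A condensed sketch of the decision, with simplified signatures that do not match the actual ROOT interfaces:

```cpp
#include <atomic>
#include <cstddef>

// Returns true when CommitPage should seal the page synchronously: either
// no task scheduler is available, or the workers already have more than a
// cluster's worth (in zipped bytes) of uncompressed backlog to chew through.
bool ShouldSealSynchronously(const std::atomic<std::size_t> &bufferedUncompressed,
                             std::size_t approxZippedClusterSize, bool hasTaskScheduler)
{
   const bool enoughWork = bufferedUncompressed.load() > approxZippedClusterSize;
   return !hasTaskScheduler || enoughWork;
}
```

Note that the comparison is intentionally asymmetric: an uncompressed backlog is measured against a compressed size target, so the higher the compression factor, the earlier the throttle kicks in relative to the cluster's uncompressed volume.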
@@ -194,16 +200,25 @@
return;
}

// We will buffer the uncompressed page. Unless work is consumed fast enough, the next page might be compressed
// directly.
fBufferedUncompressed += page.GetNBytes();

// TODO avoid frequent (de)allocations by holding on to allocated buffers in RColumnBuf
zipItem.fPage = fPageAllocator->NewPage(page.GetElementSize(), page.GetNElements());
// make sure the page is aware of how many elements it will have
zipItem.fPage.GrowUnchecked(page.GetNElements());
assert(zipItem.fPage.GetNBytes() == page.GetNBytes());
memcpy(zipItem.fPage.GetBuffer(), page.GetBuffer(), page.GetNBytes());

fCounters->fParallelZip.SetValue(1);
// Thread safety: Each thread works on a distinct zipItem which owns its
// compression buffer.
fTaskScheduler->AddTask([this, &zipItem, &sealedPage, &element, allocateBuf, shrinkSealedPage] {
// The task will consume the uncompressed page. Decrease the atomic counter early so that more work has arrived
// when we are done.
fBufferedUncompressed -= zipItem.fPage.GetNBytes();

allocateBuf();
RSealPageConfig config;
config.fPage = &zipItem.fPage;
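Note the ordering: the producer increments the counter before enqueuing, and the task decrements it as its first action rather than after sealing. Decrementing on entry lets the committing thread return to asynchronous mode as soon as a worker has claimed a page, without waiting for the compression itself to finish. A minimal sketch of that enqueue/claim protocol, using a hypothetical task queue in place of the real scheduler:

```cpp
#include <atomic>
#include <cstddef>
#include <functional>
#include <vector>

std::atomic<std::size_t> gBufferedUncompressed{0};
std::vector<std::function<void()>> gTaskQueue; // stand-in for the scheduler

void EnqueueCompression(std::size_t pageBytes)
{
   gBufferedUncompressed += pageBytes; // producer: account for the buffered copy
   gTaskQueue.push_back([pageBytes] {
      gBufferedUncompressed -= pageBytes; // worker: release the budget on entry
      // ... seal / compress the buffered page copy here ...
   });
}
```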
@@ -241,6 +256,7 @@ void ROOT::Internal::RPageSinkBuf::CommitSealedPageV(
void ROOT::Internal::RPageSinkBuf::FlushClusterImpl(std::function<void(void)> FlushClusterFn)
{
WaitForAllTasks();
assert(fBufferedUncompressed == 0 && "all buffered pages should have been processed");

std::vector<RSealedPageGroup> toCommit;
toCommit.reserve(fBufferedColumns.size());
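The new assertion documents the invariant that ties the two updates together: every increment in CommitPage is matched by a decrement in the corresponding task, and the synchronous path never increments at all, so after `WaitForAllTasks()` the counter must have drained to zero. In sketch form, with a hypothetical global mirroring the member:

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

std::atomic<std::size_t> gBufferedUncompressed{0}; // stand-in for the member

void FlushCluster()
{
   // All compression tasks have run and subtracted their page sizes; any
   // nonzero remainder would indicate a lost or unbalanced update.
   assert(gBufferedUncompressed == 0 && "all buffered pages should have been processed");
}
```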