Skip to content

Commit

Permalink
Fix hash build memory over use (facebookincubator#10534)
Browse files Browse the repository at this point in the history
Summary:
For duplicate rows memory usage, currently under parallel join build conditions, each build operator reserves memory big enough to accommodate total number of rows across all hash tables from all build operators. Instead each build operator should only reserve memory enough for its own hash table rows.

This optimization reduced hash build operator memory usage by 10x and we see total memory reduction of some queries reduced by 70%.

Pull Request resolved: facebookincubator#10534

Reviewed By: zacw7

Differential Revision: D60131886

Pulled By: tanjialiang

fbshipit-source-id: a8c1c777df557dfcfc754ef31164a116fdb917c3

(cherry picked from commit 3fb9657)
  • Loading branch information
tanjialiang authored and liuxiang71 committed Sep 19, 2024
1 parent 88856e6 commit 23abe2a
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 75 deletions.
152 changes: 90 additions & 62 deletions velox/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,6 @@ bool HashBuild::finishHashBuild() {

std::vector<HashBuild*> otherBuilds;
otherBuilds.reserve(peers.size());
uint64_t numRows = table_->rows()->numRows();
for (auto& peer : peers) {
auto op = peer->findOperator(planNodeId());
HashBuild* build = dynamic_cast<HashBuild*>(op);
Expand All @@ -696,13 +695,10 @@ bool HashBuild::finishHashBuild() {
!build->stateCleared_,
"Internal state for a peer is empty. It might have already"
" been closed.");
numRows += build->table_->rows()->numRows();
}
otherBuilds.push_back(build);
}

ensureTableFits(numRows);

std::vector<std::unique_ptr<BaseHashTable>> otherTables;
otherTables.reserve(peers.size());
SpillPartitionSet spillPartitions;
Expand All @@ -723,20 +719,34 @@ bool HashBuild::finishHashBuild() {
spiller->finishSpill(spillPartitions);
}
}
bool allowDuplicateRows = table_->rows()->nextOffset() != 0;
if (allowDuplicateRows) {
ensureNextRowVectorFits(numRows, otherBuilds);
}

if (spiller_ != nullptr) {
spiller_->finishSpill(spillPartitions);
removeEmptyPartitions(spillPartitions);
}

// TODO: re-enable parallel join build with spilling triggered after
// https://github.com/facebookincubator/velox/issues/3567 is fixed.
// TODO: Get accurate signal if parallel join build is going to be applied
// from hash table. Currently there is still a chance inside hash table that
// it might decide it is not going to trigger parallel join build.
const bool allowParallelJoinBuild =
!otherTables.empty() && spillPartitions.empty();
ensureTableFits(otherBuilds, otherTables, allowParallelJoinBuild);

SCOPE_EXIT {
// Make a guard to release the unused memory reservation since we have
// finished the merged table build. The guard makes sure we release the
// memory reserved for other operators even when exceptions are thrown to
// prevent memory leak. We cannot rely on other operator's cleanup mechanism
// because when exceptions are thrown, other operator's cleanup mechanism
// might already have finished.
pool()->release();
for (auto* build : otherBuilds) {
build->pool()->release();
}
};

// TODO: Re-enable parallel join build with spilling triggered after
// https://github.com/facebookincubator/velox/issues/3567 is fixed.
CpuWallTiming timing;
{
CpuWallTimer cpuWallTimer{timing};
Expand All @@ -757,25 +767,19 @@ bool HashBuild::finishHashBuild() {
if (spillEnabled()) {
stateCleared_ = true;
}

// Release the unused memory reservation since we have finished the merged
// table build.
pool()->release();
if (allowDuplicateRows) {
for (auto* build : otherBuilds) {
build->pool()->release();
}
}
return true;
}

void HashBuild::ensureTableFits(uint64_t numRows) {
void HashBuild::ensureTableFits(
const std::vector<HashBuild*>& otherBuilds,
const std::vector<std::unique_ptr<BaseHashTable>>& otherTables,
bool isParallelJoin) {
// NOTE: we don't need memory reservation if all the partitions have been
// spilled as nothing need to be built.
if (!spillEnabled() || spiller_ == nullptr || spiller_->isAllSpilled() ||
numRows == 0) {
if (!spillEnabled() || spiller_ == nullptr || spiller_->isAllSpilled()) {
return;
}
VELOX_CHECK_EQ(otherBuilds.size(), otherTables.size());

// Test-only spill path.
if (testingTriggerSpill(pool()->name())) {
Expand All @@ -784,58 +788,82 @@ void HashBuild::ensureTableFits(uint64_t numRows) {
return;
}

// NOTE: reserve a bit more memory to consider the extra memory used for
// parallel table build operation.
const uint64_t bytesToReserve = table_->estimateHashTableSize(numRows) * 1.1;
TestValue::adjust("facebook::velox::exec::HashBuild::ensureTableFits", this);

const auto dupRowOverheadBytes = sizeof(char*) + sizeof(NextRowVector);

uint64_t totalNumRows{0};
uint64_t lastBuildBytesToReserve{0};
bool allowDuplicateRows{false};
{
Operator::ReclaimableSectionGuard guard(this);
TestValue::adjust(
"facebook::velox::exec::HashBuild::ensureTableFits", this);
if (pool()->maybeReserve(bytesToReserve)) {
return;
std::lock_guard<std::mutex> l(mutex_);
const auto numRows = table_->rows()->numRows();
totalNumRows += numRows;
allowDuplicateRows = table_->rows()->nextOffset() != 0;
if (allowDuplicateRows) {
lastBuildBytesToReserve += numRows * dupRowOverheadBytes;
}
}

LOG(WARNING) << "Failed to reserve " << succinctBytes(bytesToReserve)
<< " for memory pool " << pool()->name()
<< ", usage: " << succinctBytes(pool()->usedBytes())
<< ", reservation: " << succinctBytes(pool()->reservedBytes());
}
for (auto i = 0; i < otherTables.size(); i++) {
auto& otherTable = otherTables[i];
VELOX_CHECK_NOT_NULL(otherTable);
auto& otherBuild = otherBuilds[i];
const auto& rowContainer = otherTable->rows();
int64_t numRows{0};
{
std::lock_guard<std::mutex> l(otherBuild->mutex_);
numRows = rowContainer->numRows();
}
if (numRows == 0) {
continue;
}

void HashBuild::ensureNextRowVectorFits(
uint64_t numRows,
const std::vector<HashBuild*>& otherBuilds) {
if (!spillEnabled()) {
return;
totalNumRows += numRows;
if (!allowDuplicateRows) {
continue;
}

const auto dupRowBytesToReserve = numRows * dupRowOverheadBytes;
if (!isParallelJoin) {
lastBuildBytesToReserve += dupRowBytesToReserve;
continue;
}

Operator::ReclaimableSectionGuard guard(otherBuild);
auto* otherPool = otherBuild->pool();

// Reserve memory for memory allocations for next-row-vectors in
// otherBuild operators if it is parallel join build. Otherwise all
// next-row-vectors shall be allocated from the last build operator.
if (!otherPool->maybeReserve(dupRowBytesToReserve)) {
LOG(WARNING)
<< "Failed to reserve " << succinctBytes(dupRowBytesToReserve)
<< " for for duplicate row memory allocation from non-last memory pool "
<< otherPool->name()
<< ", usage: " << succinctBytes(otherPool->usedBytes())
<< ", reservation: " << succinctBytes(otherPool->reservedBytes());
}
}

TestValue::adjust(
"facebook::velox::exec::HashBuild::ensureNextRowVectorFits", this);
if (totalNumRows == 0) {
return;
}

// The memory allocation for next-row-vectors may stuck in
// 'SharedArbitrator::growCapacity' when memory arbitrating is also
// triggered. Reserve memory for next-row-vectors to prevent this issue.
auto bytesToReserve = numRows * (sizeof(char*) + sizeof(NextRowVector));
// NOTE: reserve a bit more memory to consider the extra memory used for
// parallel table build operation.
lastBuildBytesToReserve += table_->estimateHashTableSize(totalNumRows) * 1.1;
{
Operator::ReclaimableSectionGuard guard(this);
if (!pool()->maybeReserve(bytesToReserve)) {
LOG(WARNING) << "Failed to reserve " << succinctBytes(bytesToReserve)
<< " for memory pool " << pool()->name()
<< ", usage: " << succinctBytes(pool()->usedBytes())
<< ", reservation: "
<< succinctBytes(pool()->reservedBytes());
}
}
for (auto* build : otherBuilds) {
Operator::ReclaimableSectionGuard guard(build);
if (!build->pool()->maybeReserve(bytesToReserve)) {
LOG(WARNING) << "Failed to reserve " << succinctBytes(bytesToReserve)
<< " for memory pool " << build->pool()->name()
<< ", usage: " << succinctBytes(build->pool()->usedBytes())
<< ", reservation: "
<< succinctBytes(build->pool()->reservedBytes());
if (pool()->maybeReserve(lastBuildBytesToReserve)) {
return;
}
}

LOG(WARNING) << "Failed to reserve " << succinctBytes(lastBuildBytesToReserve)
<< " for last build memory pool " << pool()->name()
<< ", usage: " << succinctBytes(pool()->usedBytes())
<< ", reservation: " << succinctBytes(pool()->reservedBytes());
}

void HashBuild::postHashBuildProcess() {
Expand Down
17 changes: 6 additions & 11 deletions velox/exec/HashBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,17 +141,12 @@ class HashBuild final : public Operator {
// enabled.
void ensureInputFits(RowVectorPtr& input);

// Invoked to ensure there is sufficient memory to build the join table with
// the specified 'numRows' if spilling is enabled. The function throws to fail
// the query if the memory reservation fails.
void ensureTableFits(uint64_t numRows);

// Invoked to ensure there is sufficient memory to build the next-row-vectors
// with the specified 'numRows' if spilling is enabled. The function throws to
// fail the query if the memory reservation fails.
void ensureNextRowVectorFits(
uint64_t numRows,
const std::vector<HashBuild*>& otherBuilds);
// Invoked to ensure there is sufficient memory to build the join table. The
// function throws to fail the query if the memory reservation fails.
void ensureTableFits(
const std::vector<HashBuild*>& otherBuilds,
const std::vector<std::unique_ptr<BaseHashTable>>& otherTables,
bool isParallelJoin);

// Invoked to compute spill partitions numbers for each row 'input' and spill
// rows to spiller directly if the associated partition(s) is spilling. The
Expand Down
92 changes: 90 additions & 2 deletions velox/exec/tests/HashJoinTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6989,6 +6989,91 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringTableBuild) {
.run();
}

DEBUG_ONLY_TEST_F(HashJoinTest, exceptionDuringFinishJoinBuild) {
// This test is to make sure there is no memory leak when exceptions are
// thrown while parallelly preparing join table.
auto memoryManager = memory::memoryManager();
const auto& arbitrator = memoryManager->arbitrator();
const uint64_t numDrivers = 2;
const auto expectedFreeCapacityBytes = arbitrator->stats().freeCapacityBytes;

const uint64_t numBuildSideRows = 500;
auto buildKeyVector = makeFlatVector<int64_t>(
numBuildSideRows,
[](vector_size_t row) { return folly::Random::rand64(); });
auto buildSideVector =
makeRowVector({"b0", "b1"}, {buildKeyVector, buildKeyVector});
std::vector<RowVectorPtr> buildSideVectors;
for (int i = 0; i < numDrivers; ++i) {
buildSideVectors.push_back(buildSideVector);
}
createDuckDbTable("build", buildSideVectors);

const uint64_t numProbeSideRows = 10;
auto probeKeyVector = makeFlatVector<int64_t>(
numProbeSideRows,
[&](vector_size_t row) { return buildKeyVector->valueAt(row); });
auto probeSideVector =
makeRowVector({"p0", "p1"}, {probeKeyVector, probeKeyVector});
std::vector<RowVectorPtr> probeSideVectors;
for (int i = 0; i < numDrivers; ++i) {
probeSideVectors.push_back(probeSideVector);
}
createDuckDbTable("probe", probeSideVectors);

ASSERT_EQ(arbitrator->stats().freeCapacityBytes, expectedFreeCapacityBytes);

// We set the task to fail right before we reserve memory for other operators.
// We rely on the driver suspension before parallel join build to throw
// exceptions (suspension on an already terminated task throws).
SCOPED_TESTVALUE_SET(
"facebook::velox::exec::HashBuild::ensureTableFits",
std::function<void(HashBuild*)>([&](HashBuild* buildOp) {
try {
VELOX_FAIL("Simulated failure");
} catch (VeloxException& e) {
buildOp->testingOperatorCtx()->task()->setError(
std::current_exception());
}
}));

std::vector<RowVectorPtr> probeInput = {probeSideVector};
std::vector<RowVectorPtr> buildInput = {buildSideVector};
auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
const auto spillDirectory = exec::test::TempDirectoryPath::create();

ASSERT_EQ(arbitrator->stats().freeCapacityBytes, expectedFreeCapacityBytes);
VELOX_ASSERT_THROW(
AssertQueryBuilder(duckDbQueryRunner_)
.spillDirectory(spillDirectory->getPath())
.config(core::QueryConfig::kSpillEnabled, true)
.config(core::QueryConfig::kJoinSpillEnabled, true)
.queryCtx(
newQueryCtx(memoryManager, executor_.get(), kMemoryCapacity))
.maxDrivers(numDrivers)
.plan(PlanBuilder(planNodeIdGenerator)
.values(probeInput, true)
.hashJoin(
{"p0"},
{"b0"},
PlanBuilder(planNodeIdGenerator)
.values(buildInput, true)
.planNode(),
"",
{"p0", "p1", "b0", "b1"},
core::JoinType::kInner)
.planNode())
.assertResults(
"SELECT probe.p0, probe.p1, build.b0, build.b1 FROM probe "
"INNER JOIN build ON probe.p0 = build.b0"),
"Simulated failure");
// This test uses on-demand created memory manager instead of the global
// one. We need to make sure any used memory got cleaned up before exiting
// the scope
waitForAllTasksToBeDeleted();
ASSERT_EQ(arbitrator->stats().freeCapacityBytes, expectedFreeCapacityBytes);
}

DEBUG_ONLY_TEST_F(HashJoinTest, arbitrationTriggeredDuringParallelJoinBuild) {
std::unique_ptr<memory::MemoryManager> memoryManager = createMemoryManager();
const auto& arbitrator = memoryManager->arbitrator();
Expand Down Expand Up @@ -7077,8 +7162,11 @@ DEBUG_ONLY_TEST_F(HashJoinTest, arbitrationTriggeredByEnsureJoinTableFit) {
.injectSpill(false)
.verifier([&](const std::shared_ptr<Task>& task, bool /*unused*/) {
auto opStats = toOperatorStats(task->taskStats());
ASSERT_GT(opStats.at("HashProbe").spilledBytes, 0);
ASSERT_GT(opStats.at("HashBuild").spilledBytes, 0);
ASSERT_GT(
opStats.at("HashBuild")
.runtimeStats["memoryArbitrationWallNanos"]
.count,
0);
})
.run();
}
Expand Down

0 comments on commit 23abe2a

Please sign in to comment.