Skip to content

Commit eefc229

Browse files
Release excessively reserved memory in HashBuild even if non-reclaimable (facebookincubator#10782) (#496)
Summary: When hash build is under table building stage, we reserve excessive amount of memory to account for the worst case scenario duplicate rows for NextRowVector (i.e. we assume every row in build table has duplicates, which in most cases is not true). Other than let the query fail because the current stage is unreclaimable, we can perform a desperate try to release the unused reserved memory, giving the query a chance to succeed. Pull Request resolved: facebookincubator#10782 Reviewed By: xiaoxmeng Differential Revision: D61510068 Pulled By: tanjialiang fbshipit-source-id: 1d62804d22ab11d08080e7cf872da0656cbd1010 (cherry picked from commit 55888da) Co-authored-by: Jialiang Tan <[email protected]>
1 parent 9e22c2e commit eefc229

File tree

3 files changed

+32
-11
lines changed

3 files changed

+32
-11
lines changed

velox/exec/HashBuild.cpp

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1056,10 +1056,24 @@ void HashBuild::reclaim(
10561056
VELOX_CHECK(canReclaim());
10571057
auto* driver = operatorCtx_->driver();
10581058
VELOX_CHECK_NOT_NULL(driver);
1059-
VELOX_CHECK(!nonReclaimableSection_);
10601059

10611060
TestValue::adjust("facebook::velox::exec::HashBuild::reclaim", this);
10621061

1062+
const auto& task = driver->task();
1063+
const std::vector<Operator*> operators =
1064+
task->findPeerOperators(operatorCtx_->driverCtx()->pipelineId, this);
1065+
// Worst case scenario reservation was performed in ensureTableFits() when
1066+
// accounting for NextRowVector for duplicated rows, i.e. we assume every
1067+
// single row has duplicates. That is normally not the case. So when the
1068+
// query is under memory pressure, the excessive (in most cases) reservations
1069+
// can be returned.
1070+
for (auto i = 0; i <= operators.size(); i++) {
1071+
auto* memoryPool = i == 0 ? pool() : operators[i - 1]->pool();
1072+
const auto oldReservedBytes = memoryPool->reservedBytes();
1073+
memoryPool->release();
1074+
stats.reclaimedBytes += (oldReservedBytes - memoryPool->reservedBytes());
1075+
}
1076+
10631077
if (exceededMaxSpillLevelLimit_) {
10641078
return;
10651079
}
@@ -1082,10 +1096,7 @@ void HashBuild::reclaim(
10821096
return;
10831097
}
10841098

1085-
const auto& task = driver->task();
10861099
VELOX_CHECK(task->pauseRequested());
1087-
const std::vector<Operator*> operators =
1088-
task->findPeerOperators(operatorCtx_->driverCtx()->pipelineId, this);
10891100
for (auto* op : operators) {
10901101
HashBuild* buildOp = dynamic_cast<HashBuild*>(op);
10911102
VELOX_CHECK_NOT_NULL(buildOp);

velox/exec/Operator.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -636,8 +636,11 @@ uint64_t Operator::MemoryReclaimer::reclaim(
636636
"facebook::velox::exec::Operator::MemoryReclaimer::reclaim", pool);
637637

638638
// NOTE: we can't reclaim memory from an operator which is under
639+
// non-reclaimable section, except for HashBuild operator. If it is HashBuild
640+
// operator, we allow it to enter HashBuild::reclaim because there is a good
641+
// chance we can release some unused reserved memory even if it's in
639642
// non-reclaimable section.
640-
if (op_->nonReclaimableSection_) {
643+
if (op_->nonReclaimableSection_ && op_->operatorType() != "HashBuild") {
641644
// TODO: reduce the log frequency if it is too verbose.
642645
++stats.numNonReclaimableAttempts;
643646
RECORD_METRIC_VALUE(kMetricMemoryNonReclaimableCount);

velox/exec/tests/HashJoinTest.cpp

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5925,22 +5925,29 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringAllocation) {
59255925
ASSERT_EQ(reclaimable, enableSpilling);
59265926
if (enableSpilling) {
59275927
ASSERT_GE(reclaimableBytes, 0);
5928+
op->reclaim(
5929+
folly::Random::oneIn(2) ? 0 : folly::Random::rand32(),
5930+
reclaimerStats_);
59285931
} else {
59295932
ASSERT_EQ(reclaimableBytes, 0);
5933+
VELOX_ASSERT_THROW(
5934+
op->reclaim(
5935+
folly::Random::oneIn(2) ? 0 : folly::Random::rand32(),
5936+
reclaimerStats_),
5937+
"");
59305938
}
5931-
VELOX_ASSERT_THROW(
5932-
op->reclaim(
5933-
folly::Random::oneIn(2) ? 0 : folly::Random::rand32(),
5934-
reclaimerStats_),
5935-
"");
59365939

59375940
driverWait.notify();
59385941
Task::resume(task);
59395942
task.reset();
59405943

59415944
taskThread.join();
5945+
if (enableSpilling) {
5946+
ASSERT_GT(reclaimerStats_.reclaimedBytes, 0);
5947+
} else {
5948+
ASSERT_EQ(reclaimerStats_, memory::MemoryReclaimer::Stats{0});
5949+
}
59425950
}
5943-
ASSERT_EQ(reclaimerStats_, memory::MemoryReclaimer::Stats{0});
59445951
}
59455952

59465953
DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringOutputProcessing) {

0 commit comments

Comments
 (0)