Skip to content

Commit 4397808

Browse files
ccat3zJkSelf
andauthored
[Gluten-1.2] Backport facebookincubator#9025 and facebookincubator#10979 to fix the result mismatch in RowsStreamingWindowBuild (#499)
* Revert "Add RowsStreamingWindowBuild to avoid OOM in Window operator (9025)" This reverts commit f34c9b1. * Add RowsStreamingWindowBuild to avoid OOM in Window operator (facebookincubator#9025) Summary: Unlike `StreamingWindowBuild`, `RowLevelStreamingWindowBuild` in this PR is capable of processing window functions as rows arrive within a single partition, without the need to wait for the entire partition to be ready. This approach can significantly reduce memory usage, especially when a single partition contains a large amount of data. It is particularly suited for optimizing `rank` and `row_number` functions, as well as aggregate window functions with a default frame. The detailed discussion is [here](facebookincubator#8975). The design doc is [here](https://docs.google.com/document/d/17ONSJHK8XP5Lixm8XBl01RMNl4ntpixiVFe693ahw6k/edit?usp=sharing). Pull Request resolved: facebookincubator#9025 Test Plan: Run through 10hrs fuzzer testing Reviewed By: kagamiori Differential Revision: D61473798 Pulled By: xiaoxmeng fbshipit-source-id: 569a752770395330c48a3521bd5421eb89f5623d * Fix error message * Fix the result mismatch in RowsStreamingWindowBuild (facebookincubator#10979) Summary: For a Range frame, it is necessary to ensure that the peer is ready before commencing the window function computation. Pull Request resolved: facebookincubator#10979 Reviewed By: kagamiori Differential Revision: D62622816 Pulled By: xiaoxmeng fbshipit-source-id: 1a9911da416c867c9e295242a05d0f33fbc2e22d --------- Co-authored-by: Jia Ke <[email protected]>
1 parent eefc229 commit 4397808

30 files changed

+574
-310
lines changed

velox/exec/AggregateWindow.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -410,6 +410,7 @@ void registerAggregateWindowFunction(const std::string& name) {
410410
exec::registerWindowFunction(
411411
name,
412412
std::move(signatures),
413+
{exec::WindowFunction::ProcessMode::kRows, true},
413414
[name](
414415
const std::vector<exec::WindowFunctionArg>& args,
415416
const TypePtr& resultType,
@@ -426,8 +427,7 @@ void registerAggregateWindowFunction(const std::string& name) {
426427
pool,
427428
stringAllocator,
428429
config);
429-
},
430-
{exec::ProcessedUnit::kRows, true});
430+
});
431431
}
432432
}
433433
} // namespace facebook::velox::exec

velox/exec/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ add_library(
5959
OutputBufferManager.cpp
6060
PartitionedOutput.cpp
6161
PartitionFunction.cpp
62+
PartitionStreamingWindowBuild.cpp
6263
PlanNodeStats.cpp
6364
PrefixSort.cpp
6465
ProbeOperatorState.cpp
@@ -72,7 +73,6 @@ add_library(
7273
SpillFile.cpp
7374
Spiller.cpp
7475
StreamingAggregation.cpp
75-
StreamingWindowBuild.cpp
7676
Strings.cpp
7777
TableScan.cpp
7878
TableWriteMerge.cpp

velox/exec/StreamingWindowBuild.cpp renamed to velox/exec/PartitionStreamingWindowBuild.cpp

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,24 +14,24 @@
1414
* limitations under the License.
1515
*/
1616

17-
#include "velox/exec/StreamingWindowBuild.h"
17+
#include "velox/exec/PartitionStreamingWindowBuild.h"
1818

1919
namespace facebook::velox::exec {
2020

21-
StreamingWindowBuild::StreamingWindowBuild(
21+
PartitionStreamingWindowBuild::PartitionStreamingWindowBuild(
2222
const std::shared_ptr<const core::WindowNode>& windowNode,
2323
velox::memory::MemoryPool* pool,
2424
const common::SpillConfig* spillConfig,
2525
tsan_atomic<bool>* nonReclaimableSection)
2626
: WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection) {}
2727

28-
void StreamingWindowBuild::buildNextPartition() {
28+
void PartitionStreamingWindowBuild::buildNextPartition() {
2929
partitionStartRows_.push_back(sortedRows_.size());
3030
sortedRows_.insert(sortedRows_.end(), inputRows_.begin(), inputRows_.end());
3131
inputRows_.clear();
3232
}
3333

34-
void StreamingWindowBuild::addInput(RowVectorPtr input) {
34+
void PartitionStreamingWindowBuild::addInput(RowVectorPtr input) {
3535
for (auto i = 0; i < inputChannels_.size(); ++i) {
3636
decodedInputVectors_[i].decode(*input->childAt(inputChannels_[i]));
3737
}
@@ -53,14 +53,15 @@ void StreamingWindowBuild::addInput(RowVectorPtr input) {
5353
}
5454
}
5555

56-
void StreamingWindowBuild::noMoreInput() {
56+
void PartitionStreamingWindowBuild::noMoreInput() {
5757
buildNextPartition();
5858

5959
// Appends a trailing end offset to simplify calculations for the last partition.
6060
partitionStartRows_.push_back(sortedRows_.size());
6161
}
6262

63-
std::shared_ptr<WindowPartition> StreamingWindowBuild::nextPartition() {
63+
std::shared_ptr<WindowPartition>
64+
PartitionStreamingWindowBuild::nextPartition() {
6465
VELOX_CHECK_GT(
6566
partitionStartRows_.size(), 0, "No window partitions available")
6667

@@ -93,7 +94,7 @@ std::shared_ptr<WindowPartition> StreamingWindowBuild::nextPartition() {
9394
data_.get(), partition, inversedInputChannels_, sortKeyInfo_);
9495
}
9596

96-
bool StreamingWindowBuild::hasNextPartition() {
97+
bool PartitionStreamingWindowBuild::hasNextPartition() {
9798
return partitionStartRows_.size() > 0 &&
9899
currentPartition_ < int(partitionStartRows_.size() - 2);
99100
}

velox/exec/StreamingWindowBuild.h renamed to velox/exec/PartitionStreamingWindowBuild.h

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,13 @@
2020

2121
namespace facebook::velox::exec {
2222

23-
/// The StreamingWindowBuild is used when the input data is already sorted by
24-
/// {partition keys + order by keys}. The logic identifies partition changes
25-
/// when receiving input rows and splits out WindowPartitions for the Window
26-
/// operator to process.
27-
class StreamingWindowBuild : public WindowBuild {
23+
/// The PartitionStreamingWindowBuild is used when the input data is already
24+
/// sorted by {partition keys + order by keys}. The logic identifies partition
25+
/// changes when receiving input rows and splits out WindowPartitions for the
26+
/// Window operator to process.
27+
class PartitionStreamingWindowBuild : public WindowBuild {
2828
public:
29-
StreamingWindowBuild(
29+
PartitionStreamingWindowBuild(
3030
const std::shared_ptr<const core::WindowNode>& windowNode,
3131
velox::memory::MemoryPool* pool,
3232
const common::SpillConfig* spillConfig,
@@ -55,10 +55,6 @@ class StreamingWindowBuild : public WindowBuild {
5555
currentPartition_ == partitionStartRows_.size() - 2;
5656
}
5757

58-
std::string_view windowBuildType() const override {
59-
return "StreamingWindowBuild";
60-
}
61-
6258
private:
6359
void buildNextPartition();
6460

velox/exec/RowsStreamingWindowBuild.cpp

Lines changed: 38 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,35 +16,54 @@
1616

1717
#include "velox/exec/RowsStreamingWindowBuild.h"
1818
#include "velox/common/testutil/TestValue.h"
19+
#include "velox/exec/WindowFunction.h"
1920

2021
namespace facebook::velox::exec {
2122

23+
namespace {
24+
bool hasRangeFrame(const std::shared_ptr<const core::WindowNode>& windowNode) {
25+
for (const auto& function : windowNode->windowFunctions()) {
26+
if (function.frame.type == core::WindowNode::WindowType::kRange) {
27+
return true;
28+
}
29+
}
30+
return false;
31+
}
32+
} // namespace
33+
2234
RowsStreamingWindowBuild::RowsStreamingWindowBuild(
2335
const std::shared_ptr<const core::WindowNode>& windowNode,
2436
velox::memory::MemoryPool* pool,
2537
const common::SpillConfig* spillConfig,
2638
tsan_atomic<bool>* nonReclaimableSection)
27-
: WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection) {}
39+
: WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection),
40+
hasRangeFrame_(hasRangeFrame(windowNode)) {
41+
velox::common::testutil::TestValue::adjust(
42+
"facebook::velox::exec::RowsStreamingWindowBuild::RowsStreamingWindowBuild",
43+
this);
44+
}
45+
46+
void RowsStreamingWindowBuild::addPartitionInputs(bool finished) {
47+
if (inputRows_.empty()) {
48+
return;
49+
}
2850

29-
void RowsStreamingWindowBuild::buildNextInputOrPartition(bool isFinished) {
3051
if (windowPartitions_.size() <= inputPartition_) {
3152
windowPartitions_.push_back(std::make_shared<WindowPartition>(
3253
data_.get(), inversedInputChannels_, sortKeyInfo_));
3354
}
3455

3556
windowPartitions_[inputPartition_]->addRows(inputRows_);
3657

37-
if (isFinished) {
58+
if (finished) {
3859
windowPartitions_[inputPartition_]->setComplete();
39-
inputPartition_++;
60+
++inputPartition_;
4061
}
4162

4263
inputRows_.clear();
4364
}
4465

4566
void RowsStreamingWindowBuild::addInput(RowVectorPtr input) {
46-
velox::common::testutil::TestValue::adjust(
47-
"facebook::velox::exec::RowsStreamingWindowBuild::addInput", this);
4867
for (auto i = 0; i < inputChannels_.size(); ++i) {
4968
decodedInputVectors_[i].decode(*input->childAt(inputChannels_[i]));
5069
}
@@ -58,11 +77,18 @@ void RowsStreamingWindowBuild::addInput(RowVectorPtr input) {
5877

5978
if (previousRow_ != nullptr &&
6079
compareRowsWithKeys(previousRow_, newRow, partitionKeyInfo_)) {
61-
buildNextInputOrPartition(true);
80+
addPartitionInputs(true);
6281
}
6382

6483
if (previousRow_ != nullptr && inputRows_.size() >= numRowsPerOutput_) {
65-
buildNextInputOrPartition(false);
84+
// For a range frame, wait until the peer group is complete before adding rows.
85+
if (hasRangeFrame_) {
86+
if (compareRowsWithKeys(previousRow_, newRow, sortKeyInfo_)) {
87+
addPartitionInputs(false);
88+
}
89+
} else {
90+
addPartitionInputs(false);
91+
}
6692
}
6793

6894
inputRows_.push_back(newRow);
@@ -71,18 +97,17 @@ void RowsStreamingWindowBuild::addInput(RowVectorPtr input) {
7197
}
7298

7399
void RowsStreamingWindowBuild::noMoreInput() {
74-
buildNextInputOrPartition(true);
100+
addPartitionInputs(true);
75101
}
76102

77103
std::shared_ptr<WindowPartition> RowsStreamingWindowBuild::nextPartition() {
78-
// The previous partition has already been set to nullptr by the
79-
// Window.cpp#callResetPartition() method.
104+
VELOX_CHECK(hasNextPartition());
80105
return windowPartitions_[++outputPartition_];
81106
}
82107

83108
bool RowsStreamingWindowBuild::hasNextPartition() {
84-
return windowPartitions_.size() > 0 &&
85-
outputPartition_ <= int(windowPartitions_.size() - 2);
109+
return !windowPartitions_.empty() &&
110+
outputPartition_ + 2 <= windowPartitions_.size();
86111
}
87112

88113
} // namespace facebook::velox::exec

velox/exec/RowsStreamingWindowBuild.h

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,13 @@
2020

2121
namespace facebook::velox::exec {
2222

23-
/// Unlike StreamingWindowBuild, RowsStreamingWindowBuild is capable of
23+
/// Unlike PartitionStreamingWindowBuild, RowsStreamingWindowBuild is capable of
2424
/// processing window functions as rows arrive within a single partition,
25-
/// without the need to wait for the entire partition to be ready. This approach
26-
/// can significantly reduce memory usage, especially when a single partition
27-
/// contains a large amount of data. It is particularly suited for optimizing
28-
/// rank and row_number functions, as well as aggregate window functions with a
29-
/// default frame.
25+
/// without the need to wait for the entire window partition to be ready. This
26+
/// approach can significantly reduce memory usage, especially when a single
27+
/// partition contains a large amount of data. It is particularly suited for
28+
/// optimizing rank, dense_rank and row_number functions, as well as aggregate
29+
/// window functions with a default frame.
3030
class RowsStreamingWindowBuild : public WindowBuild {
3131
public:
3232
RowsStreamingWindowBuild(
@@ -54,30 +54,31 @@ class RowsStreamingWindowBuild : public WindowBuild {
5454
bool needsInput() override {
5555
// No partitions are available or the currentPartition is the last available
5656
// one, so can consume input rows.
57-
return windowPartitions_.size() == 0 ||
57+
return windowPartitions_.empty() ||
5858
outputPartition_ == windowPartitions_.size() - 1;
5959
}
6060

61-
std::string_view windowBuildType() const override {
62-
return "RowsStreamingWindowBuild";
63-
}
64-
6561
private:
66-
void buildNextInputOrPartition(bool isFinished);
62+
// Adds input rows to the current partition, or creates a new partition if it
63+
// does not exist.
64+
void addPartitionInputs(bool finished);
65+
66+
// True if this window node has any range frames.
67+
const bool hasRangeFrame_;
6768

68-
// Holds input rows within the current partition.
69+
// Points to the input rows in the current partition.
6970
std::vector<char*> inputRows_;
7071

7172
// Used to compare rows based on partitionKeys.
7273
char* previousRow_ = nullptr;
7374

74-
// Current partition being output. Used to return the WidnowPartitions.
75+
// Index of the current output partition, or -1 if none has been output yet.
7576
vector_size_t outputPartition_ = -1;
7677

77-
// Current partition when adding input. Used to construct WindowPartitions.
78+
// Current input partition that receives inputs.
7879
vector_size_t inputPartition_ = 0;
7980

80-
// Holds all the WindowPartitions.
81+
// Holds all the built window partitions.
8182
std::vector<std::shared_ptr<WindowPartition>> windowPartitions_;
8283
};
8384

velox/exec/SortWindowBuild.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,6 @@ class SortWindowBuild : public WindowBuild {
5555

5656
std::shared_ptr<WindowPartition> nextPartition() override;
5757

58-
std::string_view windowBuildType() const override {
59-
return "SortWindowBuild";
60-
}
61-
6258
private:
6359
void ensureInputFits(const RowVectorPtr& input);
6460

0 commit comments

Comments
 (0)