Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
* <p>
* topN - child topN
* If child topN orderby list is prefix subList of topN =>
* topN with limit = min(topN.limit, childTopN.limit), offset = topN.offset + childTopN.offset
* topN with limit = min(topN.limit, max(childTopN.limit - topN.offset, 0)),
* offset = topN.offset + childTopN.offset
*/
public class MergeTopNs extends OneRewriteRuleFactory {
@Override
Expand All @@ -48,8 +49,12 @@ public Rule build() {
long childOffset = childTopN.getOffset();
long childLimit = childTopN.getLimit();
long newOffset = offset + childOffset;
// choose min limit
long newLimit = Math.min(limit, childLimit);
// The parent's offset is applied on top of the child's output, so only
// (childLimit - offset) of the child's rows survive. Clamp the merged limit
// by that remaining count; otherwise rows beyond the child's limit leak through
// when the parent has a non-zero offset (DORIS-26301). Same semantics as
// MergeLimits.mergeLimit for consecutive limits.
long newLimit = Math.min(limit, Math.max(childLimit - offset, 0));
return topN.withLimitChild(newLimit, newOffset, childTopN.child());
}).toRule(RuleType.MERGE_TOP_N);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,25 @@ void testOffset() {
PlanChecker.from(MemoTestUtils.createConnectContext(), plan)
.applyTopDown(new MergeTopNs())
.matches(
logicalTopN(logicalOlapScan()).when(topN -> topN.getLimit() == 10 && topN.getOffset() == 4)
logicalTopN(logicalOlapScan()).when(topN -> topN.getLimit() == 9 && topN.getOffset() == 4)
);
}

@Test
void testParentOffsetReducesChildLimit() {
// DORIS-26301: when the parent (upper) TopN has a non-zero offset, the merged limit must be
// clamped by (childLimit - parentOffset); otherwise rows beyond the child's limit leak through.
// child : ORDER BY k LIMIT 5 (limit=5, offset=0) -> yields 5 rows
// parent: ORDER BY k LIMIT 3 OFFSET 4 (limit=3, offset=4) -> skips 4, only 1 row remains
// merged: offset = 4, limit = min(3, max(5 - 4, 0)) = 1
LogicalPlan plan = new LogicalPlanBuilder(score)
.topN(5, 0, ImmutableList.of(0))
.topN(3, 4, ImmutableList.of(0))
.build();
PlanChecker.from(MemoTestUtils.createConnectContext(), plan)
.applyTopDown(new MergeTopNs())
.matches(
logicalTopN(logicalOlapScan()).when(topN -> topN.getLimit() == 1 && topN.getOffset() == 4)
);
}

Expand Down
10 changes: 10 additions & 0 deletions regression-test/data/correctness_p0/test_merge_topn_offset.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !merge_topn_off4lim3 --
5 50

-- !merge_topn_off2lim3 --
3 30
4 40
5 50

-- !merge_topn_off6lim3 --
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_merge_topn_offset") {
// DORIS-26301: when MERGE_TOP_N merges a parent TopN that carries a non-zero OFFSET into its
// child TopN, the merged limit must be clamped by (childLimit - parentOffset). Otherwise rows
// beyond the child's limit leak through. Before the fix, an outer "LIMIT 3 OFFSET 4" over an
// inner "LIMIT 5" wrongly returned 3 rows (k=5,6,7) instead of the single correct row (k=5).
sql "SET enable_nereids_planner=true"
sql "SET enable_fallback_to_original_planner=false"

def tableName = "test_merge_topn_offset"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE ${tableName} (
k INT NOT NULL,
v INT NOT NULL
)
DUPLICATE KEY(k)
DISTRIBUTED BY HASH(k) BUCKETS 1
PROPERTIES ("replication_num" = "1")
"""
sql """ INSERT INTO ${tableName} VALUES
(1, 10), (2, 20), (3, 30), (4, 40), (5, 50),
(6, 60), (7, 70), (8, 80), (9, 90), (10, 100) """

// MERGE_TOP_N is enabled by default; stacking an outer TopN with an OFFSET over an inner TopN
// triggers it. The inner "ORDER BY k LIMIT 5" yields k = 1..5.

// outer offset (4) consumes 4 of the inner's 5 rows -> only k=5 remains
qt_merge_topn_off4lim3 """ SELECT * FROM (SELECT k, v FROM ${tableName} ORDER BY k LIMIT 5) s ORDER BY k LIMIT 3 OFFSET 4 """
// outer offset (2) is within the inner limit -> k = 3,4,5
qt_merge_topn_off2lim3 """ SELECT * FROM (SELECT k, v FROM ${tableName} ORDER BY k LIMIT 5) s ORDER BY k LIMIT 3 OFFSET 2 """
// outer offset (6) exceeds the inner limit (5) -> empty
qt_merge_topn_off6lim3 """ SELECT * FROM (SELECT k, v FROM ${tableName} ORDER BY k LIMIT 5) s ORDER BY k LIMIT 3 OFFSET 6 """
}