From d3f75947072f69cf1194320cc3f8287890fc1cd2 Mon Sep 17 00:00:00 2001 From: guoqiang Date: Wed, 24 Jun 2026 11:20:52 +0800 Subject: [PATCH] [fix](nereids) partition topn opt requires chosen window func partition key to be a subset of co-located ones When a single LogicalWindow holds multiple window functions, a filter such as `rn <= k` may be turned into a partitionTopN and pushed below the whole window node. The generated partitionTopN keeps the per-partition top-k of the chosen window function and thus prunes the input rows shared by ALL co-located window functions. This is only correct when the chosen window function's partition key is a SUBSET of every other co-located window function's partition key (i.e. the chosen one is coarser). Then any row that could change another window's value for a surviving row lies in the same chosen-partition with an order value not greater than the surviving row's, so its chosen-rank is within top-k and it is kept. When this does not hold the pruning drops rows the other windows still need and produces a wrong result, e.g. row_number() over (partition by g1 order by c) as rn1, -- chosen row_number() over (partition by g2 order by c) as rn2 -- independent row_number() over (partition by c1, c2 order by c) as rn, -- chosen (finer) rank() over (partition by c1 order by c) as rk -- coarser getPushDownWindowFuncAndLimit() previously only required the order keys of all co-located window functions to be compatible (#56622). It now also requires the partition keys to satisfy the above subset relation, otherwise the optimization is disabled. --- .../trees/plans/logical/LogicalWindow.java | 17 +- .../GeneratePartitionTopnFromWindowTest.java | 107 ++++++++++++ .../partition_topn/check_partitionkey.out | 19 +++ ...sh_down_multi_filter_through_window.groovy | 57 +++---- .../partition_topn/check_partitionkey.groovy | 160 ++++++++++++++++++ 5 files changed, 329 insertions(+), 31 deletions(-) create mode 100644 regression-test/data/query_p0/partition_topn/check_partitionkey.out create mode 100644 regression-test/suites/query_p0/partition_topn/check_partitionkey.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalWindow.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalWindow.java index 7afb6d404bacf2..9df6effed89d27 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalWindow.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalWindow.java @@ -325,11 +325,26 @@ && child(0).child(0) instanceof LogicalPartitionTopN)) { && chosenRowNumberPartitionLimit == Long.MAX_VALUE)) { return null; } else { - // 4. check all windowExpression's order key is empty or is the same as chosenWindowFunc's order key + // 4. check all windowExpression's partition key and order key are compatible with chosenWindowFunc. + // The generated partitionTopN is pushed below the whole window node, so it prunes the input rows + // (keeping per-partition top-k of the chosen function) of every co-located window function. + // For another window function W to stay correct, every row that could affect W's value for a + // surviving row must also survive the pruning. This holds iff the chosen partition key is a + // SUBSET of W's partition key (chosen is coarser): then any row in W's partition with a smaller + // order value is also in the same chosen-partition with an order value <= the surviving row's, + // so its chosen-rank is within top-k and it is kept. The order key must also be compatible + // (empty or identical). Otherwise we must disable the optimization, e.g. + // 'row_number() over (partition by a order by c)' as rn together with + // 'row_number() over (partition by b order by c)' as rk, filter on rn + // (independent partitions), or a chosen partition (a, b) finer than a co-located 'partition by a' + // would prune rows the latter still needs and produce a wrong result. for (NamedExpression windowExpr : windowExpressions) { if (windowExpr != null && windowExpr instanceof Alias && windowExpr.child(0) instanceof WindowExpression) { WindowExpression windowFunc = (WindowExpression) windowExpr.child(0); + if (!windowFunc.getPartitionKeys().containsAll(chosenWindowFunc.getPartitionKeys())) { + return null; + } if (windowFunc.getOrderKeys().isEmpty() || windowFunc.getOrderKeys().equals(chosenWindowFunc.getOrderKeys())) { continue; diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/GeneratePartitionTopnFromWindowTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/GeneratePartitionTopnFromWindowTest.java index 00d47bb8b78839..9c3993d4d2fae8 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/GeneratePartitionTopnFromWindowTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/GeneratePartitionTopnFromWindowTest.java @@ -157,4 +157,111 @@ public void testMultipleWindowsWithDifferentOrders() { logicalOlapScan() )))); } + + @Test + public void testMultipleWindowsWithDifferentPartitions() { + ConnectContext context = MemoTestUtils.createConnectContext(); + context.getSessionVariable().setEnablePartitionTopN(true); + NamedExpression gender = scan.getOutput().get(1).toSlot(); + NamedExpression name = scan.getOutput().get(2).toSlot(); + NamedExpression age = scan.getOutput().get(3).toSlot(); + + List orderKeyList = ImmutableList.of(new OrderExpression( + new OrderKey(age, true, true))); + WindowFrame windowFrame = new WindowFrame(WindowFrame.FrameUnitsType.ROWS, + WindowFrame.FrameBoundary.newPrecedingBoundary(), + WindowFrame.FrameBoundary.newCurrentRowBoundary()); + + // window1: partition by gender, order by age + WindowExpression window1 = new WindowExpression(new RowNumber(), + ImmutableList.of(gender), orderKeyList, windowFrame); + Alias windowAlias1 = new Alias(window1, window1.toSql()); + + // window2: same order key (age) but different partition key (name) + WindowExpression window2 = new WindowExpression(new RowNumber(), + ImmutableList.of(name), orderKeyList, windowFrame); + Alias windowAlias2 = new Alias(window2, window2.toSql()); + + List expressions = Lists.newArrayList(windowAlias1, windowAlias2); + LogicalWindow window = new LogicalWindow<>(expressions, scan); + // filter on window1 (row_number partitioned by gender) + Expression filterPredicate = new LessThanEqual(window.getOutput().get(4).toSlot(), Literal.of(1)); + + LogicalPlan plan = new LogicalPlanBuilder(window) + .filter(filterPredicate) + .project(ImmutableList.of(0)) + .build(); + + // The optimization must be disabled: pushing a partitionTopN partitioned by gender below the whole + // window would prune the input rows of window2 (partitioned by name) and corrupt its result. + PlanChecker.from(context, plan) + .applyTopDown(new CreatePartitionTopNFromWindow()) + .matches( + logicalProject( + logicalFilter( + logicalWindow( + logicalOlapScan() + )))); + } + + @Test + public void testMultipleWindowsSubsetPartitionGeneratesTopn() { + ConnectContext context = MemoTestUtils.createConnectContext(); + context.getSessionVariable().setEnablePartitionTopN(true); + NamedExpression gender = scan.getOutput().get(1).toSlot(); + NamedExpression name = scan.getOutput().get(2).toSlot(); + NamedExpression age = scan.getOutput().get(3).toSlot(); + + List partitionKeyList = ImmutableList.of(gender); + List orderKeyList = ImmutableList.of(new OrderExpression( + new OrderKey(age, true, true))); + WindowFrame windowFrame = new WindowFrame(WindowFrame.FrameUnitsType.ROWS, + WindowFrame.FrameBoundary.newPrecedingBoundary(), + WindowFrame.FrameBoundary.newCurrentRowBoundary()); + + // window1 (chosen via filter): partition by gender, the coarser key + WindowExpression window1 = new WindowExpression(new RowNumber(), + partitionKeyList, orderKeyList, windowFrame); + Alias windowAlias1 = new Alias(window1, window1.toSql()); + + // window2: partition by (gender, name) -- a SUPERSET of window1's partition key + WindowExpression window2 = new WindowExpression(new RowNumber(), + ImmutableList.of(gender, name), orderKeyList, windowFrame); + Alias windowAlias2 = new Alias(window2, window2.toSql()); + + List expressions = Lists.newArrayList(windowAlias1, windowAlias2); + LogicalWindow window = new LogicalWindow<>(expressions, scan); + // filter on window1 (the coarser, chosen window) + Expression filterPredicate = new LessThanEqual(window.getOutput().get(4).toSlot(), Literal.of(100)); + + LogicalPlan plan = new LogicalPlanBuilder(window) + .filter(filterPredicate) + .project(ImmutableList.of(0)) + .build(); + + // The chosen partition key (gender) is a subset of the other window's + // (gender, name), so pruning per gender cannot corrupt window2 -> the + // partition topn is still generated. + PlanChecker.from(context, plan) + .applyTopDown(new CreatePartitionTopNFromWindow()) + .matches( + logicalProject( + logicalFilter( + logicalWindow( + logicalPartitionTopN( + logicalOlapScan() + ).when(logicalPartitionTopN -> { + WindowFuncType funName = logicalPartitionTopN.getFunction(); + List partitionKeys = logicalPartitionTopN.getPartitionKeys(); + boolean hasGlobalLimit = logicalPartitionTopN.hasGlobalLimit(); + long partitionLimit = logicalPartitionTopN.getPartitionLimit(); + return funName == WindowFuncType.ROW_NUMBER + && partitionKeys.equals(partitionKeyList) + && !hasGlobalLimit && partitionLimit == 100; + }) + ) + ).when(filter -> filter.getConjuncts().equals(ImmutableSet.of(filterPredicate))) + ) + ); + } } diff --git a/regression-test/data/query_p0/partition_topn/check_partitionkey.out b/regression-test/data/query_p0/partition_topn/check_partitionkey.out new file mode 100644 index 00000000000000..c33a0912cc4fdc --- /dev/null +++ b/regression-test/data/query_p0/partition_topn/check_partitionkey.out @@ -0,0 +1,19 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !multi_window -- +1 A X 1 1 1 +4 B X 4 1 3 +7 C X 7 1 4 + +-- !subset_safe_2window -- +1 A X 1 1 1 +2 A X 2 2 2 +4 B X 4 1 1 +5 B Y 5 2 1 +7 C X 7 1 1 +8 C Z 8 2 1 + +-- !subset_safe_3window -- +1 A X 1 1 1 1 +4 B X 4 1 1 1 +7 C X 7 1 1 1 + diff --git a/regression-test/suites/nereids_rules_p0/push_down_filter_through_window/push_down_multi_filter_through_window.groovy b/regression-test/suites/nereids_rules_p0/push_down_filter_through_window/push_down_multi_filter_through_window.groovy index 015a6e7fae7d47..607391bab0b862 100644 --- a/regression-test/suites/nereids_rules_p0/push_down_filter_through_window/push_down_multi_filter_through_window.groovy +++ b/regression-test/suites/nereids_rules_p0/push_down_filter_through_window/push_down_multi_filter_through_window.groovy @@ -48,18 +48,19 @@ suite("push_down_multi_filter_through_window") { notContains "VPartitionTopN" } + // The chosen window row_number(partition by c1, c2) is NOT a subset of the + // co-located rank(partition by c1), so pushing its partition topn below the + // whole window would prune rows that rk still needs -> optimization disabled. explain { sql ("select * from (select row_number() over(partition by c1, c2 order by c3) as rn, rank() over(partition by c1 order by c3) as rk from push_down_multi_predicate_through_window_t) t where rn <= 1 and rk <= 1;") - contains "VPartitionTopN" - contains "functions: row_number" - contains "partition limit: 1" + notContains "VPartitionTopN" } + // row_number(partition by c1, c2) preferred as chosen, but it is not a subset + // of rank(partition by c1) -> disabled. explain { sql ("select * from (select rank() over(partition by c1 order by c3) as rk, row_number() over(partition by c1, c2 order by c3) as rn from push_down_multi_predicate_through_window_t) t where rn <= 10 and rk <= 1;") - contains "VPartitionTopN" - contains "functions: row_number" - contains "partition limit: 10" + notContains "VPartitionTopN" } explain { @@ -69,18 +70,16 @@ suite("push_down_multi_filter_through_window") { contains "partition limit: 1" } + // chosen row_number(partition by c1, c2) is not a subset of rank(partition by c1) -> disabled. explain { sql ("select * from (select rank() over(partition by c1 order by c3) as rk, row_number() over(partition by c1, c2 order by c3) as rn from push_down_multi_predicate_through_window_t) t where rn <= 10;") - contains "VPartitionTopN" - contains "functions: row_number" - contains "partition limit: 10" + notContains "VPartitionTopN" } + // chosen rank(partition by c1, c2) (min limit) is not a subset of rank(partition by c1) -> disabled. explain { sql ("select * from (select rank() over(partition by c1 order by c3) as rk, rank() over(partition by c1, c2 order by c3) as rn from push_down_multi_predicate_through_window_t) t where rn <= 1 and rk <= 10;") - contains "VPartitionTopN" - contains "functions: rank" - contains "partition limit: 1" + notContains "VPartitionTopN" } explain { @@ -97,18 +96,16 @@ suite("push_down_multi_filter_through_window") { contains "partition limit: 1" } + // chosen rank(partition by c1, c2) is not a subset of rank(partition by c1) -> disabled. explain { sql ("select * from (select rank() over(partition by c1 order by c3) as rk, rank() over(partition by c1, c2 order by c3) as rn from push_down_multi_predicate_through_window_t) t where rn <= 1 and rk > 1;") - contains "VPartitionTopN" - contains "functions: rank" - contains "partition limit: 1" + notContains "VPartitionTopN" } + // chosen row_number(partition by c1, c2) is not a subset of rank(partition by c1) -> disabled. explain { sql ("select * from (select row_number() over(partition by c1, c2 order by c3) as rn, rank() over(partition by c1 order by c3) as rk from push_down_multi_predicate_through_window_t) t limit 10;") - contains "VPartitionTopN" - contains "functions: row_number" - contains "partition limit: 10" + notContains "VPartitionTopN" } explain { @@ -125,32 +122,32 @@ suite("push_down_multi_filter_through_window") { contains "partition limit: 1" } + // chosen row_number(partition by c1, c2) (min limit) is not a subset of + // row_number(partition by c1) -> disabled. explain { sql ("select * from (select row_number() over(partition by c1, c2 order by c3) as rn, row_number() over(partition by c1 order by c3) as rk from push_down_multi_predicate_through_window_t) t where rn <= 1 and rk <= 10;") - contains "VPartitionTopN" - contains "functions: row_number" - contains "partition limit: 1" + notContains "VPartitionTopN" } + // 3 windows: chosen row_number(partition by c1, c2) is not a subset of + // rank(partition by c1) nor rank(partition by c2) -> disabled. explain { sql ("select * from (select row_number() over(partition by c1, c2 order by c3) as rn, rank() over(partition by c1 order by c3) as rk1, rank() over(partition by c2 order by c3) as rk2 from push_down_multi_predicate_through_window_t) t where rn <= 1 and rk1 <= 10 and rk2 <= 100;") - contains "VPartitionTopN" - contains "functions: row_number" - contains "partition limit: 1" + notContains "VPartitionTopN" } + // 3 windows: chosen row_number(partition by c2) (min limit) is not a subset + // of row_number(partition by c1) -> disabled. explain { sql ("select * from (select row_number() over(partition by c1 order by c3) as rn1, row_number() over(partition by c2 order by c3) as rn2, rank() over(partition by c1, c2 order by c3) as rk from push_down_multi_predicate_through_window_t) t where rn1 <= 10 and rn2 <= 1 and rk <= 100;") - contains "VPartitionTopN" - contains "functions: row_number" - contains "partition limit: 1" + notContains "VPartitionTopN" } + // 3 windows: chosen row_number(partition by c1) (min limit) is not a subset + // of row_number(partition by c2) -> disabled. explain { sql ("select * from (select rank() over(partition by c1, c2 order by c3) as rk, row_number() over(partition by c1 order by c3) as rn1, row_number() over(partition by c2 order by c3) as rn2 from push_down_multi_predicate_through_window_t) t where rn1 <= 1 and rn2 <= 10 and rk <= 100;") - contains "VPartitionTopN" - contains "functions: row_number" - contains "partition limit: 1" + notContains "VPartitionTopN" } explain { diff --git a/regression-test/suites/query_p0/partition_topn/check_partitionkey.groovy b/regression-test/suites/query_p0/partition_topn/check_partitionkey.groovy new file mode 100644 index 00000000000000..14457badd2c128 --- /dev/null +++ b/regression-test/suites/query_p0/partition_topn/check_partitionkey.groovy @@ -0,0 +1,160 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("check_partitionkey") { + sql "drop table if exists multi_window_cases" + sql """ + create table multi_window_cases ( + id int, + g1 varchar(8), + g2 varchar(8), + ord_key int, + amt int + ) properties ('replication_num'='1'); + + insert into multi_window_cases values + (1,'A','X',1,10), + (2,'A','X',2,20), + (3,'A','Y',3,30), + (4,'B','X',4,40), + (5,'B','Y',5,50), + (6,'B','Y',6,60), + (7,'C','X',7,70), + (8,'C','Z',8,80); + """ + + // Two row_number() with the SAME order key (ord_key) but DIFFERENT partition keys + // (g1 vs g2). The filter is on rn1 (partition by g1). The partition topn generated + // from rn1 would be pushed below the whole window node and prune the input rows of + // rn2 (partition by g2), corrupting rn2. So partition topn must NOT be applied here. + explain { + sql """ + select id, g1, g2, ord_key, rn1, rn2 + from ( + select + id, g1, g2, ord_key, + row_number() over (partition by g1 order by ord_key) as rn1, + row_number() over (partition by g2 order by ord_key) as rn2 + from multi_window_cases + ) q + where rn1 <= 1; + """ + notContains("VPartitionTopN") + } + + qt_multi_window """ + select id, g1, g2, ord_key, rn1, rn2 + from ( + select + id, g1, g2, ord_key, + row_number() over (partition by g1 order by ord_key) as rn1, + row_number() over (partition by g2 order by ord_key) as rn2 + from multi_window_cases + ) q + where rn1 <= 1 + order by id; + """ + + // --------------------------------------------------------------------- + // The optimization is STILL valid when the chosen window's partition key + // is a SUBSET of every other co-located window's partition key (i.e. the + // chosen one is coarser). Pruning per chosen-partition top-k then only + // removes rows the finer windows do not need, so partition topn MUST still + // fire and the results must stay correct. + + // 2 windows: the chosen rank(partition by g1) is coarser than the + // row_number(partition by g1, g2). The filter is on the coarser window, so + // the partition topn (partition by g1) is safe to push down -> it fires. + explain { + sql """ + select id, g1, g2, ord_key, rk, rn + from ( + select + id, g1, g2, ord_key, + rank() over (partition by g1 order by ord_key) as rk, + row_number() over (partition by g1, g2 order by ord_key) as rn + from multi_window_cases + ) q + where rk <= 2; + """ + contains("VPartitionTopN") + } + + qt_subset_safe_2window """ + select id, g1, g2, ord_key, rk, rn + from ( + select + id, g1, g2, ord_key, + rank() over (partition by g1 order by ord_key) as rk, + row_number() over (partition by g1, g2 order by ord_key) as rn + from multi_window_cases + ) q + where rk <= 2 + order by id; + """ + + // 3 windows: the chosen rank(partition by g1) is a subset of both the + // (g1, g2) windows, so pruning by g1 cannot corrupt them -> it still fires. + explain { + sql """ + select id, g1, g2, ord_key, rk, rn, rk2 + from ( + select + id, g1, g2, ord_key, + rank() over (partition by g1 order by ord_key) as rk, + row_number() over (partition by g1, g2 order by ord_key) as rn, + rank() over (partition by g1, g2 order by ord_key) as rk2 + from multi_window_cases + ) q + where rk <= 1; + """ + contains("VPartitionTopN") + } + + qt_subset_safe_3window """ + select id, g1, g2, ord_key, rk, rn, rk2 + from ( + select + id, g1, g2, ord_key, + rank() over (partition by g1 order by ord_key) as rk, + row_number() over (partition by g1, g2 order by ord_key) as rn, + rank() over (partition by g1, g2 order by ord_key) as rk2 + from multi_window_cases + ) q + where rk <= 1 + order by id; + """ + + // 3 windows but one is partitioned by g2, which is NOT a superset of the + // chosen g1. Pruning by g1 would corrupt the g2 window, so the optimization + // must be disabled. + explain { + sql """ + select id, g1, g2, ord_key, rk, rn, rk2 + from ( + select + id, g1, g2, ord_key, + rank() over (partition by g1 order by ord_key) as rk, + row_number() over (partition by g1, g2 order by ord_key) as rn, + rank() over (partition by g2 order by ord_key) as rk2 + from multi_window_cases + ) q + where rk <= 1; + """ + notContains("VPartitionTopN") + } +}