Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -325,11 +325,26 @@ && child(0).child(0) instanceof LogicalPartitionTopN)) {
&& chosenRowNumberPartitionLimit == Long.MAX_VALUE)) {
return null;
} else {
// 4. check all windowExpression's order key is empty or is the same as chosenWindowFunc's order key
// 4. check all windowExpression's partition key and order key are compatible with chosenWindowFunc.
// The generated partitionTopN is pushed below the whole window node, so it prunes the input rows
// (keeping per-partition top-k of the chosen function) of every co-located window function.
// For another window function W to stay correct, every row that could affect W's value for a
// surviving row must also survive the pruning. This holds iff the chosen partition key is a
// SUBSET of W's partition key (chosen is coarser): then any row in W's partition with a smaller
// order value is also in the same chosen-partition with an order value <= the surviving row's,
// so its chosen-rank is within top-k and it is kept. The order key must also be compatible
// (empty or identical). Otherwise we must disable the optimization, e.g.
// 'row_number() over (partition by a order by c)' as rn together with
// 'row_number() over (partition by b order by c)' as rk, filter on rn
// (independent partitions), or a chosen partition (a, b) finer than a co-located 'partition by a'
// would prune rows the latter still needs and produce a wrong result.
for (NamedExpression windowExpr : windowExpressions) {
if (windowExpr != null && windowExpr instanceof Alias
&& windowExpr.child(0) instanceof WindowExpression) {
WindowExpression windowFunc = (WindowExpression) windowExpr.child(0);
if (!windowFunc.getPartitionKeys().containsAll(chosenWindowFunc.getPartitionKeys())) {
return null;
}
if (windowFunc.getOrderKeys().isEmpty()
|| windowFunc.getOrderKeys().equals(chosenWindowFunc.getOrderKeys())) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,4 +157,111 @@ public void testMultipleWindowsWithDifferentOrders() {
logicalOlapScan()
))));
}

@Test
public void testMultipleWindowsWithDifferentPartitions() {
ConnectContext context = MemoTestUtils.createConnectContext();
context.getSessionVariable().setEnablePartitionTopN(true);
NamedExpression gender = scan.getOutput().get(1).toSlot();
NamedExpression name = scan.getOutput().get(2).toSlot();
NamedExpression age = scan.getOutput().get(3).toSlot();

List<OrderExpression> orderKeyList = ImmutableList.of(new OrderExpression(
new OrderKey(age, true, true)));
WindowFrame windowFrame = new WindowFrame(WindowFrame.FrameUnitsType.ROWS,
WindowFrame.FrameBoundary.newPrecedingBoundary(),
WindowFrame.FrameBoundary.newCurrentRowBoundary());

// window1: partition by gender, order by age
WindowExpression window1 = new WindowExpression(new RowNumber(),
ImmutableList.of(gender), orderKeyList, windowFrame);
Alias windowAlias1 = new Alias(window1, window1.toSql());

// window2: same order key (age) but different partition key (name)
WindowExpression window2 = new WindowExpression(new RowNumber(),
ImmutableList.of(name), orderKeyList, windowFrame);
Alias windowAlias2 = new Alias(window2, window2.toSql());

List<NamedExpression> expressions = Lists.newArrayList(windowAlias1, windowAlias2);
LogicalWindow<LogicalOlapScan> window = new LogicalWindow<>(expressions, scan);
// filter on window1 (row_number partitioned by gender)
Expression filterPredicate = new LessThanEqual(window.getOutput().get(4).toSlot(), Literal.of(1));

LogicalPlan plan = new LogicalPlanBuilder(window)
.filter(filterPredicate)
.project(ImmutableList.of(0))
.build();

// The optimization must be disabled: pushing a partitionTopN partitioned by gender below the whole
// window would prune the input rows of window2 (partitioned by name) and corrupt its result.
PlanChecker.from(context, plan)
.applyTopDown(new CreatePartitionTopNFromWindow())
.matches(
logicalProject(
logicalFilter(
logicalWindow(
logicalOlapScan()
))));
}

@Test
public void testMultipleWindowsSubsetPartitionGeneratesTopn() {
ConnectContext context = MemoTestUtils.createConnectContext();
context.getSessionVariable().setEnablePartitionTopN(true);
NamedExpression gender = scan.getOutput().get(1).toSlot();
NamedExpression name = scan.getOutput().get(2).toSlot();
NamedExpression age = scan.getOutput().get(3).toSlot();

List<Expression> partitionKeyList = ImmutableList.of(gender);
List<OrderExpression> orderKeyList = ImmutableList.of(new OrderExpression(
new OrderKey(age, true, true)));
WindowFrame windowFrame = new WindowFrame(WindowFrame.FrameUnitsType.ROWS,
WindowFrame.FrameBoundary.newPrecedingBoundary(),
WindowFrame.FrameBoundary.newCurrentRowBoundary());

// window1 (chosen via filter): partition by gender, the coarser key
WindowExpression window1 = new WindowExpression(new RowNumber(),
partitionKeyList, orderKeyList, windowFrame);
Alias windowAlias1 = new Alias(window1, window1.toSql());

// window2: partition by (gender, name) -- a SUPERSET of window1's partition key
WindowExpression window2 = new WindowExpression(new RowNumber(),
ImmutableList.of(gender, name), orderKeyList, windowFrame);
Alias windowAlias2 = new Alias(window2, window2.toSql());

List<NamedExpression> expressions = Lists.newArrayList(windowAlias1, windowAlias2);
LogicalWindow<LogicalOlapScan> window = new LogicalWindow<>(expressions, scan);
// filter on window1 (the coarser, chosen window)
Expression filterPredicate = new LessThanEqual(window.getOutput().get(4).toSlot(), Literal.of(100));

LogicalPlan plan = new LogicalPlanBuilder(window)
.filter(filterPredicate)
.project(ImmutableList.of(0))
.build();

// The chosen partition key (gender) is a subset of the other window's
// (gender, name), so pruning per gender cannot corrupt window2 -> the
// partition topn is still generated.
PlanChecker.from(context, plan)
.applyTopDown(new CreatePartitionTopNFromWindow())
.matches(
logicalProject(
logicalFilter(
logicalWindow(
logicalPartitionTopN(
logicalOlapScan()
).when(logicalPartitionTopN -> {
WindowFuncType funName = logicalPartitionTopN.getFunction();
List<Expression> partitionKeys = logicalPartitionTopN.getPartitionKeys();
boolean hasGlobalLimit = logicalPartitionTopN.hasGlobalLimit();
long partitionLimit = logicalPartitionTopN.getPartitionLimit();
return funName == WindowFuncType.ROW_NUMBER
&& partitionKeys.equals(partitionKeyList)
&& !hasGlobalLimit && partitionLimit == 100;
})
)
).when(filter -> filter.getConjuncts().equals(ImmutableSet.of(filterPredicate)))
)
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !multi_window --
1 A X 1 1 1
4 B X 4 1 3
7 C X 7 1 4

-- !subset_safe_2window --
1 A X 1 1 1
2 A X 2 2 2
4 B X 4 1 1
5 B Y 5 2 1
7 C X 7 1 1
8 C Z 8 2 1

-- !subset_safe_3window --
1 A X 1 1 1 1
4 B X 4 1 1 1
7 C X 7 1 1 1

Comment thread
CalvinKirs marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,19 @@ suite("push_down_multi_filter_through_window") {
notContains "VPartitionTopN"
}

// The chosen window row_number(partition by c1, c2) is NOT a subset of the
// co-located rank(partition by c1), so pushing its partition topn below the
// whole window would prune rows that rk still needs -> optimization disabled.
explain {
sql ("select * from (select row_number() over(partition by c1, c2 order by c3) as rn, rank() over(partition by c1 order by c3) as rk from push_down_multi_predicate_through_window_t) t where rn <= 1 and rk <= 1;")
contains "VPartitionTopN"
contains "functions: row_number"
contains "partition limit: 1"
notContains "VPartitionTopN"
}

// row_number(partition by c1, c2) preferred as chosen, but it is not a subset
// of rank(partition by c1) -> disabled.
explain {
sql ("select * from (select rank() over(partition by c1 order by c3) as rk, row_number() over(partition by c1, c2 order by c3) as rn from push_down_multi_predicate_through_window_t) t where rn <= 10 and rk <= 1;")
contains "VPartitionTopN"
contains "functions: row_number"
contains "partition limit: 10"
notContains "VPartitionTopN"
}

explain {
Expand All @@ -69,18 +70,16 @@ suite("push_down_multi_filter_through_window") {
contains "partition limit: 1"
}

// chosen row_number(partition by c1, c2) is not a subset of rank(partition by c1) -> disabled.
explain {
sql ("select * from (select rank() over(partition by c1 order by c3) as rk, row_number() over(partition by c1, c2 order by c3) as rn from push_down_multi_predicate_through_window_t) t where rn <= 10;")
contains "VPartitionTopN"
contains "functions: row_number"
contains "partition limit: 10"
notContains "VPartitionTopN"
}

// chosen rank(partition by c1, c2) (min limit) is not a subset of rank(partition by c1) -> disabled.
explain {
sql ("select * from (select rank() over(partition by c1 order by c3) as rk, rank() over(partition by c1, c2 order by c3) as rn from push_down_multi_predicate_through_window_t) t where rn <= 1 and rk <= 10;")
contains "VPartitionTopN"
contains "functions: rank"
contains "partition limit: 1"
notContains "VPartitionTopN"
}

explain {
Expand All @@ -97,18 +96,16 @@ suite("push_down_multi_filter_through_window") {
contains "partition limit: 1"
}

// chosen rank(partition by c1, c2) is not a subset of rank(partition by c1) -> disabled.
explain {
sql ("select * from (select rank() over(partition by c1 order by c3) as rk, rank() over(partition by c1, c2 order by c3) as rn from push_down_multi_predicate_through_window_t) t where rn <= 1 and rk > 1;")
contains "VPartitionTopN"
contains "functions: rank"
contains "partition limit: 1"
notContains "VPartitionTopN"
}

// chosen row_number(partition by c1, c2) is not a subset of rank(partition by c1) -> disabled.
explain {
sql ("select * from (select row_number() over(partition by c1, c2 order by c3) as rn, rank() over(partition by c1 order by c3) as rk from push_down_multi_predicate_through_window_t) t limit 10;")
contains "VPartitionTopN"
contains "functions: row_number"
contains "partition limit: 10"
notContains "VPartitionTopN"
}

explain {
Expand All @@ -125,32 +122,32 @@ suite("push_down_multi_filter_through_window") {
contains "partition limit: 1"
}

// chosen row_number(partition by c1, c2) (min limit) is not a subset of
// row_number(partition by c1) -> disabled.
explain {
sql ("select * from (select row_number() over(partition by c1, c2 order by c3) as rn, row_number() over(partition by c1 order by c3) as rk from push_down_multi_predicate_through_window_t) t where rn <= 1 and rk <= 10;")
contains "VPartitionTopN"
contains "functions: row_number"
contains "partition limit: 1"
notContains "VPartitionTopN"
}

// 3 windows: chosen row_number(partition by c1, c2) is not a subset of
// rank(partition by c1) nor rank(partition by c2) -> disabled.
explain {
sql ("select * from (select row_number() over(partition by c1, c2 order by c3) as rn, rank() over(partition by c1 order by c3) as rk1, rank() over(partition by c2 order by c3) as rk2 from push_down_multi_predicate_through_window_t) t where rn <= 1 and rk1 <= 10 and rk2 <= 100;")
contains "VPartitionTopN"
contains "functions: row_number"
contains "partition limit: 1"
notContains "VPartitionTopN"
}

// 3 windows: chosen row_number(partition by c2) (min limit) is not a subset
// of row_number(partition by c1) -> disabled.
explain {
sql ("select * from (select row_number() over(partition by c1 order by c3) as rn1, row_number() over(partition by c2 order by c3) as rn2, rank() over(partition by c1, c2 order by c3) as rk from push_down_multi_predicate_through_window_t) t where rn1 <= 10 and rn2 <= 1 and rk <= 100;")
contains "VPartitionTopN"
contains "functions: row_number"
contains "partition limit: 1"
notContains "VPartitionTopN"
}

// 3 windows: chosen row_number(partition by c1) (min limit) is not a subset
// of row_number(partition by c2) -> disabled.
explain {
sql ("select * from (select rank() over(partition by c1, c2 order by c3) as rk, row_number() over(partition by c1 order by c3) as rn1, row_number() over(partition by c2 order by c3) as rn2 from push_down_multi_predicate_through_window_t) t where rn1 <= 1 and rn2 <= 10 and rk <= 100;")
contains "VPartitionTopN"
contains "functions: row_number"
contains "partition limit: 1"
notContains "VPartitionTopN"
}

explain {
Expand Down
Loading
Loading