From 088d7425a812becca08a5a7e33b65c51db8b3199 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Tue, 1 Jul 2025 21:40:14 +0800 Subject: [PATCH 01/13] Fix TopK Sort incorrectly pushed down past Join with anti join --- .../src/enforce_sorting/sort_pushdown.rs | 11 ++++- datafusion/sqllogictest/test_files/joins.slt | 46 +++++++++++++++++++ 2 files changed, 56 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs index a9c0e4cb28589..dd7a2259f7442 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs @@ -663,11 +663,20 @@ fn handle_custom_pushdown( } // For hash join we only maintain the input order for the right child -// for join type: Inner, Right, RightSemi, RightAnti +// for join types: Inner, Right, RightSemi fn handle_hash_join( plan: &HashJoinExec, parent_required: OrderingRequirements, ) -> Result>>> { + // Anti-joins (LeftAnti or RightAnti) do not preserve meaningful input order, + // so sorting beforehand cannot be relied on. Bail out early for both flavors: + match plan.join_type() { + JoinType::LeftAnti | JoinType::RightAnti => { + return Ok(None); + } + _ => {} + } + // If the plan has no children or does not maintain the right side ordering, // return early: if !plan.maintains_input_order()[1] { diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 3be5c1b1c370e..c7e8366ca79ee 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -4790,3 +4790,49 @@ DROP TABLE compound_field_table_t; statement ok DROP TABLE compound_field_table_u; + + +statement ok +CREATE TABLE t1 (k INT, v INT); + +statement ok +CREATE TABLE t2 (k INT, v INT); + +statement ok +INSERT INTO t1 + SELECT value AS k, value AS v + FROM range(1, 10001) AS t(value); + +statement ok +INSERT INTO t2 VALUES (1, 1); + +query TT +explain +SELECT * +FROM t1 +LEFT ANTI JOIN t2 ON t1.k = t2.k +ORDER BY t1.k +LIMIT 2; +---- +logical_plan +01)Sort: t1.k ASC NULLS LAST, fetch=2 +02)--LeftAnti Join: t1.k = t2.k +03)----TableScan: t1 projection=[k, v] +04)----TableScan: t2 projection=[k] +physical_plan +01)SortExec: TopK(fetch=2), expr=[k@0 ASC NULLS LAST], preserve_partitioning=[false] +02)--CoalesceBatchesExec: target_batch_size=3 +03)----HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(k@0, k@0)] +04)------DataSourceExec: partitions=1, partition_sizes=[1] +05)------DataSourceExec: partitions=1, partition_sizes=[3334] + + +query II +SELECT * +FROM t1 +LEFT ANTI JOIN t2 ON t1.k = t2.k +ORDER BY t1.k +LIMIT 2; +---- +2 2 +3 3 From f2201c0ae04fa46adfbb635e2a462c8272cd5ba8 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Tue, 1 Jul 2025 22:31:17 +0800 Subject: [PATCH 02/13] fix comments --- .../physical-optimizer/src/enforce_sorting/sort_pushdown.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs index dd7a2259f7442..32d0460ab43e5 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs @@ -663,7 +663,7 @@ fn handle_custom_pushdown( } // For hash join we only maintain the input order for the right child -// for join types: Inner, Right, RightSemi +// for join type: Inner, Right, RightSemi, RightAnti fn handle_hash_join( plan: &HashJoinExec, parent_required: OrderingRequirements, From 7859e0f9262ecd34e4878b3e54781a76e3efe02a Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Wed, 2 Jul 2025 22:38:03 +0800 Subject: [PATCH 03/13] Address comments --- .../src/enforce_sorting/sort_pushdown.rs | 15 +++++-------- datafusion/sqllogictest/test_files/joins.slt | 22 +++++++++++++++++++ 2 files changed, 28 insertions(+), 9 deletions(-) diff --git a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs index 32d0460ab43e5..6ec145bd796bb 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs @@ -190,6 +190,7 @@ fn pushdown_sorts_helper( } else if let Some(adjusted) = pushdown_requirement_to_children( &sort_push_down.plan, parent_requirement.clone(), + parent_fetch, )? { // For operators that can take a sort pushdown, continue with updated // requirements: @@ -216,6 +217,7 @@ fn pushdown_sorts_helper( fn pushdown_requirement_to_children( plan: &Arc, parent_required: OrderingRequirements, + parent_fetch: Option, ) -> Result>>> { let maintains_input_order = plan.maintains_input_order(); if is_window(plan) { @@ -345,6 +347,10 @@ fn pushdown_requirement_to_children( Ok(None) } } else if let Some(hash_join) = plan.as_any().downcast_ref::() { + // We should not push down TopK requirements through HashJoinExec + if parent_fetch.is_some() { + return Ok(None); + } handle_hash_join(hash_join, parent_required) } else { handle_custom_pushdown(plan, parent_required, maintains_input_order) @@ -668,15 +674,6 @@ fn handle_hash_join( plan: &HashJoinExec, parent_required: OrderingRequirements, ) -> Result>>> { - // Anti-joins (LeftAnti or RightAnti) do not preserve meaningful input order, - // so sorting beforehand cannot be relied on. Bail out early for both flavors: - match plan.join_type() { - JoinType::LeftAnti | JoinType::RightAnti => { - return Ok(None); - } - _ => {} - } - // If the plan has no children or does not maintain the right side ordering, // return early: if !plan.maintains_input_order()[1] { diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index c7e8366ca79ee..b4fec2677286a 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -4806,6 +4806,7 @@ INSERT INTO t1 statement ok INSERT INTO t2 VALUES (1, 1); +## The TopK(Sort with fetch) should not be pushed down to the hash join query TT explain SELECT * @@ -4836,3 +4837,24 @@ LIMIT 2; ---- 2 2 3 3 + + +## Test left anti join without limit, we should support push down sort to the left side +query TT +explain +SELECT * +FROM t1 +LEFT ANTI JOIN t2 ON t1.k = t2.k +ORDER BY t1.k; +---- +logical_plan +01)Sort: t1.k ASC NULLS LAST +02)--LeftAnti Join: t1.k = t2.k +03)----TableScan: t1 projection=[k, v] +04)----TableScan: t2 projection=[k] +physical_plan +01)CoalesceBatchesExec: target_batch_size=3 +02)--HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(k@0, k@0)] +03)----DataSourceExec: partitions=1, partition_sizes=[1] +04)----SortExec: expr=[k@0 ASC NULLS LAST], preserve_partitioning=[false] +05)------DataSourceExec: partitions=1, partition_sizes=[3334] From 7e0a73d65f827dca0cae35eaed44010bfe25ae49 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Wed, 2 Jul 2025 23:22:42 +0800 Subject: [PATCH 04/13] Using cardinality_effect to do it --- .../src/enforce_sorting/sort_pushdown.rs | 22 +++++++++++++++---- datafusion/sqllogictest/test_files/limit.slt | 11 +++++----- datafusion/sqllogictest/test_files/order.slt | 9 ++++---- datafusion/sqllogictest/test_files/topk.slt | 2 +- datafusion/sqllogictest/test_files/union.slt | 11 +++++----- 5 files changed, 33 insertions(+), 22 deletions(-) diff --git a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs index 6ec145bd796bb..a9e2bab6d359b 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs @@ -35,6 +35,7 @@ use datafusion_physical_expr_common::sort_expr::{ LexOrdering, LexRequirement, OrderingRequirements, PhysicalSortExpr, PhysicalSortRequirement, }; +use datafusion_physical_plan::execution_plan::CardinalityEffect; use datafusion_physical_plan::filter::FilterExec; use datafusion_physical_plan::joins::utils::{ calculate_join_output_ordering, ColumnIndex, @@ -219,6 +220,23 @@ fn pushdown_requirement_to_children( parent_required: OrderingRequirements, parent_fetch: Option, ) -> Result>>> { + // Only attempt to push down TopK when there is an upstream LIMIT + if parent_fetch.is_some() { + // 1) Never push a new TopK below an operator that already has its own fetch + if plan.fetch().is_some() { + return Ok(None); + } + // 2) Only allow pushdown through operators that do not increase row count + // (equal or lower-equal cardinality). Any other operator (including joins, + // sort-with-limit, or UDTFs that may expand rows) must stop the pushdown. + let effect = plan.cardinality_effect(); + if !matches!(effect, CardinalityEffect::Equal | CardinalityEffect::LowerEqual) { + return Ok(None); + } + // At this point, only single-input, non-expanding operators + // such as Filter, Projection, or Map are allowed to receive TopK. + } + let maintains_input_order = plan.maintains_input_order(); if is_window(plan) { let mut required_input_ordering = plan.required_input_ordering(); @@ -347,10 +365,6 @@ fn pushdown_requirement_to_children( Ok(None) } } else if let Some(hash_join) = plan.as_any().downcast_ref::() { - // We should not push down TopK requirements through HashJoinExec - if parent_fetch.is_some() { - return Ok(None); - } handle_hash_join(hash_join, parent_required) } else { handle_custom_pushdown(plan, parent_required, maintains_input_order) diff --git a/datafusion/sqllogictest/test_files/limit.slt b/datafusion/sqllogictest/test_files/limit.slt index b46d15cb962aa..77850c6ae7c5f 100644 --- a/datafusion/sqllogictest/test_files/limit.slt +++ b/datafusion/sqllogictest/test_files/limit.slt @@ -663,15 +663,14 @@ logical_plan physical_plan 01)GlobalLimitExec: skip=4, fetch=10 02)--SortPreservingMergeExec: [c@0 DESC], fetch=14 -03)----UnionExec -04)------SortExec: TopK(fetch=14), expr=[c@0 DESC], preserve_partitioning=[true] +03)----SortExec: TopK(fetch=14), expr=[c@0 DESC], preserve_partitioning=[true] +04)------UnionExec 05)--------ProjectionExec: expr=[CAST(c@0 AS Int64) as c] 06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], output_ordering=[c@0 ASC NULLS LAST], file_type=csv, has_header=true -08)------SortExec: TopK(fetch=14), expr=[c@0 DESC], preserve_partitioning=[true] -09)--------ProjectionExec: expr=[CAST(d@0 AS Int64) as c] -10)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -11)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[d], file_type=csv, has_header=true +08)--------ProjectionExec: expr=[CAST(d@0 AS Int64) as c] +09)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +10)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[d], file_type=csv, has_header=true # Applying LIMIT & OFFSET to subquery. query III diff --git a/datafusion/sqllogictest/test_files/order.slt b/datafusion/sqllogictest/test_files/order.slt index 3fc90a6459f27..e3bcfcdbda1d5 100644 --- a/datafusion/sqllogictest/test_files/order.slt +++ b/datafusion/sqllogictest/test_files/order.slt @@ -1258,13 +1258,12 @@ logical_plan 08)--------TableScan: ordered_table projection=[a0, b, c, d] physical_plan 01)SortPreservingMergeExec: [d@4 ASC NULLS LAST, c@1 ASC NULLS LAST, a@2 ASC NULLS LAST, a0@3 ASC NULLS LAST, b@0 ASC NULLS LAST], fetch=2 -02)--UnionExec -03)----SortExec: TopK(fetch=2), expr=[d@4 ASC NULLS LAST, c@1 ASC NULLS LAST, a@2 ASC NULLS LAST, b@0 ASC NULLS LAST], preserve_partitioning=[false] +02)--SortExec: TopK(fetch=2), expr=[d@4 ASC NULLS LAST, c@1 ASC NULLS LAST, a@2 ASC NULLS LAST, a0@3 ASC NULLS LAST, b@0 ASC NULLS LAST], preserve_partitioning=[true] +03)----UnionExec 04)------ProjectionExec: expr=[b@1 as b, c@2 as c, a@0 as a, NULL as a0, d@3 as d] 05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_ordering=[c@2 ASC NULLS LAST], file_type=csv, has_header=true -06)----SortExec: TopK(fetch=2), expr=[d@4 ASC NULLS LAST, c@1 ASC NULLS LAST, a0@3 ASC NULLS LAST, b@0 ASC NULLS LAST], preserve_partitioning=[false] -07)------ProjectionExec: expr=[b@1 as b, c@2 as c, NULL as a, a0@0 as a0, d@3 as d] -08)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, b, c, d], output_ordering=[c@2 ASC NULLS LAST], file_type=csv, has_header=true +06)------ProjectionExec: expr=[b@1 as b, c@2 as c, NULL as a, a0@0 as a0, d@3 as d] +07)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, b, c, d], output_ordering=[c@2 ASC NULLS LAST], file_type=csv, has_header=true # Test: run the query from above query IIIII diff --git a/datafusion/sqllogictest/test_files/topk.slt b/datafusion/sqllogictest/test_files/topk.slt index 9ff382d32af95..afa78e43de2b5 100644 --- a/datafusion/sqllogictest/test_files/topk.slt +++ b/datafusion/sqllogictest/test_files/topk.slt @@ -53,7 +53,7 @@ query I select * from (select * from topk limit 8) order by x limit 3; ---- 0 -1 +2 2 diff --git a/datafusion/sqllogictest/test_files/union.slt b/datafusion/sqllogictest/test_files/union.slt index f901a4d373a31..45b592ee4c799 100644 --- a/datafusion/sqllogictest/test_files/union.slt +++ b/datafusion/sqllogictest/test_files/union.slt @@ -413,15 +413,14 @@ logical_plan 06)------TableScan: aggregate_test_100 projection=[c1, c3] physical_plan 01)SortPreservingMergeExec: [c9@1 DESC], fetch=5 -02)--UnionExec -03)----SortExec: TopK(fetch=5), expr=[c9@1 DESC], preserve_partitioning=[true] +02)--SortExec: TopK(fetch=5), expr=[c9@1 DESC], preserve_partitioning=[true] +03)----UnionExec 04)------ProjectionExec: expr=[c1@0 as c1, CAST(c9@1 AS Decimal128(20, 0)) as c9] 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c9], file_type=csv, has_header=true -07)----SortExec: TopK(fetch=5), expr=[c9@1 DESC], preserve_partitioning=[true] -08)------ProjectionExec: expr=[c1@0 as c1, CAST(c3@1 AS Decimal128(20, 0)) as c9] -09)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -10)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c3], file_type=csv, has_header=true +07)------ProjectionExec: expr=[c1@0 as c1, CAST(c3@1 AS Decimal128(20, 0)) as c9] +08)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +09)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c3], file_type=csv, has_header=true query TR SELECT c1, c9 FROM aggregate_test_100 UNION ALL SELECT c1, c3 FROM aggregate_test_100 ORDER BY c9 DESC LIMIT 5 From 8b59fd8e9c0b119de59d1958a23e8cd3f3807466 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Wed, 2 Jul 2025 23:29:31 +0800 Subject: [PATCH 05/13] fmt --- .../physical-optimizer/src/enforce_sorting/sort_pushdown.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs index a9e2bab6d359b..a7f7361e2928f 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs @@ -230,7 +230,10 @@ fn pushdown_requirement_to_children( // (equal or lower-equal cardinality). Any other operator (including joins, // sort-with-limit, or UDTFs that may expand rows) must stop the pushdown. let effect = plan.cardinality_effect(); - if !matches!(effect, CardinalityEffect::Equal | CardinalityEffect::LowerEqual) { + if !matches!( + effect, + CardinalityEffect::Equal | CardinalityEffect::LowerEqual + ) { return Ok(None); } // At this point, only single-input, non-expanding operators From b3583714548d2346f2ae1c89865596511c2be315 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Wed, 2 Jul 2025 23:40:51 +0800 Subject: [PATCH 06/13] Update datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs Co-authored-by: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> --- .../physical-optimizer/src/enforce_sorting/sort_pushdown.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs index a7f7361e2928f..691762b51ffbd 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs @@ -231,8 +231,8 @@ fn pushdown_requirement_to_children( // sort-with-limit, or UDTFs that may expand rows) must stop the pushdown. let effect = plan.cardinality_effect(); if !matches!( - effect, - CardinalityEffect::Equal | CardinalityEffect::LowerEqual + plan.cardinality_effect(), + CardinalityEffect::Equal ) { return Ok(None); } From 512b137bdbdf7e8c8eb60397279aa4fdee46b214 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Wed, 2 Jul 2025 23:50:29 +0800 Subject: [PATCH 07/13] Revert "Update datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs" This reverts commit b3583714548d2346f2ae1c89865596511c2be315. --- .../physical-optimizer/src/enforce_sorting/sort_pushdown.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs index 691762b51ffbd..a7f7361e2928f 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs @@ -231,8 +231,8 @@ fn pushdown_requirement_to_children( // sort-with-limit, or UDTFs that may expand rows) must stop the pushdown. let effect = plan.cardinality_effect(); if !matches!( - plan.cardinality_effect(), - CardinalityEffect::Equal + effect, + CardinalityEffect::Equal | CardinalityEffect::LowerEqual ) { return Ok(None); } From 6f95a6343f60d8bd63cf45a6b46badff888fb4d3 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Thu, 3 Jul 2025 00:13:00 +0800 Subject: [PATCH 08/13] fix --- .../physical-optimizer/src/enforce_sorting/sort_pushdown.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs index a7f7361e2928f..b76815a1fa3e5 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs @@ -230,10 +230,7 @@ fn pushdown_requirement_to_children( // (equal or lower-equal cardinality). Any other operator (including joins, // sort-with-limit, or UDTFs that may expand rows) must stop the pushdown. let effect = plan.cardinality_effect(); - if !matches!( - effect, - CardinalityEffect::Equal | CardinalityEffect::LowerEqual - ) { + if !matches!(effect, CardinalityEffect::Equal) { return Ok(None); } // At this point, only single-input, non-expanding operators From 92366d69a730c9193cc244ddf0f930aa7352183d Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Thu, 3 Jul 2025 00:14:42 +0800 Subject: [PATCH 09/13] fix --- .../physical-optimizer/src/enforce_sorting/sort_pushdown.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs index b76815a1fa3e5..615f58b55bd69 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs @@ -227,14 +227,14 @@ fn pushdown_requirement_to_children( return Ok(None); } // 2) Only allow pushdown through operators that do not increase row count - // (equal or lower-equal cardinality). Any other operator (including joins, + // (equal cardinality). Any other operator (including joins, filter, // sort-with-limit, or UDTFs that may expand rows) must stop the pushdown. let effect = plan.cardinality_effect(); if !matches!(effect, CardinalityEffect::Equal) { return Ok(None); } // At this point, only single-input, non-expanding operators - // such as Filter, Projection, or Map are allowed to receive TopK. + // such as ProjectionExec, CoalesceBatchesExec, are allowed to receive TopK. } let maintains_input_order = plan.maintains_input_order(); From c8c605164c05690f6a181327a68f3a4ff20cd081 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Thu, 3 Jul 2025 10:45:26 +0800 Subject: [PATCH 10/13] Update datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs Co-authored-by: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> --- .../src/enforce_sorting/sort_pushdown.rs | 20 +++++-------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs index 615f58b55bd69..04975bbc534dc 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs @@ -220,21 +220,11 @@ fn pushdown_requirement_to_children( parent_required: OrderingRequirements, parent_fetch: Option, ) -> Result>>> { - // Only attempt to push down TopK when there is an upstream LIMIT - if parent_fetch.is_some() { - // 1) Never push a new TopK below an operator that already has its own fetch - if plan.fetch().is_some() { - return Ok(None); - } - // 2) Only allow pushdown through operators that do not increase row count - // (equal cardinality). Any other operator (including joins, filter, - // sort-with-limit, or UDTFs that may expand rows) must stop the pushdown. - let effect = plan.cardinality_effect(); - if !matches!(effect, CardinalityEffect::Equal) { - return Ok(None); - } - // At this point, only single-input, non-expanding operators - // such as ProjectionExec, CoalesceBatchesExec, are allowed to receive TopK. + // If there is a limit on the parent plan we cannot push it down through operators that change the cardinality. + // E.g. consider if LIMIT 2 is applied below a FilteExec that filters out 1/2 of the rows we'll end up with 1 row instead of 2. + // If the LIMIT is applied after the FilterExec and the FilterExec returns > 2 rows we'll end up with 2 rows (correct). + if parent_fetch.is_some() && !plan.supports_limit_pushdown() { + return Ok(None) } let maintains_input_order = plan.maintains_input_order(); From c7178fba8bb4eb76aa77c0b0542b2c3c4c888f10 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Thu, 3 Jul 2025 11:08:19 +0800 Subject: [PATCH 11/13] polish code --- .../src/enforce_sorting/sort_pushdown.rs | 31 +++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs index 04975bbc534dc..6d3bb91ba1870 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs @@ -223,8 +223,35 @@ fn pushdown_requirement_to_children( // If there is a limit on the parent plan we cannot push it down through operators that change the cardinality. // E.g. consider if LIMIT 2 is applied below a FilteExec that filters out 1/2 of the rows we'll end up with 1 row instead of 2. // If the LIMIT is applied after the FilterExec and the FilterExec returns > 2 rows we'll end up with 2 rows (correct). - if parent_fetch.is_some() && !plan.supports_limit_pushdown() { - return Ok(None) + if parent_fetch.is_some() { + if !plan.supports_limit_pushdown() { + return Ok(None); + } + + // Note: we still need to check the cardinality effect of the plan here, because the + // limit pushdown is not always safe, even if the plan supports it. Here's an example: + // + // UnionExec advertises `supports_limit_pushdown() == true` because it can + // forward a LIMIT k to each of its children—i.e. apply “LIMIT k” separately + // on each branch before merging them together. + // + // However, UnionExec’s `cardinality_effect() == GreaterEqual` (it sums up + // all child row counts), so pushing a global TopK/LIMIT through it would + // break the semantics of “take the first k rows of the combined result.” + // + // For example, with two branches A and B and k = 3: + // — Global LIMIT: take the first 3 rows from (A ∪ B) after merging. + // — Pushed down: take 3 from A, 3 from B, then merge → up to 6 rows! + // + // That’s why we still block on cardinality: even though UnionExec can + // push a LIMIT to its children, its GreaterEqual effect means it cannot + // preserve the global TopK semantics. + match plan.cardinality_effect() { + CardinalityEffect::Equal => { + // safe: only true sources (e.g. CoalesceBatchesExec, ProjectionExec) pass + } + _ => return Ok(None), + } } let maintains_input_order = plan.maintains_input_order(); From a3838a454bcc9fcb2cb32cb5d6a52a2779ddb442 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Thu, 3 Jul 2025 23:39:26 +0800 Subject: [PATCH 12/13] Update datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs Co-authored-by: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> --- .../physical-optimizer/src/enforce_sorting/sort_pushdown.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs index 6d3bb91ba1870..69038b2788374 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs @@ -223,10 +223,8 @@ fn pushdown_requirement_to_children( // If there is a limit on the parent plan we cannot push it down through operators that change the cardinality. // E.g. consider if LIMIT 2 is applied below a FilteExec that filters out 1/2 of the rows we'll end up with 1 row instead of 2. // If the LIMIT is applied after the FilterExec and the FilterExec returns > 2 rows we'll end up with 2 rows (correct). - if parent_fetch.is_some() { - if !plan.supports_limit_pushdown() { - return Ok(None); - } + if parent_fetch.is_some() && !plan.supports_limit_pushdown() { + return Ok(None); // Note: we still need to check the cardinality effect of the plan here, because the // limit pushdown is not always safe, even if the plan supports it. Here's an example: From b7aaab27c09fd24af667940c32b5e4654a85c0e3 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Thu, 3 Jul 2025 23:45:26 +0800 Subject: [PATCH 13/13] fix --- .../src/enforce_sorting/sort_pushdown.rs | 39 ++++++++++--------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs index 69038b2788374..6e4e784866129 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs @@ -225,25 +225,26 @@ fn pushdown_requirement_to_children( // If the LIMIT is applied after the FilterExec and the FilterExec returns > 2 rows we'll end up with 2 rows (correct). if parent_fetch.is_some() && !plan.supports_limit_pushdown() { return Ok(None); - - // Note: we still need to check the cardinality effect of the plan here, because the - // limit pushdown is not always safe, even if the plan supports it. Here's an example: - // - // UnionExec advertises `supports_limit_pushdown() == true` because it can - // forward a LIMIT k to each of its children—i.e. apply “LIMIT k” separately - // on each branch before merging them together. - // - // However, UnionExec’s `cardinality_effect() == GreaterEqual` (it sums up - // all child row counts), so pushing a global TopK/LIMIT through it would - // break the semantics of “take the first k rows of the combined result.” - // - // For example, with two branches A and B and k = 3: - // — Global LIMIT: take the first 3 rows from (A ∪ B) after merging. - // — Pushed down: take 3 from A, 3 from B, then merge → up to 6 rows! - // - // That’s why we still block on cardinality: even though UnionExec can - // push a LIMIT to its children, its GreaterEqual effect means it cannot - // preserve the global TopK semantics. + } + // Note: we still need to check the cardinality effect of the plan here, because the + // limit pushdown is not always safe, even if the plan supports it. Here's an example: + // + // UnionExec advertises `supports_limit_pushdown() == true` because it can + // forward a LIMIT k to each of its children—i.e. apply “LIMIT k” separately + // on each branch before merging them together. + // + // However, UnionExec’s `cardinality_effect() == GreaterEqual` (it sums up + // all child row counts), so pushing a global TopK/LIMIT through it would + // break the semantics of “take the first k rows of the combined result.” + // + // For example, with two branches A and B and k = 3: + // — Global LIMIT: take the first 3 rows from (A ∪ B) after merging. + // — Pushed down: take 3 from A, 3 from B, then merge → up to 6 rows! + // + // That’s why we still block on cardinality: even though UnionExec can + // push a LIMIT to its children, its GreaterEqual effect means it cannot + // preserve the global TopK semantics. + if parent_fetch.is_some() { match plan.cardinality_effect() { CardinalityEffect::Equal => { // safe: only true sources (e.g. CoalesceBatchesExec, ProjectionExec) pass