diff --git a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs index a9c0e4cb28589..6e4e784866129 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs @@ -35,6 +35,7 @@ use datafusion_physical_expr_common::sort_expr::{ LexOrdering, LexRequirement, OrderingRequirements, PhysicalSortExpr, PhysicalSortRequirement, }; +use datafusion_physical_plan::execution_plan::CardinalityEffect; use datafusion_physical_plan::filter::FilterExec; use datafusion_physical_plan::joins::utils::{ calculate_join_output_ordering, ColumnIndex, @@ -190,6 +191,7 @@ fn pushdown_sorts_helper( } else if let Some(adjusted) = pushdown_requirement_to_children( &sort_push_down.plan, parent_requirement.clone(), + parent_fetch, )? { // For operators that can take a sort pushdown, continue with updated // requirements: @@ -216,7 +218,41 @@ fn pushdown_sorts_helper( fn pushdown_requirement_to_children( plan: &Arc, parent_required: OrderingRequirements, + parent_fetch: Option, ) -> Result>>> { + // If there is a limit on the parent plan we cannot push it down through operators that change the cardinality. + // E.g. consider if LIMIT 2 is applied below a FilteExec that filters out 1/2 of the rows we'll end up with 1 row instead of 2. + // If the LIMIT is applied after the FilterExec and the FilterExec returns > 2 rows we'll end up with 2 rows (correct). + if parent_fetch.is_some() && !plan.supports_limit_pushdown() { + return Ok(None); + } + // Note: we still need to check the cardinality effect of the plan here, because the + // limit pushdown is not always safe, even if the plan supports it. Here's an example: + // + // UnionExec advertises `supports_limit_pushdown() == true` because it can + // forward a LIMIT k to each of its children—i.e. apply “LIMIT k” separately + // on each branch before merging them together. + // + // However, UnionExec’s `cardinality_effect() == GreaterEqual` (it sums up + // all child row counts), so pushing a global TopK/LIMIT through it would + // break the semantics of “take the first k rows of the combined result.” + // + // For example, with two branches A and B and k = 3: + // — Global LIMIT: take the first 3 rows from (A ∪ B) after merging. + // — Pushed down: take 3 from A, 3 from B, then merge → up to 6 rows! + // + // That’s why we still block on cardinality: even though UnionExec can + // push a LIMIT to its children, its GreaterEqual effect means it cannot + // preserve the global TopK semantics. + if parent_fetch.is_some() { + match plan.cardinality_effect() { + CardinalityEffect::Equal => { + // safe: only true sources (e.g. CoalesceBatchesExec, ProjectionExec) pass + } + _ => return Ok(None), + } + } + let maintains_input_order = plan.maintains_input_order(); if is_window(plan) { let mut required_input_ordering = plan.required_input_ordering(); diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 3be5c1b1c370e..b4fec2677286a 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -4790,3 +4790,71 @@ DROP TABLE compound_field_table_t; statement ok DROP TABLE compound_field_table_u; + + +statement ok +CREATE TABLE t1 (k INT, v INT); + +statement ok +CREATE TABLE t2 (k INT, v INT); + +statement ok +INSERT INTO t1 + SELECT value AS k, value AS v + FROM range(1, 10001) AS t(value); + +statement ok +INSERT INTO t2 VALUES (1, 1); + +## The TopK(Sort with fetch) should not be pushed down to the hash join +query TT +explain +SELECT * +FROM t1 +LEFT ANTI JOIN t2 ON t1.k = t2.k +ORDER BY t1.k +LIMIT 2; +---- +logical_plan +01)Sort: t1.k ASC NULLS LAST, fetch=2 +02)--LeftAnti Join: t1.k = t2.k +03)----TableScan: t1 projection=[k, v] +04)----TableScan: t2 projection=[k] +physical_plan +01)SortExec: TopK(fetch=2), expr=[k@0 ASC NULLS LAST], preserve_partitioning=[false] +02)--CoalesceBatchesExec: target_batch_size=3 +03)----HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(k@0, k@0)] +04)------DataSourceExec: partitions=1, partition_sizes=[1] +05)------DataSourceExec: partitions=1, partition_sizes=[3334] + + +query II +SELECT * +FROM t1 +LEFT ANTI JOIN t2 ON t1.k = t2.k +ORDER BY t1.k +LIMIT 2; +---- +2 2 +3 3 + + +## Test left anti join without limit, we should support push down sort to the left side +query TT +explain +SELECT * +FROM t1 +LEFT ANTI JOIN t2 ON t1.k = t2.k +ORDER BY t1.k; +---- +logical_plan +01)Sort: t1.k ASC NULLS LAST +02)--LeftAnti Join: t1.k = t2.k +03)----TableScan: t1 projection=[k, v] +04)----TableScan: t2 projection=[k] +physical_plan +01)CoalesceBatchesExec: target_batch_size=3 +02)--HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(k@0, k@0)] +03)----DataSourceExec: partitions=1, partition_sizes=[1] +04)----SortExec: expr=[k@0 ASC NULLS LAST], preserve_partitioning=[false] +05)------DataSourceExec: partitions=1, partition_sizes=[3334] diff --git a/datafusion/sqllogictest/test_files/limit.slt b/datafusion/sqllogictest/test_files/limit.slt index b46d15cb962aa..77850c6ae7c5f 100644 --- a/datafusion/sqllogictest/test_files/limit.slt +++ b/datafusion/sqllogictest/test_files/limit.slt @@ -663,15 +663,14 @@ logical_plan physical_plan 01)GlobalLimitExec: skip=4, fetch=10 02)--SortPreservingMergeExec: [c@0 DESC], fetch=14 -03)----UnionExec -04)------SortExec: TopK(fetch=14), expr=[c@0 DESC], preserve_partitioning=[true] +03)----SortExec: TopK(fetch=14), expr=[c@0 DESC], preserve_partitioning=[true] +04)------UnionExec 05)--------ProjectionExec: expr=[CAST(c@0 AS Int64) as c] 06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], output_ordering=[c@0 ASC NULLS LAST], file_type=csv, has_header=true -08)------SortExec: TopK(fetch=14), expr=[c@0 DESC], preserve_partitioning=[true] -09)--------ProjectionExec: expr=[CAST(d@0 AS Int64) as c] -10)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -11)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[d], file_type=csv, has_header=true +08)--------ProjectionExec: expr=[CAST(d@0 AS Int64) as c] +09)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +10)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[d], file_type=csv, has_header=true # Applying LIMIT & OFFSET to subquery. query III diff --git a/datafusion/sqllogictest/test_files/order.slt b/datafusion/sqllogictest/test_files/order.slt index 3fc90a6459f27..e3bcfcdbda1d5 100644 --- a/datafusion/sqllogictest/test_files/order.slt +++ b/datafusion/sqllogictest/test_files/order.slt @@ -1258,13 +1258,12 @@ logical_plan 08)--------TableScan: ordered_table projection=[a0, b, c, d] physical_plan 01)SortPreservingMergeExec: [d@4 ASC NULLS LAST, c@1 ASC NULLS LAST, a@2 ASC NULLS LAST, a0@3 ASC NULLS LAST, b@0 ASC NULLS LAST], fetch=2 -02)--UnionExec -03)----SortExec: TopK(fetch=2), expr=[d@4 ASC NULLS LAST, c@1 ASC NULLS LAST, a@2 ASC NULLS LAST, b@0 ASC NULLS LAST], preserve_partitioning=[false] +02)--SortExec: TopK(fetch=2), expr=[d@4 ASC NULLS LAST, c@1 ASC NULLS LAST, a@2 ASC NULLS LAST, a0@3 ASC NULLS LAST, b@0 ASC NULLS LAST], preserve_partitioning=[true] +03)----UnionExec 04)------ProjectionExec: expr=[b@1 as b, c@2 as c, a@0 as a, NULL as a0, d@3 as d] 05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_ordering=[c@2 ASC NULLS LAST], file_type=csv, has_header=true -06)----SortExec: TopK(fetch=2), expr=[d@4 ASC NULLS LAST, c@1 ASC NULLS LAST, a0@3 ASC NULLS LAST, b@0 ASC NULLS LAST], preserve_partitioning=[false] -07)------ProjectionExec: expr=[b@1 as b, c@2 as c, NULL as a, a0@0 as a0, d@3 as d] -08)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, b, c, d], output_ordering=[c@2 ASC NULLS LAST], file_type=csv, has_header=true +06)------ProjectionExec: expr=[b@1 as b, c@2 as c, NULL as a, a0@0 as a0, d@3 as d] +07)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, b, c, d], output_ordering=[c@2 ASC NULLS LAST], file_type=csv, has_header=true # Test: run the query from above query IIIII diff --git a/datafusion/sqllogictest/test_files/topk.slt b/datafusion/sqllogictest/test_files/topk.slt index 9ff382d32af95..afa78e43de2b5 100644 --- a/datafusion/sqllogictest/test_files/topk.slt +++ b/datafusion/sqllogictest/test_files/topk.slt @@ -53,7 +53,7 @@ query I select * from (select * from topk limit 8) order by x limit 3; ---- 0 -1 +2 2 diff --git a/datafusion/sqllogictest/test_files/union.slt b/datafusion/sqllogictest/test_files/union.slt index f901a4d373a31..45b592ee4c799 100644 --- a/datafusion/sqllogictest/test_files/union.slt +++ b/datafusion/sqllogictest/test_files/union.slt @@ -413,15 +413,14 @@ logical_plan 06)------TableScan: aggregate_test_100 projection=[c1, c3] physical_plan 01)SortPreservingMergeExec: [c9@1 DESC], fetch=5 -02)--UnionExec -03)----SortExec: TopK(fetch=5), expr=[c9@1 DESC], preserve_partitioning=[true] +02)--SortExec: TopK(fetch=5), expr=[c9@1 DESC], preserve_partitioning=[true] +03)----UnionExec 04)------ProjectionExec: expr=[c1@0 as c1, CAST(c9@1 AS Decimal128(20, 0)) as c9] 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c9], file_type=csv, has_header=true -07)----SortExec: TopK(fetch=5), expr=[c9@1 DESC], preserve_partitioning=[true] -08)------ProjectionExec: expr=[c1@0 as c1, CAST(c3@1 AS Decimal128(20, 0)) as c9] -09)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -10)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c3], file_type=csv, has_header=true +07)------ProjectionExec: expr=[c1@0 as c1, CAST(c3@1 AS Decimal128(20, 0)) as c9] +08)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +09)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c3], file_type=csv, has_header=true query TR SELECT c1, c9 FROM aggregate_test_100 UNION ALL SELECT c1, c3 FROM aggregate_test_100 ORDER BY c9 DESC LIMIT 5