14 changes: 1 addition & 13 deletions datafusion/physical-plan/src/repartition/mod.rs
@@ -34,7 +34,7 @@ use crate::coalesce::LimitedBatchCoalescer;
 use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType};
 use crate::hash_utils::create_hashes;
 use crate::metrics::{BaselineMetrics, SpillMetrics};
-use crate::projection::{ProjectionExec, all_columns, make_with_child, update_expr};
+use crate::projection::{ProjectionExec, make_with_child, update_expr};
 use crate::sorts::streaming_merge::StreamingMergeBuilder;
 use crate::spill::spill_manager::SpillManager;
 use crate::spill::spill_pool::{self, SpillPoolWriter};
@@ -1042,18 +1042,6 @@ impl ExecutionPlan for RepartitionExec {
         &self,
         projection: &ProjectionExec,
     ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
-        // If the projection does not narrow the schema, we should not try to push it down.

[Review comment] @Dandandan (Contributor), Dec 17, 2025:
I think this still shows up in the plans; in some cases the number of fields is bigger before the repartition because of this (and it might hurt performance, as RepartitionExec copies the input columns, currently twice). Can we instead relax the condition to > instead of >=, so the projection is also pushed down when it keeps the number of fields equal?

-        if projection.expr().len() >= projection.input().schema().fields().len() {
-            return Ok(None);
-        }
-
-        // If pushdown is not beneficial or applicable, break it.
-        if projection.benefits_from_input_partitioning()[0]
-            || !all_columns(projection.expr())
-        {
-            return Ok(None);
-        }
-
         let new_projection = make_with_child(projection, self.input())?;
 
         let new_partitioning = match self.partitioning() {
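For reference, this is a sketch of the middle ground the review comment above asks for: relax the removed `>=` test to `>`, so a projection that keeps the field count equal is still pushed below the repartition, and only a schema-widening projection blocks pushdown. The hunk in this PR removes the guard entirely; the sketch below reuses the same method context and calls as the deleted lines and is not the committed code.

```rust
// Hypothetical relaxed guard (the reviewer's suggestion, not this PR's change):
// refuse pushdown only when the projection strictly widens the schema.
// An equal-width projection still moves below RepartitionExec, so the
// repartition never copies more columns than the query needs.
if projection.expr().len() > projection.input().schema().fields().len() {
    return Ok(None);
}
```

Either way, the plan shape the reviewer wants is visible in several of the updated test expectations below: `ProjectionExec` now sits beneath `RepartitionExec` instead of above it.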
107 changes: 36 additions & 71 deletions datafusion/sqllogictest/test_files/explain_tree.slt
@@ -819,32 +819,20 @@ explain SELECT int_col, bigint_col, int_col+bigint_col AS sum_col FROM table1;
 ----
 physical_plan
 01)┌───────────────────────────┐
-02)│       ProjectionExec      │
+02)│      RepartitionExec      │
 03)│    --------------------   │
-04)│        bigint_col:        │
-05)│         bigint_col        │
+04)│ partition_count(in->out): │
+05)│           1 -> 4          │
 06)│                           │
-07)│      int_col: int_col     │
-08)│                           │
-09)│          sum_col:         │
-10)│  CAST(int_col AS Int64) + │
-11)│         bigint_col        │
-12)└─────────────┬─────────────┘
-13)┌─────────────┴─────────────┐
-14)│      RepartitionExec      │
-15)│    --------------------   │
-16)│ partition_count(in->out): │
-17)│           1 -> 4          │
-18)│                           │
-19)│    partitioning_scheme:   │
-20)│     RoundRobinBatch(4)    │
-21)└─────────────┬─────────────┘
-22)┌─────────────┴─────────────┐
-23)│       DataSourceExec      │
-24)│    --------------------   │
-25)│          files: 1         │
-26)│        format: csv        │
-27)└───────────────────────────┘
+07)│    partitioning_scheme:   │
+08)│     RoundRobinBatch(4)    │
+09)└─────────────┬─────────────┘
+10)┌─────────────┴─────────────┐
+11)│       DataSourceExec      │
+12)│    --------------------   │
+13)│          files: 1         │
+14)│        format: csv        │
+15)└───────────────────────────┘
 
 query TT
 explain select
@@ -965,31 +953,20 @@ explain SELECT int_col, bigint_col, int_col+bigint_col AS sum_col FROM table4;
 ----
 physical_plan
 01)┌───────────────────────────┐
-02)│       ProjectionExec      │
+02)│      RepartitionExec      │
 03)│    --------------------   │
-04)│        bigint_col:        │
-05)│         bigint_col        │
+04)│ partition_count(in->out): │
+05)│           1 -> 4          │
 06)│                           │
-07)│      int_col: int_col     │
-08)│                           │
-09)│          sum_col:         │
-10)│    int_col + bigint_col   │
-11)└─────────────┬─────────────┘
-12)┌─────────────┴─────────────┐
-13)│      RepartitionExec      │
-14)│    --------------------   │
-15)│ partition_count(in->out): │
-16)│           1 -> 4          │
-17)│                           │
-18)│    partitioning_scheme:   │
-19)│     RoundRobinBatch(4)    │
-20)└─────────────┬─────────────┘
-21)┌─────────────┴─────────────┐
-22)│       DataSourceExec      │
-23)│    --------------------   │
-24)│          files: 1         │
-25)│        format: json       │
-26)└───────────────────────────┘
+07)│    partitioning_scheme:   │
+08)│     RoundRobinBatch(4)    │
+09)└─────────────┬─────────────┘
+10)┌─────────────┴─────────────┐
+11)│       DataSourceExec      │
+12)│    --------------------   │
+13)│          files: 1         │
+14)│        format: json       │
+15)└───────────────────────────┘
 
 
 # Query with projection on arrow
@@ -998,32 +975,20 @@ explain SELECT int_col, bigint_col, int_col+bigint_col AS sum_col FROM table5;
 ----
 physical_plan
 01)┌───────────────────────────┐
-02)│       ProjectionExec      │
+02)│      RepartitionExec      │
 03)│    --------------------   │
-04)│        bigint_col:        │
-05)│         bigint_col        │
+04)│ partition_count(in->out): │
+05)│           1 -> 4          │
 06)│                           │
-07)│      int_col: int_col     │
-08)│                           │
-09)│          sum_col:         │
-10)│  CAST(int_col AS Int64) + │
-11)│         bigint_col        │
-12)└─────────────┬─────────────┘
-13)┌─────────────┴─────────────┐
-14)│      RepartitionExec      │
-15)│    --------------------   │
-16)│ partition_count(in->out): │
-17)│           1 -> 4          │
-18)│                           │
-19)│    partitioning_scheme:   │
-20)│     RoundRobinBatch(4)    │
-21)└─────────────┬─────────────┘
-22)┌─────────────┴─────────────┐
-23)│       DataSourceExec      │
-24)│    --------------------   │
-25)│          files: 1         │
-26)│       format: arrow       │
-27)└───────────────────────────┘
+07)│    partitioning_scheme:   │
+08)│     RoundRobinBatch(4)    │
+09)└─────────────┬─────────────┘
+10)┌─────────────┴─────────────┐
+11)│       DataSourceExec      │
+12)│    --------------------   │
+13)│          files: 1         │
+14)│       format: arrow       │
+15)└───────────────────────────┘
 
 # Query with PartialSortExec.
 query TT
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/group_by.slt
@@ -4371,8 +4371,8 @@ logical_plan
 03)----TableScan: unbounded_csv_with_timestamps2 projection=[name, ts]
 physical_plan
 01)SortPreservingMergeExec: [name@0 DESC, time_chunks@1 DESC], fetch=5
-02)--ProjectionExec: expr=[name@0 as name, date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 900000000000 }, ts@1) as time_chunks]
-03)----RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true
+02)--RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true
+03)----ProjectionExec: expr=[name@0 as name, date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 900000000000 }, ts@1) as time_chunks]
 04)------StreamingTableExec: partition_sizes=1, projection=[name, ts], infinite_source=true, output_ordering=[name@0 DESC, ts@1 DESC]
 
 statement ok
8 changes: 4 additions & 4 deletions datafusion/sqllogictest/test_files/join.slt.part
@@ -1177,8 +1177,8 @@ logical_plan
 physical_plan
 01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(CAST(t1.v0 AS Float64)@6, v1@1)], filter=v1@1 + CAST(v0@0 AS Float64) > 0, projection=[v0@0, v1@1, v2@3, v3@4, v4@5, v0@7, v1@8]
 02)--CoalescePartitionsExec
-03)----ProjectionExec: expr=[v0@0 as v0, v1@1 as v1, v0@2 as v0, v2@3 as v2, v3@4 as v3, v4@5 as v4, CAST(v0@0 AS Float64) as CAST(t1.v0 AS Float64)]
-04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+04)------ProjectionExec: expr=[v0@0 as v0, v1@1 as v1, v0@2 as v0, v2@3 as v2, v3@4 as v3, v4@5 as v4, CAST(v0@0 AS Float64) as CAST(t1.v0 AS Float64)]
 05)--------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(v0@0, v0@0), (v1@1, v1@1)], projection=[v0@0, v1@1, v0@2, v2@4, v3@5, v4@6]
 06)----------DataSourceExec: partitions=1, partition_sizes=[0]
 07)----------DataSourceExec: partitions=1, partition_sizes=[0]
@@ -1375,8 +1375,8 @@ physical_plan
 06)------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(CAST(f.a AS Int64)@1, col0@0)], projection=[a@0, col0@2, col1@3]
 07)--------ProjectionExec: expr=[a@0 as a, CAST(a@0 AS Int64) as CAST(f.a AS Int64)]
 08)----------DataSourceExec: partitions=1, partition_sizes=[1]
-09)--------ProjectionExec: expr=[CAST(x@0 AS Int64) + 1 as col0, CAST(y@1 AS Int64) + 1 as col1]
-10)----------RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
+09)--------RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
+10)----------ProjectionExec: expr=[CAST(x@0 AS Int64) + 1 as col0, CAST(y@1 AS Int64) + 1 as col1]
 11)------------FilterExec: y@1 = x@0
 12)--------------DataSourceExec: partitions=1, partition_sizes=[1]
 
4 changes: 2 additions & 2 deletions (file path not shown)
@@ -143,8 +143,8 @@ physical_plan
 01)ProjectionExec: expr=[id@0 as t1_id, id@2 as t2_id, val@1 as val, val@3 as val]
 02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(t1.val + Int64(1)@2, t2.val + Int64(1)@2)], projection=[id@0, val@1, id@3, val@4], NullsEqual: true
 03)----CoalescePartitionsExec
-04)------ProjectionExec: expr=[id@0 as id, val@1 as val, CAST(val@1 AS Int64) + 1 as t1.val + Int64(1)]
-05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+05)--------ProjectionExec: expr=[id@0 as id, val@1 as val, CAST(val@1 AS Int64) + 1 as t1.val + Int64(1)]
 06)----------FilterExec: CAST(val@1 AS Int64) + 1 IS NOT DISTINCT FROM 11
 07)------------DataSourceExec: partitions=1, partition_sizes=[1]
 08)----ProjectionExec: expr=[id@0 as id, val@1 as val, CAST(val@1 AS Int64) + 1 as t2.val + Int64(1)]