diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index 5c9472182b0a..9cd0e92cfebe 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -34,7 +34,7 @@ use crate::coalesce::LimitedBatchCoalescer;
 use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType};
 use crate::hash_utils::create_hashes;
 use crate::metrics::{BaselineMetrics, SpillMetrics};
-use crate::projection::{ProjectionExec, all_columns, make_with_child, update_expr};
+use crate::projection::{ProjectionExec, make_with_child, update_expr};
 use crate::sorts::streaming_merge::StreamingMergeBuilder;
 use crate::spill::spill_manager::SpillManager;
 use crate::spill::spill_pool::{self, SpillPoolWriter};
@@ -1042,18 +1042,6 @@ impl ExecutionPlan for RepartitionExec {
         &self,
         projection: &ProjectionExec,
     ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
-        // If the projection does not narrow the schema, we should not try to push it down.
-        if projection.expr().len() >= projection.input().schema().fields().len() {
-            return Ok(None);
-        }
-
-        // If pushdown is not beneficial or applicable, break it.
-        if projection.benefits_from_input_partitioning()[0]
-            || !all_columns(projection.expr())
-        {
-            return Ok(None);
-        }
-
         let new_projection = make_with_child(projection, self.input())?;

         let new_partitioning = match self.partitioning() {
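The two guards removed above previously blocked the swap unless the projection strictly narrowed its input schema, consisted only of bare column references (the `all_columns` check), and did not benefit from input partitioning; after this change the swap is always attempted and cost decisions are left to later passes. A minimal, self-contained sketch of the old versus new gate, using toy types rather than DataFusion's actual `ExecutionPlan` API (the `benefits_from_input_partitioning` check is folded into the comment for brevity):

```rust
/// Toy stand-in for illustration only; DataFusion's real ProjectionExec differs.
struct Projection {
    exprs: Vec<String>,      // output expressions
    input_fields: usize,     // width of the input schema
    all_plain_columns: bool, // true if every expr is a bare column reference
}

/// Pre-patch gate, paraphrased: only swap when the projection narrows the
/// schema AND is a cheap, pure column selection.
fn old_should_swap(p: &Projection) -> bool {
    p.exprs.len() < p.input_fields && p.all_plain_columns
}

/// Post-patch behavior: the swap is always offered; the lowered projection
/// is rebuilt on top of the repartition input (via `make_with_child`).
fn new_should_swap(_p: &Projection) -> bool {
    true
}

fn main() {
    // A widening, computed projection (like `int_col, bigint_col,
    // int_col+bigint_col AS sum_col`): previously rejected, now swapped.
    let p = Projection {
        exprs: vec!["a".into(), "b".into(), "a + b AS sum_col".into()],
        input_fields: 2,
        all_plain_columns: false,
    };
    assert!(!old_should_swap(&p));
    assert!(new_should_swap(&p));
}
```

The expected-plan updates below are all downstream consequences of this relaxed gate.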
diff --git a/datafusion/sqllogictest/test_files/explain_tree.slt b/datafusion/sqllogictest/test_files/explain_tree.slt
index 9215ce87e3be..940358ce3ef4 100644
--- a/datafusion/sqllogictest/test_files/explain_tree.slt
+++ b/datafusion/sqllogictest/test_files/explain_tree.slt
@@ -819,32 +819,20 @@ explain SELECT int_col, bigint_col, int_col+bigint_col AS sum_col FROM table1;
 ----
 physical_plan
 01)┌───────────────────────────┐
-02)│       ProjectionExec      │
+02)│      RepartitionExec      │
 03)│    --------------------   │
-04)│        bigint_col:        │
-05)│         bigint_col        │
+04)│ partition_count(in->out): │
+05)│           1 -> 4          │
 06)│                           │
-07)│      int_col: int_col     │
-08)│                           │
-09)│          sum_col:         │
-10)│  CAST(int_col AS Int64) + │
-11)│         bigint_col        │
-12)└─────────────┬─────────────┘
-13)┌─────────────┴─────────────┐
-14)│      RepartitionExec      │
-15)│    --------------------   │
-16)│ partition_count(in->out): │
-17)│           1 -> 4          │
-18)│                           │
-19)│    partitioning_scheme:   │
-20)│     RoundRobinBatch(4)    │
-21)└─────────────┬─────────────┘
-22)┌─────────────┴─────────────┐
-23)│       DataSourceExec      │
-24)│    --------------------   │
-25)│          files: 1         │
-26)│        format: csv        │
-27)└───────────────────────────┘
+07)│    partitioning_scheme:   │
+08)│     RoundRobinBatch(4)    │
+09)└─────────────┬─────────────┘
+10)┌─────────────┴─────────────┐
+11)│       DataSourceExec      │
+12)│    --------------------   │
+13)│          files: 1         │
+14)│        format: csv        │
+15)└───────────────────────────┘

 query TT
 explain select
@@ -965,31 +953,20 @@ explain SELECT int_col, bigint_col, int_col+bigint_col AS sum_col FROM table4;
 ----
 physical_plan
 01)┌───────────────────────────┐
-02)│       ProjectionExec      │
+02)│      RepartitionExec      │
 03)│    --------------------   │
-04)│        bigint_col:        │
-05)│         bigint_col        │
+04)│ partition_count(in->out): │
+05)│           1 -> 4          │
 06)│                           │
-07)│      int_col: int_col     │
-08)│                           │
-09)│          sum_col:         │
-10)│    int_col + bigint_col   │
-11)└─────────────┬─────────────┘
-12)┌─────────────┴─────────────┐
-13)│      RepartitionExec      │
-14)│    --------------------   │
-15)│ partition_count(in->out): │
-16)│           1 -> 4          │
-17)│                           │
-18)│    partitioning_scheme:   │
-19)│     RoundRobinBatch(4)    │
-20)└─────────────┬─────────────┘
-21)┌─────────────┴─────────────┐
-22)│       DataSourceExec      │
-23)│    --------------------   │
-24)│          files: 1         │
-25)│        format: json       │
-26)└───────────────────────────┘
+07)│    partitioning_scheme:   │
+08)│     RoundRobinBatch(4)    │
+09)└─────────────┬─────────────┘
+10)┌─────────────┴─────────────┐
+11)│       DataSourceExec      │
+12)│    --------------------   │
+13)│          files: 1         │
+14)│        format: json       │
+15)└───────────────────────────┘

 # Query with projection on arrow

@@ -998,32 +975,20 @@ explain SELECT int_col, bigint_col, int_col+bigint_col AS sum_col FROM table5;
 ----
 physical_plan
 01)┌───────────────────────────┐
-02)│       ProjectionExec      │
+02)│      RepartitionExec      │
 03)│    --------------------   │
-04)│        bigint_col:        │
-05)│         bigint_col        │
+04)│ partition_count(in->out): │
+05)│           1 -> 4          │
 06)│                           │
-07)│      int_col: int_col     │
-08)│                           │
-09)│          sum_col:         │
-10)│  CAST(int_col AS Int64) + │
-11)│         bigint_col        │
-12)└─────────────┬─────────────┘
-13)┌─────────────┴─────────────┐
-14)│      RepartitionExec      │
-15)│    --------------------   │
-16)│ partition_count(in->out): │
-17)│           1 -> 4          │
-18)│                           │
-19)│    partitioning_scheme:   │
-20)│     RoundRobinBatch(4)    │
-21)└─────────────┬─────────────┘
-22)┌─────────────┴─────────────┐
-23)│       DataSourceExec      │
-24)│    --------------------   │
-25)│          files: 1         │
-26)│       format: arrow       │
-27)└───────────────────────────┘
+07)│    partitioning_scheme:   │
+08)│     RoundRobinBatch(4)    │
+09)└─────────────┬─────────────┘
+10)┌─────────────┴─────────────┐
+11)│       DataSourceExec      │
+12)│    --------------------   │
+13)│          files: 1         │
+14)│       format: arrow       │
+15)└───────────────────────────┘

 # Query with PartialSortExec.
 query TT
diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt
index cd1ed2bc0cac..d6487b371a37 100644
--- a/datafusion/sqllogictest/test_files/group_by.slt
+++ b/datafusion/sqllogictest/test_files/group_by.slt
@@ -4371,8 +4371,8 @@ logical_plan
 03)----TableScan: unbounded_csv_with_timestamps2 projection=[name, ts]
 physical_plan
 01)SortPreservingMergeExec: [name@0 DESC, time_chunks@1 DESC], fetch=5
-02)--ProjectionExec: expr=[name@0 as name, date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 900000000000 }, ts@1) as time_chunks]
-03)----RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true
+02)--RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true
+03)----ProjectionExec: expr=[name@0 as name, date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 900000000000 }, ts@1) as time_chunks]
 04)------StreamingTableExec: partition_sizes=1, projection=[name, ts], infinite_source=true, output_ordering=[name@0 DESC, ts@1 DESC]

 statement ok
diff --git a/datafusion/sqllogictest/test_files/join.slt.part b/datafusion/sqllogictest/test_files/join.slt.part
index 5d111374ac8c..f5ccb9aff08c 100644
--- a/datafusion/sqllogictest/test_files/join.slt.part
+++ b/datafusion/sqllogictest/test_files/join.slt.part
@@ -1177,8 +1177,8 @@ logical_plan
 physical_plan
 01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(CAST(t1.v0 AS Float64)@6, v1@1)], filter=v1@1 + CAST(v0@0 AS Float64) > 0, projection=[v0@0, v1@1, v2@3, v3@4, v4@5, v0@7, v1@8]
 02)--CoalescePartitionsExec
-03)----ProjectionExec: expr=[v0@0 as v0, v1@1 as v1, v0@2 as v0, v2@3 as v2, v3@4 as v3, v4@5 as v4, CAST(v0@0 AS Float64) as CAST(t1.v0 AS Float64)]
-04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+04)------ProjectionExec: expr=[v0@0 as v0, v1@1 as v1, v0@2 as v0, v2@3 as v2, v3@4 as v3, v4@5 as v4, CAST(v0@0 AS Float64) as CAST(t1.v0 AS Float64)]
 05)--------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(v0@0, v0@0), (v1@1, v1@1)], projection=[v0@0, v1@1, v0@2, v2@4, v3@5, v4@6]
 06)----------DataSourceExec: partitions=1, partition_sizes=[0]
 07)----------DataSourceExec: partitions=1, partition_sizes=[0]
@@ -1375,8 +1375,8 @@ physical_plan
 06)------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(CAST(f.a AS Int64)@1, col0@0)], projection=[a@0, col0@2, col1@3]
 07)--------ProjectionExec: expr=[a@0 as a, CAST(a@0 AS Int64) as CAST(f.a AS Int64)]
 08)----------DataSourceExec: partitions=1, partition_sizes=[1]
-09)--------ProjectionExec: expr=[CAST(x@0 AS Int64) + 1 as col0, CAST(y@1 AS Int64) + 1 as col1]
-10)----------RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
+09)--------RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
+10)----------ProjectionExec: expr=[CAST(x@0 AS Int64) + 1 as col0, CAST(y@1 AS Int64) + 1 as col1]
 11)------------FilterExec: y@1 = x@0
 12)--------------DataSourceExec: partitions=1, partition_sizes=[1]

diff --git a/datafusion/sqllogictest/test_files/join_is_not_distinct_from.slt b/datafusion/sqllogictest/test_files/join_is_not_distinct_from.slt
index 8246f489c446..d0fde0e64c0e 100644
--- a/datafusion/sqllogictest/test_files/join_is_not_distinct_from.slt
+++ b/datafusion/sqllogictest/test_files/join_is_not_distinct_from.slt
@@ -143,8 +143,8 @@ physical_plan
 01)ProjectionExec: expr=[id@0 as t1_id, id@2 as t2_id, val@1 as val, val@3 as val]
 02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(t1.val + Int64(1)@2, t2.val + Int64(1)@2)], projection=[id@0, val@1, id@3, val@4], NullsEqual: true
 03)----CoalescePartitionsExec
-04)------ProjectionExec: expr=[id@0 as id, val@1 as val, CAST(val@1 AS Int64) + 1 as t1.val + Int64(1)]
-05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+05)--------ProjectionExec: expr=[id@0 as id, val@1 as val, CAST(val@1 AS Int64) + 1 as t1.val + Int64(1)]
 06)----------FilterExec: CAST(val@1 AS Int64) + 1 IS NOT DISTINCT FROM 11
 07)------------DataSourceExec: partitions=1, partition_sizes=[1]
 08)----ProjectionExec: expr=[id@0 as id, val@1 as val, CAST(val@1 AS Int64) + 1 as t2.val + Int64(1)]
diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt
index 3259ef9cd811..fa1a358e62b5 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -1487,11 +1487,11 @@ physical_plan
 01)ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, t2_id@3 as t2_id, t2_name@4 as t2_name, t2_int@5 as t2_int, CAST(t1_id@0 AS Int64) + 11 as join_t1.t1_id + Int64(11)]
 02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t1.t1_id + Int64(11)@3, CAST(join_t2.t2_id AS Int64)@3)], projection=[t1_id@0, t1_name@1, t1_int@2, t2_id@4, t2_name@5, t2_int@6]
 03)----CoalescePartitionsExec
-04)------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, CAST(t1_id@0 AS Int64) + 11 as join_t1.t1_id + Int64(11)]
-05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+04)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+05)--------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, CAST(t1_id@0 AS Int64) + 11 as join_t1.t1_id + Int64(11)]
 06)----------DataSourceExec: partitions=1, partition_sizes=[1]
-07)----ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, t2_int@2 as t2_int, CAST(t2_id@0 AS Int64) as CAST(join_t2.t2_id AS Int64)]
-08)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+07)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+08)------ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, t2_int@2 as t2_int, CAST(t2_id@0 AS Int64) as CAST(join_t2.t2_id AS Int64)]
 09)--------DataSourceExec: partitions=1, partition_sizes=[1]

 statement ok
@@ -1512,11 +1512,11 @@ physical_plan
 01)ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, t2_id@3 as t2_id, t2_name@4 as t2_name, t2_int@5 as t2_int, CAST(t1_id@0 AS Int64) + 11 as join_t1.t1_id + Int64(11)]
 02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t1.t1_id + Int64(11)@3, CAST(join_t2.t2_id AS Int64)@3)], projection=[t1_id@0, t1_name@1, t1_int@2, t2_id@4, t2_name@5, t2_int@6]
 03)----CoalescePartitionsExec
-04)------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, CAST(t1_id@0 AS Int64) + 11 as join_t1.t1_id + Int64(11)]
-05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+04)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+05)--------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, CAST(t1_id@0 AS Int64) + 11 as join_t1.t1_id + Int64(11)]
 06)----------DataSourceExec: partitions=1, partition_sizes=[1]
-07)----ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, t2_int@2 as t2_int, CAST(t2_id@0 AS Int64) as CAST(join_t2.t2_id AS Int64)]
-08)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+07)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+08)------ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, t2_int@2 as t2_int, CAST(t2_id@0 AS Int64) as CAST(join_t2.t2_id AS Int64)]
 09)--------DataSourceExec: partitions=1, partition_sizes=[1]

 # Both side expr key inner join
@@ -1539,11 +1539,11 @@ physical_plan
 01)ProjectionExec: expr=[t1_id@1 as t1_id, t2_id@0 as t2_id, t1_name@2 as t1_name]
 02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t2.t2_id + UInt32(1)@1, join_t1.t1_id + UInt32(12)@2)], projection=[t2_id@0, t1_id@2, t1_name@3]
 03)----CoalescePartitionsExec
-04)------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as join_t2.t2_id + UInt32(1)]
-05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+04)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+05)--------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as join_t2.t2_id + UInt32(1)]
 06)----------DataSourceExec: partitions=1, partition_sizes=[1]
-07)----ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 12 as join_t1.t1_id + UInt32(12)]
-08)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+07)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+08)------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 12 as join_t1.t1_id + UInt32(12)]
 09)--------DataSourceExec: partitions=1, partition_sizes=[1]

 statement ok
@@ -1564,11 +1564,11 @@ physical_plan
 01)ProjectionExec: expr=[t1_id@1 as t1_id, t2_id@0 as t2_id, t1_name@2 as t1_name]
 02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t2.t2_id + UInt32(1)@1, join_t1.t1_id + UInt32(12)@2)], projection=[t2_id@0, t1_id@2, t1_name@3]
 03)----CoalescePartitionsExec
-04)------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as join_t2.t2_id + UInt32(1)]
-05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+04)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+05)--------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as join_t2.t2_id + UInt32(1)]
 06)----------DataSourceExec: partitions=1, partition_sizes=[1]
-07)----ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 12 as join_t1.t1_id + UInt32(12)]
-08)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+07)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+08)------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 12 as join_t1.t1_id + UInt32(12)]
 09)--------DataSourceExec: partitions=1, partition_sizes=[1]

 # Left side expr key inner join
@@ -1592,8 +1592,8 @@ physical_plan
 01)ProjectionExec: expr=[t1_id@1 as t1_id, t2_id@0 as t2_id, t1_name@2 as t1_name]
 02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(t2_id@0, join_t1.t1_id + UInt32(11)@2)], projection=[t2_id@0, t1_id@1, t1_name@2]
 03)----DataSourceExec: partitions=1, partition_sizes=[1]
-04)----ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 11 as join_t1.t1_id + UInt32(11)]
-05)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+04)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+05)------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 11 as join_t1.t1_id + UInt32(11)]
 06)--------DataSourceExec: partitions=1, partition_sizes=[1]

 statement ok
@@ -1615,8 +1615,8 @@ physical_plan
 01)ProjectionExec: expr=[t1_id@1 as t1_id, t2_id@0 as t2_id, t1_name@2 as t1_name]
 02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(t2_id@0, join_t1.t1_id + UInt32(11)@2)], projection=[t2_id@0, t1_id@1, t1_name@2]
 03)----DataSourceExec: partitions=1, partition_sizes=[1]
-04)----ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 11 as join_t1.t1_id + UInt32(11)]
-05)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+04)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+05)------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 11 as join_t1.t1_id + UInt32(11)]
 06)--------DataSourceExec: partitions=1, partition_sizes=[1]

 # Right side expr key inner join
@@ -1640,8 +1640,8 @@ physical_plan
 01)ProjectionExec: expr=[t1_id@1 as t1_id, t2_id@0 as t2_id, t1_name@2 as t1_name]
 02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t2.t2_id - UInt32(11)@1, t1_id@0)], projection=[t2_id@0, t1_id@2, t1_name@3]
 03)----CoalescePartitionsExec
-04)------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as join_t2.t2_id - UInt32(11)]
-05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+04)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+05)--------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as join_t2.t2_id - UInt32(11)]
 06)----------DataSourceExec: partitions=1, partition_sizes=[1]
 07)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
 08)------DataSourceExec: partitions=1, partition_sizes=[1]
@@ -1665,8 +1665,8 @@ physical_plan
 01)ProjectionExec: expr=[t1_id@1 as t1_id, t2_id@0 as t2_id, t1_name@2 as t1_name]
 02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t2.t2_id - UInt32(11)@1, t1_id@0)], projection=[t2_id@0, t1_id@2, t1_name@3]
 03)----CoalescePartitionsExec
-04)------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as join_t2.t2_id - UInt32(11)]
-05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+04)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+05)--------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as join_t2.t2_id - UInt32(11)]
 06)----------DataSourceExec: partitions=1, partition_sizes=[1]
 07)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
 08)------DataSourceExec: partitions=1, partition_sizes=[1]
@@ -1690,8 +1690,8 @@ logical_plan
 physical_plan
 01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(t1_id@0, join_t2.t2_id - UInt32(11)@3)], projection=[t1_id@0, t1_name@1, t1_int@2, t2_id@3, t2_name@4, t2_int@5]
 02)--DataSourceExec: partitions=1, partition_sizes=[1]
-03)--ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, t2_int@2 as t2_int, t2_id@0 - 11 as join_t2.t2_id - UInt32(11)]
-04)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+03)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+04)----ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, t2_int@2 as t2_int, t2_id@0 - 11 as join_t2.t2_id - UInt32(11)]
 05)------DataSourceExec: partitions=1, partition_sizes=[1]

 statement ok
@@ -1711,8 +1711,8 @@ logical_plan
 physical_plan
 01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(t1_id@0, join_t2.t2_id - UInt32(11)@3)], projection=[t1_id@0, t1_name@1, t1_int@2, t2_id@3, t2_name@4, t2_int@5]
 02)--DataSourceExec: partitions=1, partition_sizes=[1]
-03)--ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, t2_int@2 as t2_int, t2_id@0 - 11 as join_t2.t2_id - UInt32(11)]
-04)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+03)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+04)----ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, t2_int@2 as t2_int, t2_id@0 - 11 as join_t2.t2_id - UInt32(11)]
 05)------DataSourceExec: partitions=1, partition_sizes=[1]

 #####
@@ -2751,8 +2751,8 @@ physical_plan
 02)--SortMergeJoin: join_type=Right, on=[(CAST(t1.c3 AS Decimal128(10, 2))@4, c3@2)]
 03)----SortExec: expr=[CAST(t1.c3 AS Decimal128(10, 2))@4 ASC], preserve_partitioning=[true]
 04)------RepartitionExec: partitioning=Hash([CAST(t1.c3 AS Decimal128(10, 2))@4], 2), input_partitions=2
-05)--------ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c4@3 as c4, CAST(c3@2 AS Decimal128(10, 2)) as CAST(t1.c3 AS Decimal128(10, 2))]
-06)----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+06)----------ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c4@3 as c4, CAST(c3@2 AS Decimal128(10, 2)) as CAST(t1.c3 AS Decimal128(10, 2))]
 07)------------DataSourceExec: partitions=1, partition_sizes=[1]
 08)----SortExec: expr=[c3@2 ASC], preserve_partitioning=[true]
 09)------RepartitionExec: partitioning=Hash([c3@2], 2), input_partitions=1
@@ -3457,9 +3457,8 @@ logical_plan
 physical_plan
 01)NestedLoopJoinExec: join_type=Inner, filter=example(join_proj_push_down_1@0, join_proj_push_down_2@1) > 3, projection=[a0@0, a@1, b@2, c@3, d@4, a0@6, a@7, b@8, c@9, d@10]
 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d, CAST(a@1 AS Float64) as join_proj_push_down_1], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true
-03)--ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, CAST(a@1 AS Float64) as join_proj_push_down_2]
-04)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true
-05)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true
+03)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true
+04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d, CAST(a@1 AS Float64) as join_proj_push_down_2], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true

 ####
 # Config teardown
diff --git a/datafusion/sqllogictest/test_files/limit.slt b/datafusion/sqllogictest/test_files/limit.slt
index 524304546d56..65239547d3f8 100644
--- a/datafusion/sqllogictest/test_files/limit.slt
+++ b/datafusion/sqllogictest/test_files/limit.slt
@@ -661,12 +661,10 @@ physical_plan
 02)--SortPreservingMergeExec: [c@0 DESC], fetch=14
 03)----SortExec: TopK(fetch=14), expr=[c@0 DESC], preserve_partitioning=[true]
 04)------UnionExec
-05)--------ProjectionExec: expr=[CAST(c@0 AS Int64) as c]
-06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
-07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], output_ordering=[c@0 ASC NULLS LAST], file_type=csv, has_header=true
-08)--------ProjectionExec: expr=[CAST(d@0 AS Int64) as c]
-09)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-10)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[d], file_type=csv, has_header=true
+05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
+06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[CAST(c@3 AS Int64) as c], output_ordering=[c@0 ASC NULLS LAST], file_type=csv, has_header=true
+07)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+08)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[CAST(d@4 AS Int64) as c], output_ordering=[c@0 ASC NULLS LAST], file_type=csv, has_header=true

 # Applying LIMIT & OFFSET to subquery.
 query III
diff --git a/datafusion/sqllogictest/test_files/monotonic_projection_test.slt b/datafusion/sqllogictest/test_files/monotonic_projection_test.slt
index 7feefc169fca..eb0cfdfd421e 100644
--- a/datafusion/sqllogictest/test_files/monotonic_projection_test.slt
+++ b/datafusion/sqllogictest/test_files/monotonic_projection_test.slt
@@ -45,9 +44,8 @@ logical_plan
 03)----TableScan: multiple_ordered_table projection=[a, b]
 physical_plan
 01)SortPreservingMergeExec: [a_big@0 ASC NULLS LAST, b@1 ASC NULLS LAST]
-02)--ProjectionExec: expr=[CAST(a@0 AS Int64) as a_big, b@1 as b]
-03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
-04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], file_type=csv, has_header=true
+02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
+03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[CAST(a@1 AS Int64) as a_big, b], file_type=csv, has_header=true

 query TT
 EXPLAIN
@@ -61,9 +60,8 @@ logical_plan
 03)----TableScan: multiple_ordered_table projection=[a, b]
 physical_plan
 01)SortPreservingMergeExec: [a@0 ASC NULLS LAST, b@2 ASC NULLS LAST]
-02)--ProjectionExec: expr=[a@0 as a, CAST(a@0 AS Int64) as a_big, b@1 as b]
-03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
-04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], file_type=csv, has_header=true
+02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
+03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, CAST(a@1 AS Int64) as a_big, b], output_ordering=[a@0 ASC NULLS LAST, b@2 ASC NULLS LAST], file_type=csv, has_header=true

 # Cast to larger types as well as preserving ordering
 # doesn't invalidate lexicographical ordering.
@@ -82,9 +80,8 @@ logical_plan
 03)----TableScan: multiple_ordered_table projection=[a, b]
 physical_plan
 01)SortPreservingMergeExec: [a_big@1 ASC NULLS LAST, b@2 ASC NULLS LAST]
-02)--ProjectionExec: expr=[a@0 as a, CAST(a@0 AS Int64) as a_big, b@1 as b]
-03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
-04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], file_type=csv, has_header=true
+02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
+03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, CAST(a@1 AS Int64) as a_big, b], output_ordering=[a@0 ASC NULLS LAST, b@2 ASC NULLS LAST], file_type=csv, has_header=true

 # test for common rename
 query TT
@@ -130,9 +127,8 @@ logical_plan
 physical_plan
 01)SortPreservingMergeExec: [a_str@0 ASC NULLS LAST, b@1 ASC NULLS LAST]
 02)--SortExec: expr=[a_str@0 ASC NULLS LAST, b@1 ASC NULLS LAST], preserve_partitioning=[true]
-03)----ProjectionExec: expr=[CAST(a@0 AS Utf8View) as a_str, b@1 as b]
-04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
-05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], file_type=csv, has_header=true
+03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[CAST(a@1 AS Utf8View) as a_str, b], file_type=csv, has_header=true

 # We cannot determine a+b is ordered from the
 # invariant [a ASC, b ASC] is satisfied. Hence
@@ -165,6 +161,5 @@ logical_plan
 physical_plan
 01)SortPreservingMergeExec: [sum_expr@0 ASC NULLS LAST]
 02)--SortExec: expr=[sum_expr@0 ASC NULLS LAST], preserve_partitioning=[true]
-03)----ProjectionExec: expr=[CAST(a@0 + b@1 AS Int64) as sum_expr, a@0 as a, b@1 as b]
-04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
-05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], file_type=csv, has_header=true
+03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
+04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[CAST(a@1 + b@2 AS Int64) as sum_expr, a, b], output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST], file_type=csv, has_header=true
diff --git a/datafusion/sqllogictest/test_files/order.slt b/datafusion/sqllogictest/test_files/order.slt
index 62579ea1cd0e..d32b8aba2ec1 100644
--- a/datafusion/sqllogictest/test_files/order.slt
+++ b/datafusion/sqllogictest/test_files/order.slt
@@ -585,9 +585,8 @@ logical_plan
 03)----TableScan: multiple_ordered_table projection=[a, b, c]
 physical_plan
 01)SortPreservingMergeExec: [result@0 ASC NULLS LAST]
-02)--ProjectionExec: expr=[b@1 + a@0 + c@2 as result]
-03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
-04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], output_orderings=[[a@0 ASC NULLS LAST], [b@1 ASC NULLS LAST], [c@2 ASC NULLS LAST]], file_type=csv, has_header=true
+02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
+03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b@2 + a@1 + c@3 as result], file_type=csv, has_header=true

 statement ok
 drop table multiple_ordered_table;
@@ -616,9 +615,8 @@ logical_plan
 03)----TableScan: csv_with_timestamps projection=[ts]
 physical_plan
 01)SortPreservingMergeExec: [db15@0 ASC NULLS LAST]
-02)--ProjectionExec: expr=[date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 900000000000 }, ts@0, 1659537600000000000) as db15]
-03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
-04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/timestamps.csv]]}, projection=[ts], output_ordering=[ts@0 ASC NULLS LAST], file_type=csv, has_header=false
+02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
+03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/timestamps.csv]]}, projection=[date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 900000000000 }, ts@1, 1659537600000000000) as db15], file_type=csv, has_header=false

 query TT
 EXPLAIN SELECT DATE_TRUNC('DAY', ts) as dt_day
@@ -631,9 +629,8 @@ logical_plan
 03)----TableScan: csv_with_timestamps projection=[ts]
 physical_plan
 01)SortPreservingMergeExec: [dt_day@0 ASC NULLS LAST]
-02)--ProjectionExec: expr=[date_trunc(DAY, ts@0) as dt_day]
-03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
-04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/timestamps.csv]]}, projection=[ts], output_ordering=[ts@0 ASC NULLS LAST], file_type=csv, has_header=false
+02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
+03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/timestamps.csv]]}, projection=[date_trunc(DAY, ts@1) as dt_day], file_type=csv, has_header=false

 statement ok
 drop table csv_with_timestamps;
@@ -674,9 +671,8 @@ logical_plan
 03)----TableScan: aggregate_test_100 projection=[c11]
 physical_plan
 01)SortPreservingMergeExec: [atan_c11@0 ASC NULLS LAST]
-02)--ProjectionExec: expr=[atan(c11@0) as atan_c11]
-03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
-04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c11], output_ordering=[c11@0 ASC NULLS LAST], file_type=csv, has_header=true
+02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
+03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[atan(c11@10) as atan_c11], file_type=csv, has_header=true

 query TT
 EXPLAIN SELECT CEIL(c11) as ceil_c11
@@ -689,9 +685,8 @@ logical_plan
 03)----TableScan: aggregate_test_100 projection=[c11]
 physical_plan
 01)SortPreservingMergeExec: [ceil_c11@0 ASC NULLS LAST]
-02)--ProjectionExec: expr=[ceil(c11@0) as ceil_c11]
-03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
-04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c11], output_ordering=[c11@0 ASC NULLS LAST], file_type=csv, has_header=true
+02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
+03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[ceil(c11@10) as ceil_c11], file_type=csv, has_header=true

 query TT
 EXPLAIN SELECT LOG(c12, c11) as log_c11_base_c12
@@ -704,9 +699,8 @@ logical_plan
 03)----TableScan: aggregate_test_100 projection=[c11, c12]
 physical_plan
 01)SortPreservingMergeExec: [log_c11_base_c12@0 ASC NULLS LAST]
-02)--ProjectionExec: expr=[log(c12@1, c11@0) as log_c11_base_c12]
-03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
-04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c11, c12], output_orderings=[[c11@0 ASC NULLS LAST], [c12@1 DESC NULLS LAST]], file_type=csv, has_header=true
+02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
+03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[log(c12@11, c11@10) as log_c11_base_c12], file_type=csv, has_header=true

 query TT
 EXPLAIN SELECT LOG(c11, c12) as log_c12_base_c11
@@ -719,9 +713,8 @@ logical_plan
 03)----TableScan: aggregate_test_100 projection=[c11, c12]
 physical_plan
 01)SortPreservingMergeExec: [log_c12_base_c11@0 DESC NULLS LAST]
-02)--ProjectionExec: expr=[log(c11@0, c12@1) as log_c12_base_c11]
-03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
-04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c11, c12], output_orderings=[[c11@0 ASC NULLS LAST], [c12@1 DESC NULLS LAST]], file_type=csv, has_header=true
+02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
+03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[log(c11@10, c12@11) as log_c12_base_c11], file_type=csv, has_header=true

 statement ok
 drop table aggregate_test_100;
@@ -1165,9 +1158,8 @@ logical_plan
 physical_plan
 01)SortPreservingMergeExec: [c_str@0 ASC NULLS LAST], fetch=5
 02)--SortExec: TopK(fetch=5), expr=[c_str@0 ASC NULLS LAST], preserve_partitioning=[true]
-03)----ProjectionExec: expr=[CAST(c@0 AS Utf8View) as c_str]
-04)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true
-05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], output_ordering=[c@0 ASC NULLS LAST], file_type=csv, has_header=true
+03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[CAST(c@3 AS Utf8View) as c_str], file_type=csv, has_header=true


 # Casting from numeric to numeric types preserves the ordering
@@ -1195,9 +1187,8 @@ logical_plan
 03)----TableScan: ordered_table projection=[c]
 physical_plan
 01)SortPreservingMergeExec: [c_bigint@0 ASC NULLS LAST], fetch=5
-02)--ProjectionExec: expr=[CAST(c@0 AS Int64) as c_bigint]
-03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true
-04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], output_ordering=[c@0 ASC NULLS LAST], file_type=csv, has_header=true
+02)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true
+03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[CAST(c@3 AS Int64) as c_bigint], file_type=csv, has_header=true

 statement ok
 drop table ordered_table;
@@ -1231,9 +1222,8 @@ logical_plan
 physical_plan
 01)SortPreservingMergeExec: [abs_c@0 ASC NULLS LAST], fetch=5
 02)--SortExec: TopK(fetch=5), expr=[abs_c@0 ASC NULLS LAST], preserve_partitioning=[true]
-03)----ProjectionExec: expr=[abs(c@0) as abs_c]
-04)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true
-05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], output_ordering=[c@0 ASC NULLS LAST], file_type=csv, has_header=true
+03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[abs(c@3) as abs_c], file_type=csv, has_header=true

 statement ok
 drop table ordered_table;
@@ -1265,9 +1255,8 @@ logical_plan
 03)----TableScan: ordered_table projection=[c]
 physical_plan
 01)SortPreservingMergeExec: [abs_c@0 ASC NULLS LAST], fetch=5
-02)--ProjectionExec: expr=[abs(c@0) as abs_c]
-03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true
-04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], output_ordering=[c@0 ASC NULLS LAST], file_type=csv, has_header=true
+02)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true
+03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[abs(c@3) as abs_c], file_type=csv, has_header=true

 # Boolean to integer casts preserve the order.
 statement ok
@@ -1291,9 +1280,8 @@ logical_plan
 03)----TableScan: annotated_data_finite projection=[inc_col, desc_col]
 physical_plan
 01)SortPreservingMergeExec: [c@0 ASC NULLS LAST]
-02)--ProjectionExec: expr=[CAST(inc_col@0 > desc_col@1 AS Int32) as c]
-03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true
-04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[inc_col, desc_col], output_orderings=[[inc_col@0 ASC NULLS LAST], [desc_col@1 DESC]], file_type=csv, has_header=true
+02)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true
+03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[CAST(inc_col@1 > desc_col@2 AS Int32) as c], file_type=csv, has_header=true

 # Union a query with the actual data and one with a constant
 query I
@@ -1314,8 +1302,8 @@ logical_plan
 02)--Sort: ordered_table.a ASC NULLS LAST, fetch=1
 03)----TableScan: ordered_table projection=[a, b]
 physical_plan
-01)ProjectionExec: expr=[a@0 + b@1 as sum1]
-02)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true
+01)RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+02)--ProjectionExec: expr=[a@0 + b@1 as sum1]
 03)----SortExec: TopK(fetch=1), expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false]
 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], file_type=csv, has_header=true

@@ -1353,8 +1341,8 @@ logical_plan
 02)--Sort: ordered_table.a ASC NULLS LAST, fetch=1
 03)----TableScan: ordered_table projection=[a, b]
 physical_plan
-01)ProjectionExec: expr=[a@0 + b@1 as sum1]
-02)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true
+01)RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+02)--ProjectionExec: expr=[a@0 + b@1 as sum1]
 03)----SortExec: TopK(fetch=1), expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false]
 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], file_type=csv, has_header=true

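A pattern worth noting in the `monotonic_projection_test.slt` and `order.slt` expectations above: once the lowered projection is merged all the way into `DataSourceExec`, its expressions are printed against the file schema rather than the scan's projected schema, which is why `CAST(c@0 AS Int64)` becomes `CAST(c@3 AS Int64)` (in `window_2.csv` the columns are `a0, a, b, c, d`, so `c` is file column 3). A small, self-contained sketch of that index remapping, with hypothetical helper names, not DataFusion's API:

```rust
/// Toy model: map a column index in the scan's projected schema back to the
/// index of the same column in the underlying file schema.
fn remap_to_file_schema(projected_idx: usize, scan_projection: &[usize]) -> usize {
    // scan_projection[i] = file-schema index of the scan's i-th output column
    scan_projection[projected_idx]
}

fn main() {
    // A scan of just [c] from window_2.csv projects file column 3, so the
    // old plan's `c@0` corresponds to `c@3` once the cast moves into the scan.
    let scan_projection = vec![3];
    assert_eq!(remap_to_file_schema(0, &scan_projection), 3);
}
```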
diff --git a/datafusion/sqllogictest/test_files/select.slt b/datafusion/sqllogictest/test_files/select.slt
index 490df4b72d17..78353498d2cf 100644
--- a/datafusion/sqllogictest/test_files/select.slt
+++ b/datafusion/sqllogictest/test_files/select.slt
@@ -1403,9 +1403,8 @@ logical_plan
 03)----TableScan: annotated_data_finite2 projection=[a, b]
 physical_plan
 01)SortPreservingMergeExec: [a@0 ASC NULLS LAST]
-02)--ProjectionExec: expr=[a@0 as a, a@0 + b@1 as annotated_data_finite2.a + annotated_data_finite2.b]
-03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true
-04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], file_type=csv, has_header=true
+02)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true
+03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 + b@2 as annotated_data_finite2.a + annotated_data_finite2.b], output_ordering=[a@0 ASC NULLS LAST], file_type=csv, has_header=true

 # since query below doesn't computation
 # inside projection expr, increasing partitions
diff --git a/datafusion/sqllogictest/test_files/topk.slt b/datafusion/sqllogictest/test_files/topk.slt
index aba468d21fd0..a4213811a0f4 100644
--- a/datafusion/sqllogictest/test_files/topk.slt
+++ b/datafusion/sqllogictest/test_files/topk.slt
@@ -380,10 +380,8 @@ explain select number + 1 as number_plus, number, number + 1 as other_number_plu
 physical_plan
 01)SortPreservingMergeExec: [number_plus@0 DESC, number@1 DESC, other_number_plus@2 DESC, age@3 ASC NULLS LAST], fetch=3
 02)--SortExec: TopK(fetch=3), expr=[number_plus@0 DESC, number@1 DESC, age@3 ASC NULLS LAST], preserve_partitioning=[true], sort_prefix=[number_plus@0 DESC, number@1 DESC]
-03)----ProjectionExec: expr=[__common_expr_1@0 as number_plus, number@1 as number, __common_expr_1@0 as other_number_plus, age@2 as age]
-04)------ProjectionExec: expr=[CAST(number@0 AS Int64) + 1 as __common_expr_1, number@0 as number, age@1 as age]
-05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
-06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, age], output_ordering=[number@0 DESC], file_type=parquet
+03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
+04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[CAST(number@0 AS Int64) + 1 as number_plus, number, CAST(number@0 AS Int64) + 1 as other_number_plus, age], output_ordering=[number@1 DESC], file_type=parquet, predicate=DynamicFilter [ empty ]

 # Cleanup
 statement ok
diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q22.slt.part b/datafusion/sqllogictest/test_files/tpch/plans/q22.slt.part
index add578c3b079..8e3909b40823 100644
--- a/datafusion/sqllogictest/test_files/tpch/plans/q22.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/plans/q22.slt.part
@@ -80,8 +80,8 @@ physical_plan
 04)------AggregateExec: mode=FinalPartitioned, gby=[cntrycode@0 as cntrycode], aggr=[count(Int64(1)), sum(custsale.c_acctbal)]
 05)--------RepartitionExec: partitioning=Hash([cntrycode@0], 4), input_partitions=4
 06)----------AggregateExec: mode=Partial, gby=[cntrycode@0 as cntrycode], aggr=[count(Int64(1)), sum(custsale.c_acctbal)]
-07)------------ProjectionExec: expr=[substr(c_phone@0, 1, 2) as cntrycode, c_acctbal@1 as c_acctbal]
-08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+08)--------------ProjectionExec: expr=[substr(c_phone@0, 1, 2) as cntrycode, c_acctbal@1 as c_acctbal]
 09)----------------NestedLoopJoinExec: join_type=Inner, filter=join_proj_push_down_1@1 > avg(customer.c_acctbal)@0, projection=[c_phone@0, c_acctbal@1, avg(customer.c_acctbal)@3]
 10)------------------ProjectionExec: expr=[c_phone@0 as c_phone, c_acctbal@1 as c_acctbal, CAST(c_acctbal@1 AS Decimal128(19, 6)) as join_proj_push_down_1]
 11)--------------------CoalescePartitionsExec
diff --git a/datafusion/sqllogictest/test_files/union.slt b/datafusion/sqllogictest/test_files/union.slt
index b79b6d2fe5e9..c8a2d6ff8528 100644
--- a/datafusion/sqllogictest/test_files/union.slt
+++ b/datafusion/sqllogictest/test_files/union.slt
@@ -307,8 +307,8 @@ physical_plan
 01)UnionExec
 02)--HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(CAST(t2.id AS Int32)@2, id@0), (name@1, name@1)], NullsEqual: true
 03)----CoalescePartitionsExec
-04)------ProjectionExec: expr=[id@0 as id, name@1 as name, CAST(id@0 AS Int32) as CAST(t2.id AS Int32)]
-05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+05)--------ProjectionExec: expr=[id@0 as id, name@1 as name, CAST(id@0 AS Int32) as CAST(t2.id AS Int32)]
 06)----------DataSourceExec: partitions=1, partition_sizes=[1]
 07)----AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, name@1 as name], aggr=[]
 08)------RepartitionExec: partitioning=Hash([id@0, name@1], 4), input_partitions=4
@@ -405,12 +405,10 @@ physical_plan
 01)SortPreservingMergeExec: [c9@1 DESC], fetch=5
 02)--SortExec: TopK(fetch=5), expr=[c9@1 DESC], preserve_partitioning=[true]
 03)----UnionExec
-04)------ProjectionExec: expr=[c1@0 as c1, CAST(c9@1 AS Decimal128(20, 0)) as c9]
-05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c9], file_type=csv, has_header=true
-07)------ProjectionExec: expr=[c1@0 as c1, CAST(c3@1 AS Decimal128(20, 0)) as c9]
-08)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-09)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c3], file_type=csv, has_header=true
+04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, CAST(c9@8 AS Decimal128(20, 0)) as c9], file_type=csv, has_header=true
+06)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+07)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, CAST(c3@2 AS Decimal128(20, 0)) as c9], file_type=csv, has_header=true

 query TR
 SELECT c1, c9 FROM aggregate_test_100 UNION ALL SELECT c1, c3 FROM aggregate_test_100 ORDER BY c9 DESC LIMIT 5
@@ -896,8 +894,8 @@ physical_plan
 01)SortPreservingMergeExec: [y@0 ASC NULLS LAST]
 02)--UnionExec
 03)----SortExec: expr=[y@0 ASC NULLS LAST], preserve_partitioning=[true]
-04)------ProjectionExec: expr=[CAST(y@0 AS Int64) as y]
-05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+05)--------ProjectionExec: expr=[CAST(y@0 AS Int64) as y]
 06)----------DataSourceExec: partitions=1, partition_sizes=[2]
 07)----SortExec: expr=[y@0 ASC NULLS LAST], preserve_partitioning=[false]
 08)------DataSourceExec: partitions=1, partition_sizes=[1]
@@ -923,8 +921,8 @@ logical_plan
 05)----TableScan: u2 projection=[y]
 physical_plan
 01)UnionExec
-02)--ProjectionExec: expr=[CAST(y@0 AS Int64) as y]
-03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+03)----ProjectionExec: expr=[CAST(y@0 AS Int64) as y]
 04)------DataSourceExec: partitions=1, partition_sizes=[2]
 05)--DataSourceExec: partitions=1, partition_sizes=[1]

diff --git a/datafusion/sqllogictest/test_files/unnest.slt b/datafusion/sqllogictest/test_files/unnest.slt
index 352056adbf81..08a72a1185a7 100644
--- a/datafusion/sqllogictest/test_files/unnest.slt
+++ b/datafusion/sqllogictest/test_files/unnest.slt
@@ -659,8 +659,8 @@ logical_plan
 physical_plan
 01)ProjectionExec: expr=[__unnest_placeholder(UNNEST(recursive_unnest_table.column3)[c1],depth=2)@0 as UNNEST(UNNEST(UNNEST(recursive_unnest_table.column3)[c1])), column3@1 as column3]
 02)--UnnestExec
-03)----ProjectionExec: expr=[get_field(__unnest_placeholder(recursive_unnest_table.column3,depth=1)@0, c1) as __unnest_placeholder(UNNEST(recursive_unnest_table.column3)[c1]), column3@1 as column3]
-04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+04)------ProjectionExec: expr=[get_field(__unnest_placeholder(recursive_unnest_table.column3,depth=1)@0, c1) as __unnest_placeholder(UNNEST(recursive_unnest_table.column3)[c1]), column3@1 as column3]
 05)--------UnnestExec
 06)----------ProjectionExec: expr=[column3@0 as __unnest_placeholder(recursive_unnest_table.column3), column3@0 as column3]
 07)------------DataSourceExec: partitions=1, partition_sizes=[1]
@@ -966,8 +966,8 @@ physical_plan
 05)--------AggregateExec: mode=Partial, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
 06)----------ProjectionExec: expr=[generated_id@0 as generated_id, __unnest_placeholder(make_array(range().value),depth=1)@1 as ar]
 07)------------UnnestExec
-08)--------------ProjectionExec: expr=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as generated_id, make_array(value@0) as __unnest_placeholder(make_array(range().value))]
-09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
+08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
+09)----------------ProjectionExec: expr=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as generated_id, make_array(value@0) as __unnest_placeholder(make_array(range().value))]
 10)------------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
 11)--------------------LazyMemoryExec: partitions=1, batch_generators=[range: start=1, end=5, batch_size=8192]

diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt
index 8ac8724683a8..9c9cdb7271d6 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -1831,9 +1831,8 @@ physical_plan
 05)--------BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
 06)----------SortPreservingMergeExec: [__common_expr_1@0 DESC, c9@3 DESC, c2@1 ASC NULLS LAST]
 07)------------SortExec: expr=[__common_expr_1@0 DESC, c9@3 DESC, c2@1 ASC NULLS LAST], preserve_partitioning=[true]
-08)--------------ProjectionExec: expr=[c3@1 + c4@2 as __common_expr_1, c2@0 as c2, c3@1 as c3, c9@3 as c9]
-09)----------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-10)------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3, c4, c9], file_type=csv, has_header=true
+08)--------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+09)----------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c3@2 + c4@3 as __common_expr_1, c2, c3, c9], file_type=csv, has_header=true


 query III
@@ -3353,8 +3352,8 @@ physical_plan
 08)--------------RepartitionExec: partitioning=Hash([a@1, d@4], 2), input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST, __common_expr_1@0 ASC NULLS LAST
 09)----------------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
 10)------------------RepartitionExec: partitioning=Hash([a@1, b@2], 2), input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST, __common_expr_1@0 ASC NULLS LAST
-11)--------------------ProjectionExec: expr=[CAST(a@0 AS Int64) as __common_expr_1, a@0 as a, b@1 as b, c@2 as c, d@3 as d]
-12)----------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true
+11)--------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true
+12)----------------------ProjectionExec: expr=[CAST(a@0 AS Int64) as __common_expr_1, a@0 as a, b@1 as b, c@2 as c, d@3 as d]
 13)------------------------StreamingTableExec: partition_sizes=1, projection=[a, b, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST]

 # reset the partition number 1 again
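All of the expected-plan updates follow one shape: `ProjectionExec` over `RepartitionExec` becomes `RepartitionExec` over `ProjectionExec`, and later passes may push the projection further down, sometimes all the way into the scan. A minimal sketch of the swap itself with toy plan nodes, not DataFusion's API (the real code builds the lowered projection via `make_with_child` and, for hash partitioning, rewrites partition-key expressions with `update_expr`):

```rust
/// Toy plan nodes for illustration only.
#[derive(Debug)]
enum Plan {
    Scan,
    Projection { exprs: Vec<String>, input: Box<Plan> },
    RoundRobinRepartition { partitions: usize, input: Box<Plan> },
}

/// Rewrite `Projection(Repartition(x))` into `Repartition(Projection(x))`.
/// Round-robin partitioning references no columns, so no expression
/// rewriting is needed; hash partitioning would also have to remap its
/// partition-key expressions through the projection.
fn swap(plan: Plan) -> Plan {
    match plan {
        Plan::Projection { exprs, input } => match *input {
            Plan::RoundRobinRepartition { partitions, input } => {
                Plan::RoundRobinRepartition {
                    partitions,
                    input: Box::new(Plan::Projection { exprs, input }),
                }
            }
            other => Plan::Projection { exprs, input: Box::new(other) },
        },
        other => other,
    }
}

fn main() {
    let before = Plan::Projection {
        exprs: vec!["a".into(), "b".into(), "a + b AS sum_col".into()],
        input: Box::new(Plan::RoundRobinRepartition {
            partitions: 4,
            input: Box::new(Plan::Scan),
        }),
    };
    // Prints: RoundRobinRepartition { .., input: Projection { .., input: Scan } }
    println!("{:?}", swap(before));
}
```

That no-rewrite property of round-robin partitioning is why nearly every plan touched by this patch is a `RoundRobinBatch` repartition, while the `Hash` repartitions in these files remain above their projections unchanged.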