Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1151,8 +1151,13 @@ config_namespace! {
/// in parallel using the provided `target_partitions` level
pub repartition_aggregations: bool, default = true

/// Minimum total files size in bytes to perform file scan repartitioning.
pub repartition_file_min_size: usize, default = 10 * 1024 * 1024
/// Minimum total file size in bytes for file-group byte-range
/// splitting to fire. Files (or merged file groups) smaller than this
/// stay as one partition. Lower values produce more, smaller
/// partitions — better at filling `target_partitions` worth of cores
/// when files are modestly sized, at the cost of slightly more
/// per-partition open / metadata-load overhead.
pub repartition_file_min_size: usize, default = 1024 * 1024

/// Should DataFusion repartition data using the join keys to execute joins in parallel
/// using the provided `target_partitions` level
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/csv_files.slt
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ id3 value3

# Reset repartition_file_min_size to default value
statement ok
SET datafusion.optimizer.repartition_file_min_size = 10485760;
RESET datafusion.optimizer.repartition_file_min_size;

statement ok
drop table stored_table_with_cr_terminator;
Expand Down
6 changes: 3 additions & 3 deletions datafusion/sqllogictest/test_files/information_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ datafusion.optimizer.prefer_existing_union false
datafusion.optimizer.prefer_hash_join true
datafusion.optimizer.preserve_file_partitions 0
datafusion.optimizer.repartition_aggregations true
datafusion.optimizer.repartition_file_min_size 10485760
datafusion.optimizer.repartition_file_min_size 1048576
datafusion.optimizer.repartition_file_scans true
datafusion.optimizer.repartition_joins true
datafusion.optimizer.repartition_sorts true
Expand Down Expand Up @@ -475,7 +475,7 @@ datafusion.optimizer.prefer_existing_union false When set to true, the optimizer
datafusion.optimizer.prefer_hash_join true When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory
datafusion.optimizer.preserve_file_partitions 0 Minimum number of distinct partition values required to group files by their Hive partition column values (enabling Hash partitioning declaration). How the option is used: - preserve_file_partitions=0: Disable it. - preserve_file_partitions=1: Always enable it. - preserve_file_partitions=N, actual file partitions=M: Only enable when M >= N. This threshold preserves I/O parallelism when file partitioning is below it. Note: This may reduce parallelism, rooting from the I/O level, if the number of distinct partitions is less than the target_partitions.
datafusion.optimizer.repartition_aggregations true Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level
datafusion.optimizer.repartition_file_min_size 10485760 Minimum total files size in bytes to perform file scan repartitioning.
datafusion.optimizer.repartition_file_min_size 1048576 Minimum total file size in bytes for file-group byte-range splitting to fire. Files (or merged file groups) smaller than this stay as one partition. Lower values produce more, smaller partitions — better at filling `target_partitions` worth of cores when files are modestly sized, at the cost of slightly more per-partition open / metadata-load overhead.
datafusion.optimizer.repartition_file_scans true When set to `true`, datasource partitions will be repartitioned to achieve maximum parallelism. This applies to both in-memory partitions and FileSource's file groups (1 group is 1 partition). For FileSources, only Parquet and CSV formats are currently supported. If set to `true` for a FileSource, all files will be repartitioned evenly (i.e., a single large file might be partitioned into smaller chunks) for parallel scanning. If set to `false` for a FileSource, different files will be read in parallel, but repartitioning won't happen within a single file. If set to `true` for an in-memory source, all memtable's partitions will have their batches repartitioned evenly to the desired number of `target_partitions`. Repartitioning can change the total number of partitions and batches per partition, but does not slice the initial record tables provided to the MemTable on creation.
datafusion.optimizer.repartition_joins true Should DataFusion repartition data using the join keys to execute joins in parallel using the provided `target_partitions` level
datafusion.optimizer.repartition_sorts true Should DataFusion execute sorts in a per-partition fashion and merge afterwards instead of coalescing first and sorting globally. With this flag is enabled, plans in the form below ```text "SortExec: [a@0 ASC]", " CoalescePartitionsExec", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", ``` would turn into the plan below which performs better in multithreaded environments ```text "SortPreservingMergeExec: [a@0 ASC]", " SortExec: [a@0 ASC]", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", ```
Expand Down Expand Up @@ -895,7 +895,7 @@ show functions
statement ok
reset datafusion.catalog.information_schema;

# The SLT runner sets `target_partitions` to 4 instead of using the default, so
# The SLT runner sets `target_partitions` to 4 instead of using the default, so
# reset it explicitly.
statement ok
set datafusion.execution.target_partitions = 4;
Expand Down
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/tpch/plans/q10.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ physical_plan
09)----------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(o_orderkey@7, l_orderkey@0)], projection=[c_custkey@0, c_name@1, c_address@2, c_nationkey@3, c_phone@4, c_acctbal@5, c_comment@6, l_extendedprice@9, l_discount@10]
10)------------------RepartitionExec: partitioning=Hash([o_orderkey@7], 4), input_partitions=4
11)--------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c_custkey@0, o_custkey@1)], projection=[c_custkey@0, c_name@1, c_address@2, c_nationkey@3, c_phone@4, c_acctbal@5, c_comment@6, o_orderkey@7]
12)----------------------RepartitionExec: partitioning=Hash([c_custkey@0], 4), input_partitions=1
13)------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/customer.tbl]]}, projection=[c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_comment], file_type=csv, has_header=false
12)----------------------RepartitionExec: partitioning=Hash([c_custkey@0], 4), input_partitions=4
13)------------------------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/customer.tbl:0..606529], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/customer.tbl:606529..1213058], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/customer.tbl:1213058..1819587], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/customer.tbl:1819587..2426114]]}, projection=[c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_comment], file_type=csv, has_header=false
14)----------------------RepartitionExec: partitioning=Hash([o_custkey@1], 4), input_partitions=4
15)------------------------FilterExec: o_orderdate@2 >= 1993-10-01 AND o_orderdate@2 < 1994-01-01, projection=[o_orderkey@0, o_custkey@1]
16)--------------------------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:0..4223281], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:4223281..8446562], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:8446562..12669843], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:12669843..16893122]]}, projection=[o_orderkey, o_custkey, o_orderdate], file_type=csv, has_header=false
Expand Down
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/tpch/plans/q13.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ physical_plan
07)------------ProjectionExec: expr=[count(orders.o_orderkey)@1 as c_count]
08)--------------AggregateExec: mode=SinglePartitioned, gby=[c_custkey@0 as c_custkey], aggr=[count(orders.o_orderkey)]
09)----------------HashJoinExec: mode=Partitioned, join_type=Left, on=[(c_custkey@0, o_custkey@1)], projection=[c_custkey@0, o_orderkey@1]
10)------------------RepartitionExec: partitioning=Hash([c_custkey@0], 4), input_partitions=1
11)--------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/customer.tbl]]}, projection=[c_custkey], file_type=csv, has_header=false
10)------------------RepartitionExec: partitioning=Hash([c_custkey@0], 4), input_partitions=4
11)--------------------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/customer.tbl:0..606529], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/customer.tbl:606529..1213058], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/customer.tbl:1213058..1819587], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/customer.tbl:1819587..2426114]]}, projection=[c_custkey], file_type=csv, has_header=false
12)------------------RepartitionExec: partitioning=Hash([o_custkey@1], 4), input_partitions=4
13)--------------------FilterExec: o_comment@2 NOT LIKE %special%requests%, projection=[o_orderkey@0, o_custkey@1]
14)----------------------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:0..4223281], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:4223281..8446562], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:8446562..12669843], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:12669843..16893122]]}, projection=[o_orderkey, o_custkey, o_comment], file_type=csv, has_header=false
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/tpch/plans/q14.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,5 @@ physical_plan
07)------------RepartitionExec: partitioning=Hash([l_partkey@0], 4), input_partitions=4
08)--------------FilterExec: l_shipdate@3 >= 1995-09-01 AND l_shipdate@3 < 1995-10-01, projection=[l_partkey@0, l_extendedprice@1, l_discount@2]
09)----------------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]}, projection=[l_partkey, l_extendedprice, l_discount, l_shipdate], file_type=csv, has_header=false
10)------------RepartitionExec: partitioning=Hash([p_partkey@0], 4), input_partitions=1
11)--------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/part.tbl]]}, projection=[p_partkey, p_type], file_type=csv, has_header=false
10)------------RepartitionExec: partitioning=Hash([p_partkey@0], 4), input_partitions=4
11)--------------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/part.tbl:0..597773], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/part.tbl:597773..1195546], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/part.tbl:1195546..1793319], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/part.tbl:1793319..2391090]]}, projection=[p_partkey, p_type], file_type=csv, has_header=false
9 changes: 4 additions & 5 deletions datafusion/sqllogictest/test_files/tpch/plans/q16.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ physical_plan
14)--------------------------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:0..2932049], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:2932049..5864098], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:5864098..8796147], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:8796147..11728193]]}, projection=[ps_partkey, ps_suppkey], file_type=csv, has_header=false
15)------------------------RepartitionExec: partitioning=Hash([p_partkey@0], 4), input_partitions=4
16)--------------------------FilterExec: p_brand@1 != Brand#45 AND p_type@2 NOT LIKE MEDIUM POLISHED% AND p_size@3 IN (SET) ([49, 14, 23, 45, 19, 3, 36, 9])
17)----------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
18)------------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/part.tbl]]}, projection=[p_partkey, p_brand, p_type, p_size], file_type=csv, has_header=false
19)--------------------FilterExec: s_comment@1 LIKE %Customer%Complaints%, projection=[s_suppkey@0]
20)----------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
21)------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/supplier.tbl]]}, projection=[s_suppkey, s_comment], file_type=csv, has_header=false
17)----------------------------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/part.tbl:0..597773], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/part.tbl:597773..1195546], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/part.tbl:1195546..1793319], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/part.tbl:1793319..2391090]]}, projection=[p_partkey, p_brand, p_type, p_size], file_type=csv, has_header=false
18)--------------------FilterExec: s_comment@1 LIKE %Customer%Complaints%, projection=[s_suppkey@0]
19)----------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
20)------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/supplier.tbl]]}, projection=[s_suppkey, s_comment], file_type=csv, has_header=false
13 changes: 6 additions & 7 deletions datafusion/sqllogictest/test_files/tpch/plans/q17.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,9 @@ physical_plan
08)--------------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]}, projection=[l_partkey, l_quantity, l_extendedprice], file_type=csv, has_header=false
09)------------RepartitionExec: partitioning=Hash([p_partkey@0], 4), input_partitions=4
10)--------------FilterExec: p_brand@1 = Brand#23 AND p_container@2 = MED BOX, projection=[p_partkey@0]
11)----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
12)------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/part.tbl]]}, projection=[p_partkey, p_brand, p_container], file_type=csv, has_header=false
13)----------ProjectionExec: expr=[CAST(0.2 * CAST(avg(lineitem.l_quantity)@1 AS Float64) AS Decimal128(30, 15)) as Float64(0.2) * avg(lineitem.l_quantity), l_partkey@0 as l_partkey]
14)------------AggregateExec: mode=FinalPartitioned, gby=[l_partkey@0 as l_partkey], aggr=[avg(lineitem.l_quantity)]
15)--------------RepartitionExec: partitioning=Hash([l_partkey@0], 4), input_partitions=4
16)----------------AggregateExec: mode=Partial, gby=[l_partkey@0 as l_partkey], aggr=[avg(lineitem.l_quantity)]
17)------------------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]}, projection=[l_partkey, l_quantity], file_type=csv, has_header=false
11)----------------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/part.tbl:0..597773], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/part.tbl:597773..1195546], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/part.tbl:1195546..1793319], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/part.tbl:1793319..2391090]]}, projection=[p_partkey, p_brand, p_container], file_type=csv, has_header=false
12)----------ProjectionExec: expr=[CAST(0.2 * CAST(avg(lineitem.l_quantity)@1 AS Float64) AS Decimal128(30, 15)) as Float64(0.2) * avg(lineitem.l_quantity), l_partkey@0 as l_partkey]
13)------------AggregateExec: mode=FinalPartitioned, gby=[l_partkey@0 as l_partkey], aggr=[avg(lineitem.l_quantity)]
14)--------------RepartitionExec: partitioning=Hash([l_partkey@0], 4), input_partitions=4
15)----------------AggregateExec: mode=Partial, gby=[l_partkey@0 as l_partkey], aggr=[avg(lineitem.l_quantity)]
16)------------------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]}, projection=[l_partkey, l_quantity], file_type=csv, has_header=false
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/tpch/plans/q18.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ physical_plan
05)--------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(o_orderkey@2, l_orderkey@0)], projection=[c_custkey@0, c_name@1, o_orderkey@2, o_totalprice@3, o_orderdate@4, l_quantity@6]
06)----------RepartitionExec: partitioning=Hash([o_orderkey@2], 4), input_partitions=4
07)------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c_custkey@0, o_custkey@1)], projection=[c_custkey@0, c_name@1, o_orderkey@2, o_totalprice@4, o_orderdate@5]
08)--------------RepartitionExec: partitioning=Hash([c_custkey@0], 4), input_partitions=1
09)----------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/customer.tbl]]}, projection=[c_custkey, c_name], file_type=csv, has_header=false
08)--------------RepartitionExec: partitioning=Hash([c_custkey@0], 4), input_partitions=4
09)----------------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/customer.tbl:0..606529], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/customer.tbl:606529..1213058], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/customer.tbl:1213058..1819587], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/customer.tbl:1819587..2426114]]}, projection=[c_custkey, c_name], file_type=csv, has_header=false
10)--------------RepartitionExec: partitioning=Hash([o_custkey@1], 4), input_partitions=4
11)----------------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:0..4223281], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:4223281..8446562], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:8446562..12669843], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:12669843..16893122]]}, projection=[o_orderkey, o_custkey, o_totalprice, o_orderdate], file_type=csv, has_header=false
12)----------RepartitionExec: partitioning=Hash([l_orderkey@0], 4), input_partitions=4
Expand Down
Loading
Loading