Skip to content

Commit 68dc644

Browse files
authored
Enable PhysicalOptimizerRule lazily (#4806) (#4807)
1 parent 34a8b86 commit 68dc644

File tree

3 files changed

+59
-62
lines changed

3 files changed

+59
-62
lines changed

datafusion/core/src/execution/context.rs

Lines changed: 48 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1449,59 +1449,54 @@ impl SessionState {
14491449
}
14501450

14511451
// We need to take care of the rule ordering. They may influence each other.
1452-
let mut physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> =
1453-
vec![Arc::new(AggregateStatistics::new())];
1454-
1455-
// - In order to increase the parallelism, it will change the output partitioning
1456-
// of some operators in the plan tree, which will influence other rules.
1457-
// Therefore, it should be run as soon as possible.
1458-
// - The reason to make it optional is
1459-
// - it's not used for the distributed engine, Ballista.
1460-
// - it's conflicted with some parts of the BasicEnforcement, since it will
1461-
// introduce additional repartitioning while the BasicEnforcement aims at
1462-
// reducing unnecessary repartitioning.
1463-
if config.options.optimizer.enable_round_robin_repartition {
1464-
physical_optimizers.push(Arc::new(Repartition::new()));
1465-
}
1466-
//- Currently it will depend on the partition number to decide whether to change the
1467-
// single node sort to parallel local sort and merge. Therefore, it should be run
1468-
// after the Repartition.
1469-
// - Since it will change the output ordering of some operators, it should be run
1470-
// before JoinSelection and BasicEnforcement, which may depend on that.
1471-
physical_optimizers.push(Arc::new(GlobalSortSelection::new()));
1472-
// Statistics-base join selection will change the Auto mode to real join implementation,
1473-
// like collect left, or hash join, or future sort merge join, which will
1474-
// influence the BasicEnforcement to decide whether to add additional repartition
1475-
// and local sort to meet the distribution and ordering requirements.
1476-
// Therefore, it should be run before BasicEnforcement
1477-
physical_optimizers.push(Arc::new(JoinSelection::new()));
1478-
// If the query is processing infinite inputs, the PipelineFixer rule applies the
1479-
// necessary transformations to make the query runnable (if it is not already runnable).
1480-
// If the query can not be made runnable, the rule emits an error with a diagnostic message.
1481-
// Since the transformations it applies may alter output partitioning properties of operators
1482-
// (e.g. by swapping hash join sides), this rule runs before BasicEnforcement.
1483-
physical_optimizers.push(Arc::new(PipelineFixer::new()));
1484-
// It's for adding essential repartition and local sorting operator to satisfy the
1485-
// required distribution and local sort.
1486-
// Please make sure that the whole plan tree is determined.
1487-
physical_optimizers.push(Arc::new(BasicEnforcement::new()));
1488-
// `BasicEnforcement` stage conservatively inserts `SortExec`s to satisfy ordering requirements.
1489-
// However, a deeper analysis may sometimes reveal that such a `SortExec` is actually unnecessary.
1490-
// These cases typically arise when we have reversible `WindowAggExec`s or deep subqueries. The
1491-
// rule below performs this analysis and removes unnecessary `SortExec`s.
1492-
physical_optimizers.push(Arc::new(OptimizeSorts::new()));
1493-
// It will not influence the distribution and ordering of the whole plan tree.
1494-
// Therefore, to avoid influencing other rules, it should be run at last.
1495-
if config.options.execution.coalesce_batches {
1496-
physical_optimizers.push(Arc::new(CoalesceBatches::new(
1497-
config.options.execution.batch_size,
1498-
)));
1499-
}
1500-
// The PipelineChecker rule will reject non-runnable query plans that use
1501-
// pipeline-breaking operators on infinite input(s). The rule generates a
1502-
// diagnostic error message when this happens. It makes no changes to the
1503-
// given query plan; i.e. it only acts as a final gatekeeping rule.
1504-
physical_optimizers.push(Arc::new(PipelineChecker::new()));
1452+
let physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
1453+
Arc::new(AggregateStatistics::new()),
1454+
// - In order to increase the parallelism, it will change the output partitioning
1455+
// of some operators in the plan tree, which will influence other rules.
1456+
// Therefore, it should be run as soon as possible.
1457+
// - The reason to make it optional is
1458+
// - it's not used for the distributed engine, Ballista.
1459+
// - it's conflicted with some parts of the BasicEnforcement, since it will
1460+
// introduce additional repartitioning while the BasicEnforcement aims at
1461+
// reducing unnecessary repartitioning.
1462+
Arc::new(Repartition::new()),
1463+
//- Currently it will depend on the partition number to decide whether to change the
1464+
// single node sort to parallel local sort and merge. Therefore, it should be run
1465+
// after the Repartition.
1466+
// - Since it will change the output ordering of some operators, it should be run
1467+
// before JoinSelection and BasicEnforcement, which may depend on that.
1468+
Arc::new(GlobalSortSelection::new()),
1469+
// Statistics-base join selection will change the Auto mode to real join implementation,
1470+
// like collect left, or hash join, or future sort merge join, which will
1471+
// influence the BasicEnforcement to decide whether to add additional repartition
1472+
// and local sort to meet the distribution and ordering requirements.
1473+
// Therefore, it should be run before BasicEnforcement
1474+
Arc::new(JoinSelection::new()),
1475+
// If the query is processing infinite inputs, the PipelineFixer rule applies the
1476+
// necessary transformations to make the query runnable (if it is not already runnable).
1477+
// If the query can not be made runnable, the rule emits an error with a diagnostic message.
1478+
// Since the transformations it applies may alter output partitioning properties of operators
1479+
// (e.g. by swapping hash join sides), this rule runs before BasicEnforcement.
1480+
Arc::new(PipelineFixer::new()),
1481+
// It's for adding essential repartition and local sorting operator to satisfy the
1482+
// required distribution and local sort.
1483+
// Please make sure that the whole plan tree is determined.
1484+
Arc::new(BasicEnforcement::new()),
1485+
// `BasicEnforcement` stage conservatively inserts `SortExec`s to satisfy ordering requirements.
1486+
// However, a deeper analysis may sometimes reveal that such a `SortExec` is actually unnecessary.
1487+
// These cases typically arise when we have reversible `WindowAggExec`s or deep subqueries. The
1488+
// rule below performs this analysis and removes unnecessary `SortExec`s.
1489+
Arc::new(OptimizeSorts::new()),
1490+
// It will not influence the distribution and ordering of the whole plan tree.
1491+
// Therefore, to avoid influencing other rules, it should be run at last.
1492+
Arc::new(CoalesceBatches::new()),
1493+
// The PipelineChecker rule will reject non-runnable query plans that use
1494+
// pipeline-breaking operators on infinite input(s). The rule generates a
1495+
// diagnostic error message when this happens. It makes no changes to the
1496+
// given query plan; i.e. it only acts as a final gatekeeping rule.
1497+
Arc::new(PipelineChecker::new()),
1498+
];
1499+
15051500
SessionState {
15061501
session_id,
15071502
optimizer: Optimizer::new(),

datafusion/core/src/physical_optimizer/coalesce_batches.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,24 +32,25 @@ use std::sync::Arc;
3232
/// Optimizer rule that introduces CoalesceBatchesExec to avoid overhead with small batches that
3333
/// are produced by highly selective filters
3434
#[derive(Default)]
35-
pub struct CoalesceBatches {
36-
/// Target batch size
37-
target_batch_size: usize,
38-
}
35+
pub struct CoalesceBatches {}
3936

4037
impl CoalesceBatches {
4138
#[allow(missing_docs)]
42-
pub fn new(target_batch_size: usize) -> Self {
43-
Self { target_batch_size }
39+
pub fn new() -> Self {
40+
Self::default()
4441
}
4542
}
4643
impl PhysicalOptimizerRule for CoalesceBatches {
4744
fn optimize(
4845
&self,
4946
plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
50-
_config: &ConfigOptions,
47+
config: &ConfigOptions,
5148
) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
52-
let target_batch_size = self.target_batch_size;
49+
if !config.execution.coalesce_batches {
50+
return Ok(plan);
51+
}
52+
53+
let target_batch_size = config.execution.batch_size;
5354
plan.transform_up(&|plan| {
5455
let plan_any = plan.as_any();
5556
// The goal here is to detect operators that could produce small batches and only

datafusion/core/src/physical_optimizer/repartition.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,8 +211,9 @@ impl PhysicalOptimizerRule for Repartition {
211211
config: &ConfigOptions,
212212
) -> Result<Arc<dyn ExecutionPlan>> {
213213
let target_partitions = config.execution.target_partitions;
214+
let enabled = config.optimizer.enable_round_robin_repartition;
214215
// Don't run optimizer if target_partitions == 1
215-
if target_partitions == 1 {
216+
if !enabled || target_partitions == 1 {
216217
Ok(plan)
217218
} else {
218219
optimize_partitions(

0 commit comments

Comments
 (0)