Merged
9 changes: 7 additions & 2 deletions datafusion/core/src/execution/context.rs
@@ -73,7 +73,6 @@ use crate::optimizer::optimizer::{OptimizerConfig, OptimizerRule};
use datafusion_sql::{ResolvedTableReference, TableReference};

use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
use crate::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
use crate::physical_optimizer::repartition::Repartition;

use crate::config::{
@@ -82,6 +81,7 @@ use crate::config::{
};
use crate::datasource::file_format::file_type::{FileCompressionType, FileType};
use crate::execution::{runtime_env::RuntimeEnv, FunctionRegistry};
use crate::physical_optimizer::enforcement::BasicEnforcement;
use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet};
use crate::physical_plan::planner::DefaultPhysicalPlanner;
use crate::physical_plan::udaf::AggregateUDF;
@@ -1207,6 +1207,8 @@ pub struct SessionConfig {
pub parquet_pruning: bool,
/// Should DataFusion collect statistics after listing files
pub collect_statistics: bool,
/// Should DataFusion optimizer run a top down process to reorder the join keys
pub top_down_join_key_reordering: bool,
/// Configuration options
pub config_options: Arc<RwLock<ConfigOptions>>,
/// Opaque extensions.
@@ -1226,6 +1228,7 @@ impl Default for SessionConfig {
repartition_windows: true,
parquet_pruning: true,
collect_statistics: false,
top_down_join_key_reordering: true,
config_options: Arc::new(RwLock::new(ConfigOptions::new())),
// Assume no extensions by default.
extensions: HashMap::with_capacity_and_hasher(
@@ -1548,6 +1551,7 @@ impl SessionState {
Arc::new(AggregateStatistics::new()),
Arc::new(HashBuildProbeOrder::new()),
];
physical_optimizers.push(Arc::new(BasicEnforcement::new()));
Contributor:
Is there a reason that BasicEnforcement must be run twice?

Contributor Author (@mingmwang, Nov 9, 2022):
Yes, this covers the case where the Repartition rule can add an additional RepartitionExec with RoundRobin partitioning; to make sure the SinglePartition requirement is still satisfied, we run BasicEnforcement again. Originally AddCoalescePartitionsExec was used here; I have now replaced it with BasicEnforcement.

Contributor:
Cool -- perhaps you can add a comment to explain the rationale

Contributor Author:
Sure, I will do it in a follow-up PR.
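The rationale above can be sketched with a toy model of the physical-optimizer pipeline. This is not the DataFusion API; `Plan`, `repartition`, and `enforce_distribution` are illustrative stand-ins showing why an enforcement pass must run both before and after repartitioning: the repartition step can break the single-partition requirement that the first enforcement pass established.

```rust
// Toy stand-in for a physical plan: we only track the partition count.
struct Plan {
    partitions: usize,
}

// Stand-in for the Repartition rule: it may introduce RoundRobin
// repartitioning, increasing the partition count.
fn repartition(_p: Plan) -> Plan {
    Plan { partitions: 8 }
}

// Stand-in for BasicEnforcement: restores the SinglePartition requirement
// by coalescing whenever more than one partition is present.
fn enforce_distribution(p: Plan) -> Plan {
    if p.partitions > 1 {
        Plan { partitions: 1 }
    } else {
        p
    }
}

fn main() {
    // Enforcement runs twice: once up front, and again after Repartition,
    // because Repartition can invalidate what the first pass established.
    let rules: Vec<fn(Plan) -> Plan> =
        vec![enforce_distribution, repartition, enforce_distribution];
    let mut plan = Plan { partitions: 4 };
    for rule in rules {
        plan = rule(plan);
    }
    println!("{}", plan.partitions);
}
```

Dropping the final `enforce_distribution` from `rules` would leave the plan with 8 partitions, which is exactly the gap the second BasicEnforcement pass closes.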

if config
.config_options
.read()
@@ -1565,7 +1569,8 @@
)));
}
physical_optimizers.push(Arc::new(Repartition::new()));
physical_optimizers.push(Arc::new(AddCoalescePartitionsExec::new()));
physical_optimizers.push(Arc::new(BasicEnforcement::new()));
// physical_optimizers.push(Arc::new(AddCoalescePartitionsExec::new()));
Member:
why remove it?

Contributor Author:
The new BasicEnforcement rule already covers the work of AddCoalescePartitionsExec.

Contributor:
When this call is removed, there is no other code that calls AddCoalescePartitionsExec, so I think we should remove that entire module. We could do so as a follow-on PR if you want to keep this one smaller.

Contributor Author:
Sure, I will do it in a follow-up PR.


SessionState {
session_id,
31 changes: 11 additions & 20 deletions datafusion/core/src/physical_optimizer/coalesce_batches.rs
@@ -23,7 +23,7 @@ use crate::{
physical_optimizer::PhysicalOptimizerRule,
physical_plan::{
coalesce_batches::CoalesceBatchesExec, filter::FilterExec, joins::HashJoinExec,
repartition::RepartitionExec, with_new_children_if_necessary,
repartition::RepartitionExec, rewrite::TreeNodeRewritable,
},
};
use std::sync::Arc;
@@ -48,34 +48,25 @@ impl PhysicalOptimizerRule for CoalesceBatches {
plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
_config: &crate::execution::context::SessionConfig,
) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
if plan.children().is_empty() {
// leaf node, children cannot be replaced
Ok(plan.clone())
} else {
// recurse down first
let children = plan
.children()
.iter()
.map(|child| self.optimize(child.clone(), _config))
.collect::<Result<Vec<_>>>()?;
let plan = with_new_children_if_necessary(plan, children)?;
let target_batch_size = self.target_batch_size;
plan.transform_up(&|plan| {
let plan_any = plan.as_any();
// The goal here is to detect operators that could produce small batches and only
// wrap those ones with a CoalesceBatchesExec operator. An alternate approach here
// would be to build the coalescing logic directly into the operators
// See https://github.com/apache/arrow-datafusion/issues/139
let plan_any = plan.as_any();
let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
|| plan_any.downcast_ref::<HashJoinExec>().is_some()
|| plan_any.downcast_ref::<RepartitionExec>().is_some();
Ok(if wrap_in_coalesce {
Arc::new(CoalesceBatchesExec::new(
if wrap_in_coalesce {
Some(Arc::new(CoalesceBatchesExec::new(
plan.clone(),
self.target_batch_size,
))
target_batch_size,
)))
} else {
plan.clone()
})
}
None
}
})
}

fn name(&self) -> &str {
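The CoalesceBatches diff above replaces hand-written recursion with a `transform_up` call from the new `TreeNodeRewritable` trait. Below is a minimal self-contained sketch of that contract, not the actual DataFusion trait: the `Node` enum and `transform_up` function are illustrative. The key points it demonstrates are that children are rewritten before their parent (bottom-up), and that the closure returns `Some(replacement)` to substitute a node or `None` to keep it, which is what lets the rule body shrink to a single match on the node type.

```rust
// Toy plan tree standing in for ExecutionPlan nodes.
#[derive(Debug, Clone)]
enum Node {
    Scan,
    Filter(Box<Node>),
    Coalesce(Box<Node>),
}

// Bottom-up rewrite: recurse into children first, then give the closure a
// chance to replace the current node (None means keep it unchanged).
fn transform_up<F>(node: Node, op: &F) -> Node
where
    F: Fn(&Node) -> Option<Node>,
{
    let node = match node {
        Node::Filter(child) => Node::Filter(Box::new(transform_up(*child, op))),
        Node::Coalesce(child) => Node::Coalesce(Box::new(transform_up(*child, op))),
        leaf => leaf,
    };
    op(&node).unwrap_or(node)
}

fn main() {
    let plan = Node::Filter(Box::new(Node::Scan));
    // Wrap operators that can emit small batches (here: Filter) in a
    // coalescing node, mirroring how CoalesceBatches wraps FilterExec.
    let optimized = transform_up(plan, &|n| match n {
        Node::Filter(_) => Some(Node::Coalesce(Box::new(n.clone()))),
        _ => None,
    });
    println!("{:?}", optimized);
}
```

Because the wrapper node is created after its position has already been visited, the closure is never re-applied to the `Coalesce` it just produced, so the rewrite terminates in one pass.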