diff --git a/datafusion/core/src/physical_optimizer/dist_enforcement.rs b/datafusion/core/src/physical_optimizer/dist_enforcement.rs index affe432830f87..f74a7f2e93e35 100644 --- a/datafusion/core/src/physical_optimizer/dist_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/dist_enforcement.rs @@ -29,6 +29,7 @@ use crate::physical_plan::joins::{ use crate::physical_plan::projection::ProjectionExec; use crate::physical_plan::repartition::RepartitionExec; use crate::physical_plan::sorts::sort::SortOptions; +use crate::physical_plan::union::{can_interleave, InterleaveExec, UnionExec}; use crate::physical_plan::windows::WindowAggExec; use crate::physical_plan::Partitioning; use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan}; @@ -825,6 +826,35 @@ fn ensure_distribution( return Ok(Transformed::No(plan)); } + // special case for UnionExec: We want to "bubble up" hash-partitioned data. So instead of: + // + // Agg: + // Repartition (hash): + // Union: + // - Agg: + // Repartition (hash): + // Data + // - Agg: + // Repartition (hash): + // Data + // + // We can use: + // + // Agg: + // Interleave: + // - Agg: + // Repartition (hash): + // Data + // - Agg: + // Repartition (hash): + // Data + if let Some(union_exec) = plan.as_any().downcast_ref::() { + if can_interleave(union_exec.inputs()) { + let plan = InterleaveExec::try_new(union_exec.inputs().clone())?; + return Ok(Transformed::Yes(Arc::new(plan))); + } + } + let required_input_distributions = plan.required_input_distribution(); let children: Vec> = plan.children(); assert_eq!(children.len(), required_input_distributions.len()); @@ -2134,4 +2164,42 @@ mod tests { assert_optimized!(expected, exec); Ok(()) } + + #[test] + fn union_to_interleave() -> Result<()> { + // group by (a as a1) + let left = aggregate_exec_with_alias( + parquet_exec(), + vec![("a".to_string(), "a1".to_string())], + ); + // group by (a as a1) + let right = aggregate_exec_with_alias( + parquet_exec(), 
vec![("a".to_string(), "a1".to_string())], + ); + + // Union + let plan = Arc::new(UnionExec::new(vec![left, right])); + + // final agg + let plan = + aggregate_exec_with_alias(plan, vec![("a1".to_string(), "a2".to_string())]); + + // Only two RepartitionExecs added, no final RepartitionExec required + let expected = &[ + "AggregateExec: mode=FinalPartitioned, gby=[a2@0 as a2], aggr=[]", + "AggregateExec: mode=Partial, gby=[a1@0 as a2], aggr=[]", + "InterleaveExec", + "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]", + "RepartitionExec: partitioning=Hash([Column { name: \"a1\", index: 0 }], 10), input_partitions=1", + "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]", + "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]", + "RepartitionExec: partitioning=Hash([Column { name: \"a1\", index: 0 }], 10), input_partitions=1", + "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]", + "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]", + ]; + assert_optimized!(expected, plan); + Ok(()) + } } diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs index 3d4522272a521..0448813240895 100644 --- a/datafusion/core/src/physical_plan/union.rs +++ b/datafusion/core/src/physical_plan/union.rs @@ -94,8 +94,6 @@ pub struct UnionExec { metrics: ExecutionPlanMetricsSet, /// Schema of Union schema: SchemaRef, - /// Partition aware Union - partition_aware: bool, } impl UnionExec { @@ -133,45 +131,12 @@ impl UnionExec { /// Create a new UnionExec pub fn new(inputs: Vec>) -> Self { - let fields: Vec = (0..inputs[0].schema().fields().len()) - .map(|i| { - inputs - .iter() - .filter_map(|input| { - if input.schema().fields().len() > i { - Some(input.schema().field(i).clone()) - } else { - None - } - }) - .find_or_first(|f| f.is_nullable()) - .unwrap() - }) - .collect(); - - let schema = 
Arc::new(Schema::new_with_metadata( - fields, - inputs[0].schema().metadata().clone(), - )); - - // If all the input partitions have the same Hash partition spec with the first_input_partition - // The UnionExec is partition aware. - // - // It might be too strict here in the case that the input partition specs are compatible but not exactly the same. - // For example one input partition has the partition spec Hash('a','b','c') and - // other has the partition spec Hash('a'), It is safe to derive the out partition with the spec Hash('a','b','c'). - let first_input_partition = inputs[0].output_partitioning(); - let partition_aware = matches!(first_input_partition, Partitioning::Hash(_, _)) - && inputs - .iter() - .map(|plan| plan.output_partitioning()) - .all(|partition| partition == first_input_partition); + let schema = union_schema(&inputs); UnionExec { inputs, metrics: ExecutionPlanMetricsSet::new(), schema, - partition_aware, } } @@ -204,28 +169,20 @@ impl ExecutionPlan for UnionExec { /// Output of the union is the combination of all output partitions of the inputs fn output_partitioning(&self) -> Partitioning { - if self.partition_aware { - self.inputs[0].output_partitioning() - } else { - // Output the combination of all output partitions of the inputs if the Union is not partition aware - let num_partitions = self - .inputs - .iter() - .map(|plan| plan.output_partitioning().partition_count()) - .sum(); + // The output partition count is the sum of the partition counts of all inputs + let num_partitions = self + .inputs + .iter() + .map(|plan| plan.output_partitioning().partition_count()) + .sum(); - Partitioning::UnknownPartitioning(num_partitions) - } + Partitioning::UnknownPartitioning(num_partitions) } fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { - // If the Union is partition aware, there is no output ordering. - // Otherwise, the output ordering is the "meet" of its input orderings. 
+ // The output ordering is the "meet" of its input orderings. // The meet is the finest ordering that satisfied by all the input // orderings, see https://en.wikipedia.org/wiki/Join_and_meet. - if self.partition_aware { - return None; - } get_meet_of_orderings(&self.inputs) } @@ -273,34 +230,15 @@ impl ExecutionPlan for UnionExec { let elapsed_compute = baseline_metrics.elapsed_compute().clone(); let _timer = elapsed_compute.timer(); // record on drop - if self.partition_aware { - let mut input_stream_vec = vec![]; - for input in self.inputs.iter() { - if partition < input.output_partitioning().partition_count() { - input_stream_vec.push(input.execute(partition, context.clone())?); - } else { - // Do not find a partition to execute - break; - } - } - if input_stream_vec.len() == self.inputs.len() { - let stream = Box::pin(CombinedRecordBatchStream::new( - self.schema(), - input_stream_vec, - )); + // find partition to execute + for input in self.inputs.iter() { + // Calculate whether partition belongs to the current partition + if partition < input.output_partitioning().partition_count() { + let stream = input.execute(partition, context)?; + debug!("Found a Union partition to execute"); return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics))); - } - } else { - // find partition to execute - for input in self.inputs.iter() { - // Calculate whether partition belongs to the current partition - if partition < input.output_partitioning().partition_count() { - let stream = input.execute(partition, context)?; - debug!("Found a Union partition to execute"); - return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics))); - } else { - partition -= input.output_partitioning().partition_count(); - } + } else { + partition -= input.output_partitioning().partition_count(); } } @@ -340,8 +278,224 @@ impl ExecutionPlan for UnionExec { } } +/// Combines multiple input streams by interleaving them. 
+/// +/// This only works if all inputs have the same hash-partitioning. +/// +/// # Data Flow +/// ```text +/// +---------+ +/// | |---+ +/// | Input 1 | | +/// | |-------------+ +/// +---------+ | | +/// | | +---------+ +/// +------------------>| | +/// +---------------->| Combine |--> +/// | +-------------->| | +/// | | | +---------+ +/// +---------+ | | | +/// | |-----+ | | +/// | Input 2 | | | +/// | |---------------+ +/// +---------+ | | | +/// | | | +---------+ +/// | +-------->| | +/// | +------>| Combine |--> +/// | +---->| | +/// | | +---------+ +/// +---------+ | | +/// | |-------+ | +/// | Input 3 | | +/// | |-----------------+ +/// +---------+ +/// ``` +#[derive(Debug)] +pub struct InterleaveExec { + /// Input execution plan + inputs: Vec>, + /// Execution metrics + metrics: ExecutionPlanMetricsSet, + /// Schema of Interleave + schema: SchemaRef, +} + +impl InterleaveExec { + /// Create a new InterleaveExec + pub fn try_new(inputs: Vec>) -> Result { + let schema = union_schema(&inputs); + + if !can_interleave(&inputs) { + return Err(DataFusionError::Internal(String::from( + "Not all InterleaveExec children have a consistent hash partitioning", + ))); + } + + Ok(InterleaveExec { + inputs, + metrics: ExecutionPlanMetricsSet::new(), + schema, + }) + } + + /// Get inputs of the execution plan + pub fn inputs(&self) -> &Vec> { + &self.inputs + } +} + +impl ExecutionPlan for InterleaveExec { + /// Return a reference to Any that can be used for downcasting + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + /// Specifies whether this plan generates an infinite stream of records. + /// If the plan does not support pipelining, but its input(s) are + /// infinite, returns an error to indicate this. 
+ fn unbounded_output(&self, children: &[bool]) -> Result { + Ok(children.iter().any(|x| *x)) + } + + fn children(&self) -> Vec> { + self.inputs.clone() + } + + /// All inputs must have the same partitioning. The output partitioning of InterleaveExec is the same as the inputs + /// (NOT combined). E.g. if there are 10 inputs where each is `Hash(3)`-partitioned, InterleaveExec is also + /// `Hash(3)`-partitioned. + fn output_partitioning(&self) -> Partitioning { + self.inputs[0].output_partitioning() + } + + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + + fn maintains_input_order(&self) -> Vec { + vec![false; self.inputs().len()] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + Ok(Arc::new(InterleaveExec::try_new(children)?)) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> Result { + debug!("Start InterleaveExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id()); + let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); + // record the tiny amount of work done in this function so + // elapsed_compute is reported as non zero + let elapsed_compute = baseline_metrics.elapsed_compute().clone(); + let _timer = elapsed_compute.timer(); // record on drop + + let mut input_stream_vec = vec![]; + for input in self.inputs.iter() { + if partition < input.output_partitioning().partition_count() { + input_stream_vec.push(input.execute(partition, context.clone())?); + } else { + // Do not find a partition to execute + break; + } + } + if input_stream_vec.len() == self.inputs.len() { + let stream = Box::pin(CombinedRecordBatchStream::new( + self.schema(), + input_stream_vec, + )); + return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics))); + } + + warn!("Error in InterleaveExec: Partition {} not found", partition); + + Err(crate::error::DataFusionError::Execution(format!( + "Partition {partition} not found in 
InterleaveExec" + ))) + } + + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!(f, "InterleaveExec") + } + } + } + + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } + + fn statistics(&self) -> Statistics { + self.inputs + .iter() + .map(|ep| ep.statistics()) + .reduce(stats_union) + .unwrap_or_default() + } + + fn benefits_from_input_partitioning(&self) -> bool { + false + } +} + +/// If all the input partitions have the same Hash partition spec with the first_input_partition +/// The InterleaveExec is partition aware. +/// +/// It might be too strict here in the case that the input partition specs are compatible but not exactly the same. +/// For example one input partition has the partition spec Hash('a','b','c') and +/// other has the partition spec Hash('a'), It is safe to derive the out partition with the spec Hash('a','b','c'). +pub fn can_interleave(inputs: &[Arc]) -> bool { + if inputs.is_empty() { + return false; + } + + let first_input_partition = inputs[0].output_partitioning(); + matches!(first_input_partition, Partitioning::Hash(_, _)) + && inputs + .iter() + .map(|plan| plan.output_partitioning()) + .all(|partition| partition == first_input_partition) +} + +fn union_schema(inputs: &[Arc]) -> SchemaRef { + let fields: Vec = (0..inputs[0].schema().fields().len()) + .map(|i| { + inputs + .iter() + .filter_map(|input| { + if input.schema().fields().len() > i { + Some(input.schema().field(i).clone()) + } else { + None + } + }) + .find_or_first(|f| f.is_nullable()) + .unwrap() + }) + .collect(); + + Arc::new(Schema::new_with_metadata( + fields, + inputs[0].schema().metadata().clone(), + )) +} + /// CombinedRecordBatchStream can be used to combine a Vec of SendableRecordBatchStreams into one -pub struct CombinedRecordBatchStream { +struct CombinedRecordBatchStream { /// Schema wrapped by Arc schema: SchemaRef, /// Stream entries diff 
--git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 2a05c918b51a0..03e03ae1a28af 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -752,7 +752,11 @@ fn create_sort_merge_join_datatype_context() -> Result { } fn create_union_context() -> Result { - let ctx = SessionContext::new(); + let ctx = SessionContext::with_config( + SessionConfig::new() + .with_target_partitions(4) + .with_batch_size(4096), + ); let t1_schema = Arc::new(Schema::new(vec![ Field::new("id", DataType::Int32, true), Field::new("name", DataType::UInt8, true), diff --git a/datafusion/core/tests/sql/order.rs b/datafusion/core/tests/sql/order.rs index da7663cb8b5c5..5691a7cfd664f 100644 --- a/datafusion/core/tests/sql/order.rs +++ b/datafusion/core/tests/sql/order.rs @@ -175,3 +175,80 @@ async fn sort_with_duplicate_sort_exprs() -> Result<()> { Ok(()) } + +/// Minimal test case for https://github.com/apache/arrow-datafusion/issues/5970 +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_issue5970_mini() -> Result<()> { + let config = SessionConfig::new() + .with_target_partitions(2) + .with_repartition_sorts(true); + let ctx = SessionContext::with_config(config); + let sql = " +WITH + m0(t) AS ( + VALUES (0), (1), (2)), + m1(t) AS ( + VALUES (0), (1)), + u AS ( + SELECT 0 as m, t FROM m0 GROUP BY 1, 2), + v AS ( + SELECT 1 as m, t FROM m1 GROUP BY 1, 2) +SELECT * FROM u +UNION ALL +SELECT * FROM v +ORDER BY 1, 2; + "; + + // check phys. 
plan + let dataframe = ctx.sql(sql).await.unwrap(); + let plan = dataframe.into_optimized_plan().unwrap(); + let plan = ctx.state().create_physical_plan(&plan).await.unwrap(); + let expected = vec![ + "SortPreservingMergeExec: [m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]", + " SortExec: expr=[m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]", + " InterleaveExec", + " ProjectionExec: expr=[Int64(0)@0 as m, t@1 as t]", + " AggregateExec: mode=FinalPartitioned, gby=[Int64(0)@0 as Int64(0), t@1 as t], aggr=[]", + " CoalesceBatchesExec: target_batch_size=8192", + " RepartitionExec: partitioning=Hash([Column { name: \"Int64(0)\", index: 0 }, Column { name: \"t\", index: 1 }], 2), input_partitions=2", + " AggregateExec: mode=Partial, gby=[0 as Int64(0), t@0 as t], aggr=[]", + " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1", + " ProjectionExec: expr=[column1@0 as t]", + " ValuesExec", + " ProjectionExec: expr=[Int64(1)@0 as m, t@1 as t]", + " AggregateExec: mode=FinalPartitioned, gby=[Int64(1)@0 as Int64(1), t@1 as t], aggr=[]", + " CoalesceBatchesExec: target_batch_size=8192", + " RepartitionExec: partitioning=Hash([Column { name: \"Int64(1)\", index: 0 }, Column { name: \"t\", index: 1 }], 2), input_partitions=2", + " AggregateExec: mode=Partial, gby=[1 as Int64(1), t@0 as t], aggr=[]", + " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1", + " ProjectionExec: expr=[column1@0 as t]", + " ValuesExec", + ]; + let formatted = displayable(plan.as_ref()).indent().to_string(); + let actual: Vec<&str> = formatted.trim().lines().collect(); + assert_eq!( + expected, actual, + "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" + ); + + // sometimes it "just works" + for i in 0..10 { + println!("run: {i}"); + let actual = execute_to_batches(&ctx, sql).await; + + // in https://github.com/apache/arrow-datafusion/issues/5970 the order of the output was sometimes not right + let expected = vec![ + "+---+---+", + "| m | t |", + "+---+---+", + "| 0 
| 0 |", + "| 0 | 1 |", + "| 0 | 2 |", + "| 1 | 0 |", + "| 1 | 1 |", + "+---+---+", + ]; + assert_batches_eq!(expected, &actual); + } + Ok(()) +} diff --git a/datafusion/core/tests/sql/union.rs b/datafusion/core/tests/sql/union.rs index d56a0a39ffad4..cca9c23e67338 100644 --- a/datafusion/core/tests/sql/union.rs +++ b/datafusion/core/tests/sql/union.rs @@ -140,3 +140,43 @@ async fn test_union_upcast_types() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn union_with_hash_aggregate() -> Result<()> { + let ctx = create_union_context()?; + let sql = "select count(*) from ( + select distinct name from t1 + union all + select distinct name from t2 + ) group by name"; + + let dataframe = ctx.sql(sql).await.unwrap(); + let plan = dataframe.into_optimized_plan().unwrap(); + let plan = ctx.state().create_physical_plan(&plan).await.unwrap(); + let formatted = displayable(plan.as_ref()).indent().to_string(); + let actual: Vec<&str> = formatted.trim().lines().collect(); + + let expected = vec![ + "ProjectionExec: expr=[COUNT(UInt8(1))@1 as COUNT(UInt8(1))]", + " AggregateExec: mode=Single, gby=[name@0 as name], aggr=[COUNT(UInt8(1))]", + " InterleaveExec", + " AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]", + " CoalesceBatchesExec: target_batch_size=4096", + " RepartitionExec: partitioning=Hash([Column { name: \"name\", index: 0 }], 4), input_partitions=4", + " RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1", + " AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]", + " MemoryExec: partitions=1, partition_sizes=[1]", + " AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]", + " CoalesceBatchesExec: target_batch_size=4096", + " RepartitionExec: partitioning=Hash([Column { name: \"name\", index: 0 }], 4), input_partitions=4", + " RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1", + " AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]", + " MemoryExec: partitions=1, 
partition_sizes=[1]", + ]; + + assert_eq!( + expected, actual, + "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" + ); + Ok(()) +}