diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs index d7ce056a339ec..caeb06ffb9d14 100644 --- a/datafusion/core/src/dataframe.rs +++ b/datafusion/core/src/dataframe.rs @@ -1929,52 +1929,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn partition_aware_union() -> Result<()> { - let left = test_table().await?.select_columns(&["c1", "c2"])?; - let right = test_table_with_name("c2") - .await? - .select_columns(&["c1", "c3"])? - .with_column_renamed("c2.c1", "c2_c1")?; - - let left_rows = left.clone().collect().await?; - let right_rows = right.clone().collect().await?; - let join1 = left.clone().join( - right.clone(), - JoinType::Inner, - &["c1"], - &["c2_c1"], - None, - )?; - let join2 = left.join(right, JoinType::Inner, &["c1"], &["c2_c1"], None)?; - - let union = join1.union(join2)?; - - let union_rows = union.clone().collect().await?; - - assert_eq!(100, left_rows.iter().map(|x| x.num_rows()).sum::()); - assert_eq!(100, right_rows.iter().map(|x| x.num_rows()).sum::()); - assert_eq!(4016, union_rows.iter().map(|x| x.num_rows()).sum::()); - - let physical_plan = union.create_physical_plan().await?; - let default_partition_count = SessionConfig::new().target_partitions(); - - // For partition aware union, the output partition count should not be changed. - assert_eq!( - physical_plan.output_partitioning().partition_count(), - default_partition_count - ); - // For partition aware union, the output partition is the same with the union's inputs - for child in physical_plan.children() { - assert_eq!( - physical_plan.output_partitioning(), - child.output_partitioning() - ); - } - - Ok(()) - } - #[tokio::test] async fn non_partition_aware_union() -> Result<()> { let left = test_table().await?.select_columns(&["c1", "c2"])?; diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs index 3d4522272a521..f761f19655b45 100644 --- a/datafusion/core/src/physical_plan/union.rs +++ b/datafusion/core/src/physical_plan/union.rs @@ -21,8 +21,6 @@ //! The Union operator combines multiple inputs with the same schema -use std::pin::Pin; -use std::task::{Context, Poll}; use std::{any::Any, sync::Arc}; use arrow::{ @@ -30,7 +28,7 @@ use arrow::{ record_batch::RecordBatch, }; use datafusion_common::{DFSchemaRef, DataFusionError}; -use futures::{Stream, StreamExt}; +use futures::StreamExt; use itertools::Itertools; use log::debug; use log::warn; @@ -47,7 +45,6 @@ use crate::{ error::Result, physical_plan::{expressions, metrics::BaselineMetrics}, }; -use tokio::macros::support::thread_rng_n; /// `UnionExec`: `UNION ALL` execution plan. /// @@ -94,8 +91,6 @@ pub struct UnionExec { metrics: ExecutionPlanMetricsSet, /// Schema of Union schema: SchemaRef, - /// Partition aware Union - partition_aware: bool, } impl UnionExec { @@ -154,24 +149,10 @@ impl UnionExec { inputs[0].schema().metadata().clone(), )); - // If all the input partitions have the same Hash partition spec with the first_input_partition - // The UnionExec is partition aware. - // - // It might be too strict here in the case that the input partition specs are compatible but not exactly the same. - // For example one input partition has the partition spec Hash('a','b','c') and - // other has the partition spec Hash('a'), It is safe to derive the out partition with the spec Hash('a','b','c'). - let first_input_partition = inputs[0].output_partitioning(); - let partition_aware = matches!(first_input_partition, Partitioning::Hash(_, _)) - && inputs - .iter() - .map(|plan| plan.output_partitioning()) - .all(|partition| partition == first_input_partition); - UnionExec { inputs, metrics: ExecutionPlanMetricsSet::new(), schema, - partition_aware, } } @@ -204,28 +185,20 @@ impl ExecutionPlan for UnionExec { /// Output of the union is the combination of all output partitions of the inputs fn output_partitioning(&self) -> Partitioning { - if self.partition_aware { - self.inputs[0].output_partitioning() - } else { - // Output the combination of all output partitions of the inputs if the Union is not partition aware - let num_partitions = self - .inputs - .iter() - .map(|plan| plan.output_partitioning().partition_count()) - .sum(); + // Output the combination of all output partitions of the inputs if the Union is not partition aware + let num_partitions = self + .inputs + .iter() + .map(|plan| plan.output_partitioning().partition_count()) + .sum(); - Partitioning::UnknownPartitioning(num_partitions) - } + Partitioning::UnknownPartitioning(num_partitions) } fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { - // If the Union is partition aware, there is no output ordering. - // Otherwise, the output ordering is the "meet" of its input orderings. + // The output ordering is the "meet" of its input orderings. // The meet is the finest ordering that satisfied by all the input // orderings, see https://en.wikipedia.org/wiki/Join_and_meet. - if self.partition_aware { - return None; - } get_meet_of_orderings(&self.inputs) } @@ -273,34 +246,15 @@ impl ExecutionPlan for UnionExec { let elapsed_compute = baseline_metrics.elapsed_compute().clone(); let _timer = elapsed_compute.timer(); // record on drop - if self.partition_aware { - let mut input_stream_vec = vec![]; - for input in self.inputs.iter() { - if partition < input.output_partitioning().partition_count() { - input_stream_vec.push(input.execute(partition, context.clone())?); - } else { - // Do not find a partition to execute - break; - } - } - if input_stream_vec.len() == self.inputs.len() { - let stream = Box::pin(CombinedRecordBatchStream::new( - self.schema(), - input_stream_vec, - )); + // find partition to execute + for input in self.inputs.iter() { + // Calculate whether partition belongs to the current partition + if partition < input.output_partitioning().partition_count() { + let stream = input.execute(partition, context)?; + debug!("Found a Union partition to execute"); return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics))); - } - } else { - // find partition to execute - for input in self.inputs.iter() { - // Calculate whether partition belongs to the current partition - if partition < input.output_partitioning().partition_count() { - let stream = input.execute(partition, context)?; - debug!("Found a Union partition to execute"); - return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics))); - } else { - partition -= input.output_partitioning().partition_count(); - } + } else { + partition -= input.output_partitioning().partition_count(); } } @@ -340,73 +294,6 @@ impl ExecutionPlan for UnionExec { } } -/// CombinedRecordBatchStream can be used to combine a Vec of SendableRecordBatchStreams into one -pub struct CombinedRecordBatchStream { - /// Schema wrapped by Arc - schema: SchemaRef, - /// Stream entries - entries: Vec, -} - -impl CombinedRecordBatchStream { - /// Create an CombinedRecordBatchStream - pub fn new(schema: SchemaRef, entries: Vec) -> Self { - Self { schema, entries } - } -} - -impl RecordBatchStream for CombinedRecordBatchStream { - fn schema(&self) -> SchemaRef { - self.schema.clone() - } -} - -impl Stream for CombinedRecordBatchStream { - type Item = Result; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - use Poll::*; - - let start = thread_rng_n(self.entries.len() as u32) as usize; - let mut idx = start; - - for _ in 0..self.entries.len() { - let stream = self.entries.get_mut(idx).unwrap(); - - match Pin::new(stream).poll_next(cx) { - Ready(Some(val)) => return Ready(Some(val)), - Ready(None) => { - // Remove the entry - self.entries.swap_remove(idx); - - // Check if this was the last entry, if so the cursor needs - // to wrap - if idx == self.entries.len() { - idx = 0; - } else if idx < start && start <= self.entries.len() { - // The stream being swapped into the current index has - // already been polled, so skip it. - idx = idx.wrapping_add(1) % self.entries.len(); - } - } - Pending => { - idx = idx.wrapping_add(1) % self.entries.len(); - } - } - } - - // If the map is empty, then the stream is complete. - if self.entries.is_empty() { - Ready(None) - } else { - Pending - } - } -} - /// Stream wrapper that records `BaselineMetrics` for a particular /// partition struct ObservedStream { diff --git a/datafusion/core/tests/sql/order.rs b/datafusion/core/tests/sql/order.rs index da7663cb8b5c5..309a0a7b4866b 100644 --- a/datafusion/core/tests/sql/order.rs +++ b/datafusion/core/tests/sql/order.rs @@ -175,3 +175,84 @@ async fn sort_with_duplicate_sort_exprs() -> Result<()> { Ok(()) } + +/// Minimal test case for https://github.com/apache/arrow-datafusion/issues/5970 +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_issue5970_mini() -> Result<()> { + let config = SessionConfig::new() + .with_target_partitions(2) + .with_repartition_sorts(true); + let ctx = SessionContext::with_config(config); + let sql = " +WITH + m0(t) AS ( + VALUES (0), (1), (2)), + + m1(t) AS ( + VALUES (0), (1)), + + u AS ( + SELECT 0 as m, t FROM m0 GROUP BY 1, 2), + + v AS ( + SELECT 1 as m, t FROM m1 GROUP BY 1, 2) +SELECT * FROM u +UNION ALL +SELECT * FROM v +ORDER BY 1, 2; + "; + + // check phys. plan + let dataframe = ctx.sql(sql).await.unwrap(); + let plan = dataframe.into_optimized_plan().unwrap(); + let plan = ctx.state().create_physical_plan(&plan).await.unwrap(); + let expected = vec![ + "SortPreservingMergeExec: [m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]", + " UnionExec", + " SortExec: expr=[m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]", + " ProjectionExec: expr=[Int64(0)@0 as m, t@1 as t]", + " AggregateExec: mode=FinalPartitioned, gby=[Int64(0)@0 as Int64(0), t@1 as t], aggr=[]", + " CoalesceBatchesExec: target_batch_size=8192", + " RepartitionExec: partitioning=Hash([Column { name: \"Int64(0)\", index: 0 }, Column { name: \"t\", index: 1 }], 2), input_partitions=2", + " AggregateExec: mode=Partial, gby=[0 as Int64(0), t@0 as t], aggr=[]", + " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1", + " ProjectionExec: expr=[column1@0 as t]", + " ValuesExec", + " SortExec: expr=[m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]", + " ProjectionExec: expr=[Int64(1)@0 as m, t@1 as t]", + " AggregateExec: mode=FinalPartitioned, gby=[Int64(1)@0 as Int64(1), t@1 as t], aggr=[]", + " CoalesceBatchesExec: target_batch_size=8192", + " RepartitionExec: partitioning=Hash([Column { name: \"Int64(1)\", index: 0 }, Column { name: \"t\", index: 1 }], 2), input_partitions=2", + " AggregateExec: mode=Partial, gby=[1 as Int64(1), t@0 as t], aggr=[]", + " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1", + " ProjectionExec: expr=[column1@0 as t]", + " ValuesExec", + ]; + let formatted = displayable(plan.as_ref()).indent().to_string(); + let actual: Vec<&str> = formatted.trim().lines().collect(); + assert_eq!( + expected, actual, + "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" + ); + + // sometimes it "just works" + for i in 0..10 { + println!("run: {i}"); + let actual = execute_to_batches(&ctx, sql).await; + + // in https://github.com/apache/arrow-datafusion/issues/5970 the order of the output was sometimes not right + let expected = vec![ + "+---+---+", + "| m | t |", + "+---+---+", + "| 0 | 0 |", + "| 0 | 1 |", + "| 0 | 2 |", + "| 1 | 0 |", + "| 1 | 1 |", + "+---+---+", + ]; + assert_batches_eq!(expected, &actual); + } + Ok(()) +}