Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 0 additions & 46 deletions datafusion/core/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1929,52 +1929,6 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn partition_aware_union() -> Result<()> {
let left = test_table().await?.select_columns(&["c1", "c2"])?;
let right = test_table_with_name("c2")
.await?
.select_columns(&["c1", "c3"])?
.with_column_renamed("c2.c1", "c2_c1")?;

let left_rows = left.clone().collect().await?;
let right_rows = right.clone().collect().await?;
let join1 = left.clone().join(
right.clone(),
JoinType::Inner,
&["c1"],
&["c2_c1"],
None,
)?;
let join2 = left.join(right, JoinType::Inner, &["c1"], &["c2_c1"], None)?;

let union = join1.union(join2)?;

let union_rows = union.clone().collect().await?;

assert_eq!(100, left_rows.iter().map(|x| x.num_rows()).sum::<usize>());
assert_eq!(100, right_rows.iter().map(|x| x.num_rows()).sum::<usize>());
assert_eq!(4016, union_rows.iter().map(|x| x.num_rows()).sum::<usize>());

let physical_plan = union.create_physical_plan().await?;
let default_partition_count = SessionConfig::new().target_partitions();

// For partition aware union, the output partition count should not be changed.
assert_eq!(
physical_plan.output_partitioning().partition_count(),
default_partition_count
);
// For partition aware union, the output partition is the same with the union's inputs
for child in physical_plan.children() {
assert_eq!(
physical_plan.output_partitioning(),
child.output_partitioning()
);
}

Ok(())
}

#[tokio::test]
async fn non_partition_aware_union() -> Result<()> {
let left = test_table().await?.select_columns(&["c1", "c2"])?;
Expand Down
147 changes: 17 additions & 130 deletions datafusion/core/src/physical_plan/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,14 @@

//! The Union operator combines multiple inputs with the same schema

use std::pin::Pin;
use std::task::{Context, Poll};
use std::{any::Any, sync::Arc};

use arrow::{
datatypes::{Field, Schema, SchemaRef},
record_batch::RecordBatch,
};
use datafusion_common::{DFSchemaRef, DataFusionError};
use futures::{Stream, StreamExt};
use futures::StreamExt;
use itertools::Itertools;
use log::debug;
use log::warn;
Expand All @@ -47,7 +45,6 @@ use crate::{
error::Result,
physical_plan::{expressions, metrics::BaselineMetrics},
};
use tokio::macros::support::thread_rng_n;

/// `UnionExec`: `UNION ALL` execution plan.
///
Expand Down Expand Up @@ -94,8 +91,6 @@ pub struct UnionExec {
metrics: ExecutionPlanMetricsSet,
/// Schema of Union
schema: SchemaRef,
/// Partition aware Union
partition_aware: bool,
}

impl UnionExec {
Expand Down Expand Up @@ -154,24 +149,10 @@ impl UnionExec {
inputs[0].schema().metadata().clone(),
));

// If all the input partitions have the same Hash partition spec with the first_input_partition
// The UnionExec is partition aware.
//
// It might be too strict here in the case that the input partition specs are compatible but not exactly the same.
// For example one input partition has the partition spec Hash('a','b','c') and
// other has the partition spec Hash('a'), It is safe to derive the out partition with the spec Hash('a','b','c').
let first_input_partition = inputs[0].output_partitioning();
let partition_aware = matches!(first_input_partition, Partitioning::Hash(_, _))
&& inputs
.iter()
.map(|plan| plan.output_partitioning())
.all(|partition| partition == first_input_partition);

UnionExec {
inputs,
metrics: ExecutionPlanMetricsSet::new(),
schema,
partition_aware,
}
}

Expand Down Expand Up @@ -204,28 +185,20 @@ impl ExecutionPlan for UnionExec {

/// Output of the union is the combination of all output partitions of the inputs
fn output_partitioning(&self) -> Partitioning {
if self.partition_aware {
self.inputs[0].output_partitioning()
} else {
// Output the combination of all output partitions of the inputs if the Union is not partition aware
let num_partitions = self
.inputs
.iter()
.map(|plan| plan.output_partitioning().partition_count())
.sum();
// Output the combination of all output partitions of the inputs if the Union is not partition aware
let num_partitions = self
.inputs
.iter()
.map(|plan| plan.output_partitioning().partition_count())
.sum();

Partitioning::UnknownPartitioning(num_partitions)
}
Partitioning::UnknownPartitioning(num_partitions)
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
// If the Union is partition aware, there is no output ordering.
// Otherwise, the output ordering is the "meet" of its input orderings.
// The output ordering is the "meet" of its input orderings.
// The meet is the finest ordering that satisfied by all the input
// orderings, see https://en.wikipedia.org/wiki/Join_and_meet.
if self.partition_aware {
return None;
}
get_meet_of_orderings(&self.inputs)
}

Expand Down Expand Up @@ -273,34 +246,15 @@ impl ExecutionPlan for UnionExec {
let elapsed_compute = baseline_metrics.elapsed_compute().clone();
let _timer = elapsed_compute.timer(); // record on drop

if self.partition_aware {
let mut input_stream_vec = vec![];
for input in self.inputs.iter() {
if partition < input.output_partitioning().partition_count() {
input_stream_vec.push(input.execute(partition, context.clone())?);
} else {
// Do not find a partition to execute
break;
}
}
if input_stream_vec.len() == self.inputs.len() {
let stream = Box::pin(CombinedRecordBatchStream::new(
self.schema(),
input_stream_vec,
));
// find partition to execute
for input in self.inputs.iter() {
// Calculate whether partition belongs to the current partition
if partition < input.output_partitioning().partition_count() {
let stream = input.execute(partition, context)?;
debug!("Found a Union partition to execute");
return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)));
}
} else {
// find partition to execute
for input in self.inputs.iter() {
// Calculate whether partition belongs to the current partition
if partition < input.output_partitioning().partition_count() {
let stream = input.execute(partition, context)?;
debug!("Found a Union partition to execute");
return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)));
} else {
partition -= input.output_partitioning().partition_count();
}
} else {
partition -= input.output_partitioning().partition_count();
}
}

Expand Down Expand Up @@ -340,73 +294,6 @@ impl ExecutionPlan for UnionExec {
}
}

/// CombinedRecordBatchStream can be used to combine a Vec of SendableRecordBatchStreams into one
pub struct CombinedRecordBatchStream {
/// Schema wrapped by Arc
schema: SchemaRef,
/// Stream entries
entries: Vec<SendableRecordBatchStream>,
}

impl CombinedRecordBatchStream {
/// Create an CombinedRecordBatchStream
pub fn new(schema: SchemaRef, entries: Vec<SendableRecordBatchStream>) -> Self {
Self { schema, entries }
}
}

impl RecordBatchStream for CombinedRecordBatchStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}

impl Stream for CombinedRecordBatchStream {
type Item = Result<RecordBatch>;

fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
use Poll::*;

let start = thread_rng_n(self.entries.len() as u32) as usize;
let mut idx = start;

for _ in 0..self.entries.len() {
let stream = self.entries.get_mut(idx).unwrap();

match Pin::new(stream).poll_next(cx) {
Ready(Some(val)) => return Ready(Some(val)),
Ready(None) => {
// Remove the entry
self.entries.swap_remove(idx);

// Check if this was the last entry, if so the cursor needs
// to wrap
if idx == self.entries.len() {
idx = 0;
} else if idx < start && start <= self.entries.len() {
// The stream being swapped into the current index has
// already been polled, so skip it.
idx = idx.wrapping_add(1) % self.entries.len();
}
}
Pending => {
idx = idx.wrapping_add(1) % self.entries.len();
}
}
}

// If the map is empty, then the stream is complete.
if self.entries.is_empty() {
Ready(None)
} else {
Pending
}
}
}

/// Stream wrapper that records `BaselineMetrics` for a particular
/// partition
struct ObservedStream {
Expand Down
81 changes: 81 additions & 0 deletions datafusion/core/tests/sql/order.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,3 +175,84 @@ async fn sort_with_duplicate_sort_exprs() -> Result<()> {

Ok(())
}

/// Minimal test case for https://github.com/apache/arrow-datafusion/issues/5970
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_issue5970_mini() -> Result<()> {
let config = SessionConfig::new()
.with_target_partitions(2)
.with_repartition_sorts(true);
let ctx = SessionContext::with_config(config);
let sql = "
WITH
m0(t) AS (
VALUES (0), (1), (2)),

m1(t) AS (
VALUES (0), (1)),

u AS (
SELECT 0 as m, t FROM m0 GROUP BY 1, 2),

v AS (
SELECT 1 as m, t FROM m1 GROUP BY 1, 2)
SELECT * FROM u
UNION ALL
SELECT * FROM v
ORDER BY 1, 2;
";

// check phys. plan
let dataframe = ctx.sql(sql).await.unwrap();
let plan = dataframe.into_optimized_plan().unwrap();
let plan = ctx.state().create_physical_plan(&plan).await.unwrap();
let expected = vec![
"SortPreservingMergeExec: [m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]",
" UnionExec",
" SortExec: expr=[m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]",
" ProjectionExec: expr=[Int64(0)@0 as m, t@1 as t]",
" AggregateExec: mode=FinalPartitioned, gby=[Int64(0)@0 as Int64(0), t@1 as t], aggr=[]",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=Hash([Column { name: \"Int64(0)\", index: 0 }, Column { name: \"t\", index: 1 }], 2), input_partitions=2",
" AggregateExec: mode=Partial, gby=[0 as Int64(0), t@0 as t], aggr=[]",
" RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" ProjectionExec: expr=[column1@0 as t]",
" ValuesExec",
" SortExec: expr=[m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]",
" ProjectionExec: expr=[Int64(1)@0 as m, t@1 as t]",
" AggregateExec: mode=FinalPartitioned, gby=[Int64(1)@0 as Int64(1), t@1 as t], aggr=[]",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=Hash([Column { name: \"Int64(1)\", index: 0 }, Column { name: \"t\", index: 1 }], 2), input_partitions=2",
" AggregateExec: mode=Partial, gby=[1 as Int64(1), t@0 as t], aggr=[]",
" RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" ProjectionExec: expr=[column1@0 as t]",
" ValuesExec",
];
let formatted = displayable(plan.as_ref()).indent().to_string();
let actual: Vec<&str> = formatted.trim().lines().collect();
assert_eq!(
expected, actual,
"\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
);

// sometimes it "just works"
for i in 0..10 {
println!("run: {i}");
let actual = execute_to_batches(&ctx, sql).await;

// in https://github.com/apache/arrow-datafusion/issues/5970 the order of the output was sometimes not right
let expected = vec![
"+---+---+",
"| m | t |",
"+---+---+",
"| 0 | 0 |",
"| 0 | 1 |",
"| 0 | 2 |",
"| 1 | 0 |",
"| 1 | 1 |",
"+---+---+",
];
assert_batches_eq!(expected, &actual);
}
Ok(())
}