From 085c6f678ee0be07ab7b842ecb3a6f531f771877 Mon Sep 17 00:00:00 2001 From: Chongchen Chen Date: Sun, 6 Apr 2025 18:46:31 +0800 Subject: [PATCH 1/3] fix: union all by name --- datafusion/physical-plan/src/stream.rs | 29 ++++++- datafusion/physical-plan/src/union.rs | 11 ++- .../sqllogictest/test_files/union_by_name.slt | 87 +------------------ 3 files changed, 39 insertions(+), 88 deletions(-) diff --git a/datafusion/physical-plan/src/stream.rs b/datafusion/physical-plan/src/stream.rs index 338ac7d048a33..09175bf84e0d5 100644 --- a/datafusion/physical-plan/src/stream.rs +++ b/datafusion/physical-plan/src/stream.rs @@ -362,6 +362,8 @@ pin_project! { #[pin] stream: S, + + transform_schema: bool, } } @@ -389,7 +391,19 @@ impl RecordBatchStreamAdapter { /// // ... /// ``` pub fn new(schema: SchemaRef, stream: S) -> Self { - Self { schema, stream } + Self { + schema, + stream, + transform_schema: false, + } + } + + pub fn new_with_transform_schema(schema: SchemaRef, stream: S) -> Self { + Self { + schema, + stream, + transform_schema: true, + } } } @@ -408,7 +422,18 @@ where type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().stream.poll_next(cx) + let this = self.project(); + let transform_schema = *this.transform_schema; + let schema = Arc::clone(this.schema); + let ret = this.stream.poll_next(cx); + if transform_schema { + if let Poll::Ready(Some(Ok(batch))) = ret { + return Poll::Ready(Some(batch.with_schema(schema).map_err(|e| { + datafusion_common::DataFusionError::ArrowError(e, None) + }))); + } + } + ret } fn size_hint(&self) -> (usize, Option) { diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index 2b666093f29e0..825fd7fffb44f 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -37,7 +37,7 @@ use crate::execution_plan::{ }; use crate::metrics::BaselineMetrics; use crate::projection::{make_with_child, ProjectionExec}; -use crate::stream::ObservedStream; +use crate::stream::{ObservedStream, RecordBatchStreamAdapter}; use arrow::datatypes::{Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; @@ -236,8 +236,15 @@ impl ExecutionPlan for UnionExec { for input in self.inputs.iter() { // Calculate whether partition belongs to the current partition if partition < input.output_partitioning().partition_count() { - let stream = input.execute(partition, context)?; + let mut stream = input.execute(partition, context)?; debug!("Found a Union partition to execute"); + if self.schema() != stream.schema() { + stream = + Box::pin(RecordBatchStreamAdapter::new_with_transform_schema( + self.schema(), + stream, + )); + } return Ok(Box::pin(ObservedStream::new( stream, baseline_metrics, diff --git a/datafusion/sqllogictest/test_files/union_by_name.slt b/datafusion/sqllogictest/test_files/union_by_name.slt index 9572e6efc3e67..52b6fcf9f72ce 100644 --- a/datafusion/sqllogictest/test_files/union_by_name.slt +++ b/datafusion/sqllogictest/test_files/union_by_name.slt @@ -335,89 +335,8 @@ a b c a b c -# FIXME: The following should pass without error, but currently it is failing -# due to differing record batch schemas when the SLT runner collects results. -# This is due to the following issue: https://github.com/apache/datafusion/issues/15394#issue-2943811768 -# -# More context can be found here: https://github.com/apache/datafusion/pull/15242#issuecomment-2746563234 -query error +query TTTT rowsort select x, y, z from t3 union all by name select z, y, x, 'd' as zz from t3; ---- -DataFusion error: Internal error: Schema mismatch. Previously had -Schema { - fields: [ - Field { - name: "x", - data_type: Utf8, - nullable: true, - dict_id: 0, - dict_is_ordered: false, - metadata: {}, - }, - Field { - name: "y", - data_type: Utf8, - nullable: true, - dict_id: 0, - dict_is_ordered: false, - metadata: {}, - }, - Field { - name: "z", - data_type: Utf8, - nullable: true, - dict_id: 0, - dict_is_ordered: false, - metadata: {}, - }, - Field { - name: "zz", - data_type: Utf8, - nullable: false, - dict_id: 0, - dict_is_ordered: false, - metadata: {}, - }, - ], - metadata: {}, -} - -Got: -Schema { - fields: [ - Field { - name: "x", - data_type: Utf8, - nullable: true, - dict_id: 0, - dict_is_ordered: false, - metadata: {}, - }, - Field { - name: "y", - data_type: Utf8, - nullable: true, - dict_id: 0, - dict_is_ordered: false, - metadata: {}, - }, - Field { - name: "z", - data_type: Utf8, - nullable: true, - dict_id: 0, - dict_is_ordered: false, - metadata: {}, - }, - Field { - name: "zz", - data_type: Utf8, - nullable: true, - dict_id: 0, - dict_is_ordered: false, - metadata: {}, - }, - ], - metadata: {}, -}. -This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker +a b c NULL +a b c d From 9e9c37d12dc98656991b5f908df048bc07aedabd Mon Sep 17 00:00:00 2001 From: Chongchen Chen Date: Wed, 9 Apr 2025 08:15:56 +0800 Subject: [PATCH 2/3] use try_new_with_options --- datafusion/physical-plan/src/stream.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/stream.rs b/datafusion/physical-plan/src/stream.rs index 09175bf84e0d5..e8c7c13da7eb4 100644 --- a/datafusion/physical-plan/src/stream.rs +++ b/datafusion/physical-plan/src/stream.rs @@ -26,6 +26,7 @@ use super::metrics::BaselineMetrics; use super::{ExecutionPlan, RecordBatchStream, SendableRecordBatchStream}; use crate::displayable; +use arrow::array::RecordBatchOptions; use arrow::{datatypes::SchemaRef, record_batch::RecordBatch}; use datafusion_common::{exec_err, Result}; use datafusion_common_runtime::JoinSet; @@ -428,7 +429,13 @@ where let ret = this.stream.poll_next(cx); if transform_schema { if let Poll::Ready(Some(Ok(batch))) = ret { - return Poll::Ready(Some(batch.with_schema(schema).map_err(|e| { + let options = RecordBatchOptions::default().with_row_count(Some(batch.num_rows())); + return Poll::Ready(Some(RecordBatch::try_new_with_options( + schema, + batch.columns().to_vec(), + &options, + ) + .map_err(|e| { datafusion_common::DataFusionError::ArrowError(e, None) }))); } From 232a7bfa865ad59614fc7a6b86fcf76fc29064ed Mon Sep 17 00:00:00 2001 From: Chongchen Chen Date: Wed, 9 Apr 2025 09:12:11 +0800 Subject: [PATCH 3/3] fmt --- datafusion/physical-plan/src/stream.rs | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/datafusion/physical-plan/src/stream.rs b/datafusion/physical-plan/src/stream.rs index e8c7c13da7eb4..6ffe7732da5ca 100644 --- a/datafusion/physical-plan/src/stream.rs +++ b/datafusion/physical-plan/src/stream.rs @@ -429,15 +429,16 @@ where let ret = this.stream.poll_next(cx); if transform_schema { if let Poll::Ready(Some(Ok(batch))) = ret { - let options = RecordBatchOptions::default().with_row_count(Some(batch.num_rows())); - return Poll::Ready(Some(RecordBatch::try_new_with_options( - schema, - batch.columns().to_vec(), - &options, - ) - .map_err(|e| { - datafusion_common::DataFusionError::ArrowError(e, None) - }))); + let options = + RecordBatchOptions::default().with_row_count(Some(batch.num_rows())); + return Poll::Ready(Some( + RecordBatch::try_new_with_options( + schema, + batch.columns().to_vec(), + &options, + ) + .map_err(|e| datafusion_common::DataFusionError::ArrowError(e, None)), + )); } } ret