diff --git a/datafusion/physical-plan/src/stream.rs b/datafusion/physical-plan/src/stream.rs index 338ac7d048a33..6ffe7732da5ca 100644 --- a/datafusion/physical-plan/src/stream.rs +++ b/datafusion/physical-plan/src/stream.rs @@ -26,6 +26,7 @@ use super::metrics::BaselineMetrics; use super::{ExecutionPlan, RecordBatchStream, SendableRecordBatchStream}; use crate::displayable; +use arrow::array::RecordBatchOptions; use arrow::{datatypes::SchemaRef, record_batch::RecordBatch}; use datafusion_common::{exec_err, Result}; use datafusion_common_runtime::JoinSet; @@ -362,6 +363,8 @@ pin_project! { #[pin] stream: S, + + transform_schema: bool, } } @@ -389,7 +392,19 @@ impl RecordBatchStreamAdapter { /// // ... /// ``` pub fn new(schema: SchemaRef, stream: S) -> Self { - Self { schema, stream } + Self { + schema, + stream, + transform_schema: false, + } + } + + pub fn new_with_transform_schema(schema: SchemaRef, stream: S) -> Self { + Self { + schema, + stream, + transform_schema: true, + } } } @@ -408,7 +423,25 @@ where type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().stream.poll_next(cx) + let this = self.project(); + let transform_schema = *this.transform_schema; + let schema = Arc::clone(this.schema); + let ret = this.stream.poll_next(cx); + if transform_schema { + if let Poll::Ready(Some(Ok(batch))) = ret { + let options = + RecordBatchOptions::default().with_row_count(Some(batch.num_rows())); + return Poll::Ready(Some( + RecordBatch::try_new_with_options( + schema, + batch.columns().to_vec(), + &options, + ) + .map_err(|e| datafusion_common::DataFusionError::ArrowError(e, None)), + )); + } + } + ret } fn size_hint(&self) -> (usize, Option) { diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index 2b666093f29e0..825fd7fffb44f 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -37,7 +37,7 @@ use crate::execution_plan::{ }; use crate::metrics::BaselineMetrics; use crate::projection::{make_with_child, ProjectionExec}; -use crate::stream::ObservedStream; +use crate::stream::{ObservedStream, RecordBatchStreamAdapter}; use arrow::datatypes::{Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; @@ -236,8 +236,15 @@ impl ExecutionPlan for UnionExec { for input in self.inputs.iter() { // Calculate whether partition belongs to the current partition if partition < input.output_partitioning().partition_count() { - let stream = input.execute(partition, context)?; + let mut stream = input.execute(partition, context)?; debug!("Found a Union partition to execute"); + if self.schema() != stream.schema() { + stream = + Box::pin(RecordBatchStreamAdapter::new_with_transform_schema( + self.schema(), + stream, + )); + } return Ok(Box::pin(ObservedStream::new( stream, baseline_metrics, diff --git a/datafusion/sqllogictest/test_files/union_by_name.slt b/datafusion/sqllogictest/test_files/union_by_name.slt index 9572e6efc3e67..52b6fcf9f72ce 100644 --- a/datafusion/sqllogictest/test_files/union_by_name.slt +++ b/datafusion/sqllogictest/test_files/union_by_name.slt @@ -335,89 +335,8 @@ a b c a b c -# FIXME: The following should pass without error, but currently it is failing -# due to differing record batch schemas when the SLT runner collects results. -# This is due to the following issue: https://github.com/apache/datafusion/issues/15394#issue-2943811768 -# -# More context can be found here: https://github.com/apache/datafusion/pull/15242#issuecomment-2746563234 -query error +query TTTT rowsort select x, y, z from t3 union all by name select z, y, x, 'd' as zz from t3; ---- -DataFusion error: Internal error: Schema mismatch. Previously had -Schema { - fields: [ - Field { - name: "x", - data_type: Utf8, - nullable: true, - dict_id: 0, - dict_is_ordered: false, - metadata: {}, - }, - Field { - name: "y", - data_type: Utf8, - nullable: true, - dict_id: 0, - dict_is_ordered: false, - metadata: {}, - }, - Field { - name: "z", - data_type: Utf8, - nullable: true, - dict_id: 0, - dict_is_ordered: false, - metadata: {}, - }, - Field { - name: "zz", - data_type: Utf8, - nullable: false, - dict_id: 0, - dict_is_ordered: false, - metadata: {}, - }, - ], - metadata: {}, -} - -Got: -Schema { - fields: [ - Field { - name: "x", - data_type: Utf8, - nullable: true, - dict_id: 0, - dict_is_ordered: false, - metadata: {}, - }, - Field { - name: "y", - data_type: Utf8, - nullable: true, - dict_id: 0, - dict_is_ordered: false, - metadata: {}, - }, - Field { - name: "z", - data_type: Utf8, - nullable: true, - dict_id: 0, - dict_is_ordered: false, - metadata: {}, - }, - Field { - name: "zz", - data_type: Utf8, - nullable: true, - dict_id: 0, - dict_is_ordered: false, - metadata: {}, - }, - ], - metadata: {}, -}. -This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker +a b c NULL +a b c d