Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions datafusion/physical-plan/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use super::metrics::BaselineMetrics;
use super::{ExecutionPlan, RecordBatchStream, SendableRecordBatchStream};
use crate::displayable;

use arrow::array::RecordBatchOptions;
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use datafusion_common::{exec_err, Result};
use datafusion_common_runtime::JoinSet;
Expand Down Expand Up @@ -362,6 +363,8 @@ pin_project! {

#[pin]
stream: S,

transform_schema: bool,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems like this is fixing the symptom rather than the root cause

I think it would be better to have the correct schema reflected in the plan in the first place 🤔

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, correct nullability in schema is better. I tried to fix logical plan before.

But nullability in logical plan won't affect physical plan. it's ignored.

LogicalPlan::Projection(Projection { input, expr, .. }) => self

in physical plan, it will recompute nullaibility from bottom to top.

e.nullable(&input_schema)?,

but in this scenario, it seems that we need to pass nullability from top to bottom.

I need more suggestions.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to learn some experience from spark.

for logical plan, I haven't found any logic to handle this problem.

https://github.com/apache/spark/blob/75d80c7795ca71d24229010ab04ae740473126aa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala#L475

for physical plan, spark is much easier, its InternalRow is schemaless. so it will use the schema of physical plan by default. but recordbatch contains schema.

https://github.com/apache/spark/blob/75d80c7795ca71d24229010ab04ae740473126aa/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala#L688

I'm not 100% sure, I think current logical plan and physical plan schema is correct. the root cause is that recordbatch's schema doesn't match physical plan's. so adding an adapter is a proper way.

}
}

Expand Down Expand Up @@ -389,7 +392,19 @@ impl<S> RecordBatchStreamAdapter<S> {
/// // ...
/// ```
pub fn new(schema: SchemaRef, stream: S) -> Self {
Self { schema, stream }
Self {
schema,
stream,
transform_schema: false,
}
}

pub fn new_with_transform_schema(schema: SchemaRef, stream: S) -> Self {
Self {
schema,
stream,
transform_schema: true,
}
}
}

Expand All @@ -408,7 +423,25 @@ where
type Item = Result<RecordBatch>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().stream.poll_next(cx)
let this = self.project();
let transform_schema = *this.transform_schema;
let schema = Arc::clone(this.schema);
let ret = this.stream.poll_next(cx);
if transform_schema {
if let Poll::Ready(Some(Ok(batch))) = ret {
let options =
RecordBatchOptions::default().with_row_count(Some(batch.num_rows()));
return Poll::Ready(Some(
RecordBatch::try_new_with_options(
schema,
batch.columns().to_vec(),
&options,
)
.map_err(|e| datafusion_common::DataFusionError::ArrowError(e, None)),
));
}
}
ret
}

fn size_hint(&self) -> (usize, Option<usize>) {
Expand Down
11 changes: 9 additions & 2 deletions datafusion/physical-plan/src/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use crate::execution_plan::{
};
use crate::metrics::BaselineMetrics;
use crate::projection::{make_with_child, ProjectionExec};
use crate::stream::ObservedStream;
use crate::stream::{ObservedStream, RecordBatchStreamAdapter};

use arrow::datatypes::{Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
Expand Down Expand Up @@ -236,8 +236,15 @@ impl ExecutionPlan for UnionExec {
for input in self.inputs.iter() {
// Calculate whether partition belongs to the current partition
if partition < input.output_partitioning().partition_count() {
let stream = input.execute(partition, context)?;
let mut stream = input.execute(partition, context)?;
debug!("Found a Union partition to execute");
if self.schema() != stream.schema() {
stream =
Box::pin(RecordBatchStreamAdapter::new_with_transform_schema(
self.schema(),
stream,
));
}
return Ok(Box::pin(ObservedStream::new(
stream,
baseline_metrics,
Expand Down
87 changes: 3 additions & 84 deletions datafusion/sqllogictest/test_files/union_by_name.slt
Original file line number Diff line number Diff line change
Expand Up @@ -335,89 +335,8 @@ a b c
a b c


# FIXME: The following should pass without error, but currently it is failing
# due to differing record batch schemas when the SLT runner collects results.
# This is due to the following issue: https://github.com/apache/datafusion/issues/15394#issue-2943811768
#
# More context can be found here: https://github.com/apache/datafusion/pull/15242#issuecomment-2746563234
query error
query TTTT rowsort
select x, y, z from t3 union all by name select z, y, x, 'd' as zz from t3;
----
DataFusion error: Internal error: Schema mismatch. Previously had
Schema {
fields: [
Field {
name: "x",
data_type: Utf8,
nullable: true,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "y",
data_type: Utf8,
nullable: true,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "z",
data_type: Utf8,
nullable: true,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "zz",
data_type: Utf8,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
],
metadata: {},
}

Got:
Schema {
fields: [
Field {
name: "x",
data_type: Utf8,
nullable: true,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "y",
data_type: Utf8,
nullable: true,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "z",
data_type: Utf8,
nullable: true,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "zz",
data_type: Utf8,
nullable: true,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
],
metadata: {},
}.
This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker
a b c NULL
a b c d