diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 8dbf41c37f4d1..9ca6941a61ce6 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1186,12 +1186,20 @@ impl LogicalPlan { options, .. }) => { - self.assert_no_expressions(expr)?; + let exec_columns = if expr.is_empty() { + columns.clone() + } else { + expr.into_iter() + .map(|e| match e { + Expr::Column(c) => Ok(c), + other => internal_err!( + "Expected Expr::Column for Unnest exec_columns, got {other:?}" + ), + }) + .collect::>>()? + }; let input = self.only_input(inputs)?; - // Update schema with unnested column type. - let new_plan = - unnest_with_options(input, columns.clone(), options.clone())?; - Ok(new_plan) + Ok(unnest_with_options(input, exec_columns, options.clone())?) } } } @@ -6602,4 +6610,53 @@ mod tests { Ok(()) } + + #[test] + fn test_unnest_with_new_exprs_accepts_expressions() -> Result<()> { + use crate::LogicalPlanBuilder; + use arrow::datatypes::{DataType, Field, Schema}; + + let schema = Schema::new(vec![ + Field::new("list_col", DataType::new_list(DataType::Int32, true), true), + Field::new("other_col", DataType::Int32, true), + ]); + let plan = table_scan(Some("t"), &schema, None)?.build()?; + let unnest_plan = LogicalPlanBuilder::from(plan) + .unnest_column("list_col")? + .build()?; + + let exprs = unnest_plan.expressions(); + assert!(!exprs.is_empty(), "Unnest should expose exec_columns"); + assert_eq!(exprs.len(), 1); + assert!(matches!(&exprs[0], Expr::Column(c) if c.name == "list_col")); + + let inputs: Vec = + unnest_plan.inputs().into_iter().cloned().collect(); + let rebuilt = unnest_plan.with_new_exprs(exprs, inputs)?; + assert_eq!(rebuilt.schema(), unnest_plan.schema()); + + Ok(()) + } + + #[test] + fn test_unnest_with_new_exprs_empty_preserves_columns() -> Result<()> { + use crate::LogicalPlanBuilder; + use arrow::datatypes::{DataType, Field, Schema}; + + let schema = Schema::new(vec![ + Field::new("list_col", DataType::new_list(DataType::Int32, true), true), + Field::new("other_col", DataType::Int32, true), + ]); + let plan = table_scan(Some("t"), &schema, None)?.build()?; + let unnest_plan = LogicalPlanBuilder::from(plan) + .unnest_column("list_col")? + .build()?; + + let inputs: Vec = + unnest_plan.inputs().into_iter().cloned().collect(); + let rebuilt = unnest_plan.with_new_exprs(vec![], inputs)?; + assert_eq!(rebuilt.schema(), unnest_plan.schema()); + + Ok(()) + } } diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index cba2dac24b610..c10ac92eef4f5 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -37,13 +37,15 @@ //! * [`LogicalPlan::with_new_exprs`]: Create a new plan with different expressions //! * [`LogicalPlan::expressions`]: Return a copy of the plan's expressions +use std::sync::Arc; + use crate::logical_plan::plan::RangePartitioning; use crate::{ Aggregate, Analyze, CreateMemoryTable, CreateView, DdlStatement, Distinct, DistinctOn, DmlStatement, Execute, Explain, Expr, Extension, Filter, Join, Limit, LogicalPlan, Partitioning, Prepare, Projection, RecursiveQuery, Repartition, Sort, Statement, Subquery, SubqueryAlias, TableScan, Union, Unnest, UserDefinedLogicalNode, - Values, Window, dml::CopyTo, + Values, Window, builder::unnest_with_options, dml::CopyTo, }; use datafusion_common::tree_node::TreeNodeRefContainer; @@ -686,9 +688,39 @@ impl LogicalPlan { _ => Transformed::no(stmt), } .update_data(LogicalPlan::Statement), + LogicalPlan::Unnest(Unnest { + input, + exec_columns, + options, + .. + }) => { + let exprs: Vec = + exec_columns.into_iter().map(Expr::Column).collect(); + exprs.map_elements(f)?.map_data(|mapped_exprs| { + let new_columns = mapped_exprs + .into_iter() + .map(|e| match e { + Expr::Column(c) => Ok(c), + other => internal_err!( + "Expected Expr::Column for Unnest exec_columns, got {other:?}" + ), + }) + .collect::>>()?; + // Rebuild through `unnest_with_options` so the derived + // `list_type_columns`, `struct_type_columns`, + // `dependency_indices`, and `schema` are recomputed from + // the (possibly rewritten) columns rather than carried over + // stale. This keeps `map_expressions` consistent with + // `with_new_exprs`. + unnest_with_options( + Arc::unwrap_or_clone(input), + new_columns, + options, + ) + })? + } // plans without expressions LogicalPlan::EmptyRelation(_) - | LogicalPlan::Unnest(_) | LogicalPlan::RecursiveQuery(_) | LogicalPlan::Subquery(_) | LogicalPlan::SubqueryAlias(_) diff --git a/datafusion/optimizer/src/extract_leaf_expressions.rs b/datafusion/optimizer/src/extract_leaf_expressions.rs index c90f1567fadbb..b855f224c420b 100644 --- a/datafusion/optimizer/src/extract_leaf_expressions.rs +++ b/datafusion/optimizer/src/extract_leaf_expressions.rs @@ -1151,7 +1151,6 @@ fn try_push_into_inputs( // Unnest may output a column with the same name but different value/type // than its input column. Name-based routing cannot distinguish those. - // On top of that Unnest can't go through the `node.with_new_exprs(node.expressions(), new_inputs)` rebuild if matches!(node, LogicalPlan::Unnest(_)) { return Ok(None); } @@ -3046,16 +3045,15 @@ mod tests { Ok(()) } - /// Regression test for the `Assertion failed: expr.is_empty(): Unnest` - /// internal error. + /// Regression test: the optimizer must not push extractions through + /// `Unnest`. /// - /// `try_push_into_inputs` rebuilds the parent node via - /// `node.with_new_exprs(node.expressions(), new_inputs)`. For `Unnest`, - /// `apply_expressions` exposes the `exec_columns` as `Expr::Column`s - /// (so `expressions()` is **non-empty**), but `with_new_exprs` for - /// `Unnest` immediately calls `assert_no_expressions(expr)?` and errors - /// out. The optimizer should treat `Unnest` as a barrier and bail - /// instead of attempting to push through it. + /// `try_push_into_inputs` routes extracted pairs to inputs by column name. + /// `Unnest` can emit an output column with the same name as its input + /// column but a different value/type (the unnested element), so name-based + /// routing cannot tell the two apart. `try_push_into_inputs` therefore + /// treats `Unnest` as a barrier and bails instead of pushing through it + /// (see the `matches!(node, LogicalPlan::Unnest(_))` guard there). #[test] fn test_no_push_through_unnest() -> Result<()> { use arrow::datatypes::{DataType, Field, Schema};