From fb0228db6144798b00b5d5cc87824d2bd4744524 Mon Sep 17 00:00:00 2001 From: Nathan Bezualem Date: Fri, 5 Jun 2026 21:44:40 -0400 Subject: [PATCH 1/2] . --- datafusion/expr/src/logical_plan/plan.rs | 67 +++++++++++++++++-- datafusion/expr/src/logical_plan/tree_node.rs | 33 ++++++++- .../optimizer/src/extract_leaf_expressions.rs | 1 - 3 files changed, 94 insertions(+), 7 deletions(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index b8843953865d2..f2fb431d59a49 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1157,12 +1157,20 @@ impl LogicalPlan { options, .. }) => { - self.assert_no_expressions(expr)?; + let exec_columns = if expr.is_empty() { + columns.clone() + } else { + expr.into_iter() + .map(|e| match e { + Expr::Column(c) => Ok(c), + other => internal_err!( + "Expected Expr::Column for Unnest exec_columns, got {other:?}" + ), + }) + .collect::>>()? + }; let input = self.only_input(inputs)?; - // Update schema with unnested column type. - let new_plan = - unnest_with_options(input, columns.clone(), options.clone())?; - Ok(new_plan) + Ok(unnest_with_options(input, exec_columns, options.clone())?) } } } @@ -6324,4 +6332,53 @@ mod tests { Ok(()) } + + #[test] + fn test_unnest_with_new_exprs_accepts_expressions() -> Result<()> { + use crate::LogicalPlanBuilder; + use arrow::datatypes::{DataType, Field, Schema}; + + let schema = Schema::new(vec![ + Field::new("list_col", DataType::new_list(DataType::Int32, true), true), + Field::new("other_col", DataType::Int32, true), + ]); + let plan = table_scan(Some("t"), &schema, None)?.build()?; + let unnest_plan = LogicalPlanBuilder::from(plan) + .unnest_column("list_col")? + .build()?; + + let exprs = unnest_plan.expressions(); + assert!(!exprs.is_empty(), "Unnest should expose exec_columns"); + assert_eq!(exprs.len(), 1); + assert!(matches!(&exprs[0], Expr::Column(c) if c.name == "list_col")); + + let inputs: Vec = + unnest_plan.inputs().into_iter().cloned().collect(); + let rebuilt = unnest_plan.with_new_exprs(exprs, inputs)?; + assert_eq!(rebuilt.schema(), unnest_plan.schema()); + + Ok(()) + } + + #[test] + fn test_unnest_with_new_exprs_empty_preserves_columns() -> Result<()> { + use crate::LogicalPlanBuilder; + use arrow::datatypes::{DataType, Field, Schema}; + + let schema = Schema::new(vec![ + Field::new("list_col", DataType::new_list(DataType::Int32, true), true), + Field::new("other_col", DataType::Int32, true), + ]); + let plan = table_scan(Some("t"), &schema, None)?.build()?; + let unnest_plan = LogicalPlanBuilder::from(plan) + .unnest_column("list_col")? + .build()?; + + let inputs: Vec = + unnest_plan.inputs().into_iter().cloned().collect(); + let rebuilt = unnest_plan.with_new_exprs(vec![], inputs)?; + assert_eq!(rebuilt.schema(), unnest_plan.schema()); + + Ok(()) + } } diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index e0cdec9e2c088..cab0f59f2bc03 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -671,9 +671,40 @@ impl LogicalPlan { _ => Transformed::no(stmt), } .update_data(LogicalPlan::Statement), + LogicalPlan::Unnest(Unnest { + input, + exec_columns, + list_type_columns, + struct_type_columns, + dependency_indices, + schema, + options, + }) => { + let exprs: Vec = + exec_columns.into_iter().map(Expr::Column).collect(); + exprs.map_elements(f)?.update_data(|mapped_exprs| { + let new_columns = mapped_exprs + .into_iter() + .map(|e| match e { + Expr::Column(c) => c, + _ => unreachable!( + "Unnest exec_columns must remain Column expressions" + ), + }) + .collect(); + LogicalPlan::Unnest(Unnest { + input, + exec_columns: new_columns, + list_type_columns, + struct_type_columns, + dependency_indices, + schema, + options, + }) + }) + } // plans without expressions LogicalPlan::EmptyRelation(_) - | LogicalPlan::Unnest(_) | LogicalPlan::RecursiveQuery(_) | LogicalPlan::Subquery(_) | LogicalPlan::SubqueryAlias(_) diff --git a/datafusion/optimizer/src/extract_leaf_expressions.rs b/datafusion/optimizer/src/extract_leaf_expressions.rs index 185f9d045f10f..d540f2683e3e4 100644 --- a/datafusion/optimizer/src/extract_leaf_expressions.rs +++ b/datafusion/optimizer/src/extract_leaf_expressions.rs @@ -1145,7 +1145,6 @@ fn try_push_into_inputs( // Unnest may output a column with the same name but different value/type // than its input column. Name-based routing cannot distinguish those. - // On top of that Unnest can't go through the `node.with_new_exprs(node.expressions(), new_inputs)` rebuild if matches!(node, LogicalPlan::Unnest(_)) { return Ok(None); } From 4ef89cde2708ed8cb9f8fb8380eae5ce89fe1fc5 Mon Sep 17 00:00:00 2001 From: Nathan Bezualem Date: Sat, 6 Jun 2026 21:07:07 -0400 Subject: [PATCH 2/2] . --- datafusion/expr/src/logical_plan/tree_node.rs | 39 ++++++++++--------- .../optimizer/src/extract_leaf_expressions.rs | 17 ++++---- 2 files changed, 28 insertions(+), 28 deletions(-) diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index cab0f59f2bc03..f7b541831cf94 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -37,12 +37,14 @@ //! * [`LogicalPlan::with_new_exprs`]: Create a new plan with different expressions //! * [`LogicalPlan::expressions`]: Return a copy of the plan's expressions +use std::sync::Arc; + use crate::{ Aggregate, Analyze, CreateMemoryTable, CreateView, DdlStatement, Distinct, DistinctOn, DmlStatement, Execute, Explain, Expr, Extension, Filter, Join, Limit, LogicalPlan, Partitioning, Prepare, Projection, RecursiveQuery, Repartition, Sort, Statement, Subquery, SubqueryAlias, TableScan, Union, Unnest, UserDefinedLogicalNode, - Values, Window, dml::CopyTo, + Values, Window, builder::unnest_with_options, dml::CopyTo, }; use datafusion_common::tree_node::TreeNodeRefContainer; @@ -674,34 +676,33 @@ impl LogicalPlan { LogicalPlan::Unnest(Unnest { input, exec_columns, - list_type_columns, - struct_type_columns, - dependency_indices, - schema, options, + .. }) => { let exprs: Vec = exec_columns.into_iter().map(Expr::Column).collect(); - exprs.map_elements(f)?.update_data(|mapped_exprs| { + exprs.map_elements(f)?.map_data(|mapped_exprs| { let new_columns = mapped_exprs .into_iter() .map(|e| match e { - Expr::Column(c) => c, - _ => unreachable!( - "Unnest exec_columns must remain Column expressions" + Expr::Column(c) => Ok(c), + other => internal_err!( + "Expected Expr::Column for Unnest exec_columns, got {other:?}" ), }) - .collect(); - LogicalPlan::Unnest(Unnest { - input, - exec_columns: new_columns, - list_type_columns, - struct_type_columns, - dependency_indices, - schema, + .collect::>>()?; + // Rebuild through `unnest_with_options` so the derived + // `list_type_columns`, `struct_type_columns`, + // `dependency_indices`, and `schema` are recomputed from + // the (possibly rewritten) columns rather than carried over + // stale. This keeps `map_expressions` consistent with + // `with_new_exprs`. + unnest_with_options( + Arc::unwrap_or_clone(input), + new_columns, options, - }) - }) + ) + })? } // plans without expressions LogicalPlan::EmptyRelation(_) diff --git a/datafusion/optimizer/src/extract_leaf_expressions.rs b/datafusion/optimizer/src/extract_leaf_expressions.rs index d540f2683e3e4..98a6c30b628a1 100644 --- a/datafusion/optimizer/src/extract_leaf_expressions.rs +++ b/datafusion/optimizer/src/extract_leaf_expressions.rs @@ -3042,16 +3042,15 @@ mod tests { Ok(()) } - /// Regression test for the `Assertion failed: expr.is_empty(): Unnest` - /// internal error. + /// Regression test: the optimizer must not push extractions through + /// `Unnest`. /// - /// `try_push_into_inputs` rebuilds the parent node via - /// `node.with_new_exprs(node.expressions(), new_inputs)`. For `Unnest`, - /// `apply_expressions` exposes the `exec_columns` as `Expr::Column`s - /// (so `expressions()` is **non-empty**), but `with_new_exprs` for - /// `Unnest` immediately calls `assert_no_expressions(expr)?` and errors - /// out. The optimizer should treat `Unnest` as a barrier and bail - /// instead of attempting to push through it. + /// `try_push_into_inputs` routes extracted pairs to inputs by column name. + /// `Unnest` can emit an output column with the same name as its input + /// column but a different value/type (the unnested element), so name-based + /// routing cannot tell the two apart. `try_push_into_inputs` therefore + /// treats `Unnest` as a barrier and bails instead of pushing through it + /// (see the `matches!(node, LogicalPlan::Unnest(_))` guard there). #[test] fn test_no_push_through_unnest() -> Result<()> { use arrow::datatypes::{DataType, Field, Schema};