From ea595840f3feec2e3ae053feafc970462ecd6980 Mon Sep 17 00:00:00 2001 From: logan-keede Date: Fri, 14 Nov 2025 03:16:14 +0530 Subject: [PATCH 1/5] It works ig --- .../optimizer/src/eliminate_nested_union.rs | 25 +++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/datafusion/optimizer/src/eliminate_nested_union.rs b/datafusion/optimizer/src/eliminate_nested_union.rs index f8f93727cd9b..e1822abf8275 100644 --- a/datafusion/optimizer/src/eliminate_nested_union.rs +++ b/datafusion/optimizer/src/eliminate_nested_union.rs @@ -21,7 +21,7 @@ use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::tree_node::Transformed; use datafusion_common::Result; use datafusion_expr::expr_rewriter::coerce_plan_expr_for_schema; -use datafusion_expr::{Distinct, LogicalPlan, Union}; +use datafusion_expr::{Distinct, LogicalPlan, Projection, Union}; use itertools::Itertools; use std::sync::Arc; @@ -61,7 +61,6 @@ impl OptimizerRule for EliminateNestedUnion { .flat_map(extract_plans_from_union) .map(|plan| coerce_plan_expr_for_schema(plan, &schema)) .collect::>>()?; - Ok(Transformed::yes(LogicalPlan::Union(Union { inputs: inputs.into_iter().map(Arc::new).collect_vec(), schema, @@ -100,6 +99,21 @@ fn extract_plans_from_union(plan: Arc) -> Vec { .into_iter() .map(Arc::unwrap_or_clone) .collect::>(), + LogicalPlan::Projection(Projection { + expr, + input, + schema, + .. + }) => match Arc::unwrap_or_clone(input) { + LogicalPlan::Union(Union { inputs, .. }) => inputs + .into_iter() + .map(Arc::unwrap_or_clone) + .collect::>(), + + plan => vec![LogicalPlan::Projection( + Projection::try_new_with_schema(expr, Arc::new(plan), schema).unwrap(), + )], + }, plan => vec![plan], } } @@ -111,6 +125,13 @@ fn extract_plan_from_distinct(plan: Arc) -> Arc { } } +// fn extract_plan_from_projection(plan: Arc) -> Arc { +// match Arc::unwrap_or_clone(plan) { +// LogicalPlan::Distinct(Distinct::All(plan)) => plan, +// plan => Arc::new(plan), +// } +// } + #[cfg(test)] mod tests { use super::*; From 466ca17b0f0994b52ff4ba1dddcc884cd0ad62a6 Mon Sep 17 00:00:00 2001 From: logan-keede Date: Fri, 14 Nov 2025 20:30:08 +0530 Subject: [PATCH 2/5] Okay, this one maybe, wrapped projection nicely --- .../optimizer/src/eliminate_nested_union.rs | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/datafusion/optimizer/src/eliminate_nested_union.rs b/datafusion/optimizer/src/eliminate_nested_union.rs index e1822abf8275..208668442b1d 100644 --- a/datafusion/optimizer/src/eliminate_nested_union.rs +++ b/datafusion/optimizer/src/eliminate_nested_union.rs @@ -54,13 +54,14 @@ impl OptimizerRule for EliminateNestedUnion { plan: LogicalPlan, _config: &dyn OptimizerConfig, ) -> Result> { - match plan { + match plan.clone() { LogicalPlan::Union(Union { inputs, schema }) => { let inputs = inputs .into_iter() .flat_map(extract_plans_from_union) .map(|plan| coerce_plan_expr_for_schema(plan, &schema)) .collect::>>()?; + Ok(Transformed::yes(LogicalPlan::Union(Union { inputs: inputs.into_iter().map(Arc::new).collect_vec(), schema, @@ -94,7 +95,7 @@ impl OptimizerRule for EliminateNestedUnion { } fn extract_plans_from_union(plan: Arc) -> Vec { - match Arc::unwrap_or_clone(plan) { + match Arc::unwrap_or_clone(plan.clone()) { LogicalPlan::Union(Union { inputs, .. }) => inputs .into_iter() .map(Arc::unwrap_or_clone) @@ -104,15 +105,23 @@ fn extract_plans_from_union(plan: Arc) -> Vec { input, schema, .. - }) => match Arc::unwrap_or_clone(input) { + }) => match Arc::unwrap_or_clone(input.clone()) { LogicalPlan::Union(Union { inputs, .. }) => inputs .into_iter() .map(Arc::unwrap_or_clone) + .map(|plan| { + LogicalPlan::Projection( + Projection::try_new_with_schema( + expr.clone(), + Arc::new(plan), + schema.clone(), + ) + .unwrap(), + ) + }) .collect::>(), - plan => vec![LogicalPlan::Projection( - Projection::try_new_with_schema(expr, Arc::new(plan), schema).unwrap(), - )], + _ => vec![Arc::unwrap_or_clone(plan)], }, plan => vec![plan], } @@ -125,13 +134,6 @@ fn extract_plan_from_distinct(plan: Arc) -> Arc { } } -// fn extract_plan_from_projection(plan: Arc) -> Arc { -// match Arc::unwrap_or_clone(plan) { -// LogicalPlan::Distinct(Distinct::All(plan)) => plan, -// plan => Arc::new(plan), -// } -// } - #[cfg(test)] mod tests { use super::*; From c516035107ee4c75d6af53d46a4c45e2d6924bcf Mon Sep 17 00:00:00 2001 From: logan-keede Date: Fri, 14 Nov 2025 21:21:13 +0530 Subject: [PATCH 3/5] clippy --- datafusion/optimizer/src/eliminate_nested_union.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/datafusion/optimizer/src/eliminate_nested_union.rs b/datafusion/optimizer/src/eliminate_nested_union.rs index 208668442b1d..9ed010997243 100644 --- a/datafusion/optimizer/src/eliminate_nested_union.rs +++ b/datafusion/optimizer/src/eliminate_nested_union.rs @@ -95,7 +95,7 @@ impl OptimizerRule for EliminateNestedUnion { } fn extract_plans_from_union(plan: Arc) -> Vec { - match Arc::unwrap_or_clone(plan.clone()) { + match Arc::unwrap_or_clone(plan) { LogicalPlan::Union(Union { inputs, .. }) => inputs .into_iter() .map(Arc::unwrap_or_clone) @@ -105,7 +105,7 @@ fn extract_plans_from_union(plan: Arc) -> Vec { input, schema, .. - }) => match Arc::unwrap_or_clone(input.clone()) { + }) => match Arc::unwrap_or_clone(input) { LogicalPlan::Union(Union { inputs, .. }) => inputs .into_iter() .map(Arc::unwrap_or_clone) @@ -113,15 +113,17 @@ fn extract_plans_from_union(plan: Arc) -> Vec { LogicalPlan::Projection( Projection::try_new_with_schema( expr.clone(), - Arc::new(plan), - schema.clone(), + Arc::new(plan.clone()), + Arc::clone(&schema), ) .unwrap(), ) }) .collect::>(), - _ => vec![Arc::unwrap_or_clone(plan)], + plan => vec![LogicalPlan::Projection( + Projection::try_new_with_schema(expr, Arc::new(plan), schema).unwrap(), + )], }, plan => vec![plan], } From d9acde58d596140d8f4f6f23f6d6271cab02d9d2 Mon Sep 17 00:00:00 2001 From: logan-keede Date: Sun, 23 Nov 2025 17:21:59 +0530 Subject: [PATCH 4/5] remove clone --- datafusion/optimizer/src/optimize_unions.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/optimizer/src/optimize_unions.rs b/datafusion/optimizer/src/optimize_unions.rs index 7e7303f6577c..307ff5a831e3 100644 --- a/datafusion/optimizer/src/optimize_unions.rs +++ b/datafusion/optimizer/src/optimize_unions.rs @@ -118,7 +118,7 @@ fn extract_plans_from_union(plan: Arc) -> Vec { LogicalPlan::Projection( Projection::try_new_with_schema( expr.clone(), - Arc::new(plan.clone()), + Arc::new(plan), Arc::clone(&schema), ) .unwrap(), From 681a173443d3b483d7b0340c5359620dba03fc35 Mon Sep 17 00:00:00 2001 From: logan-keede Date: Mon, 24 Nov 2025 23:43:41 +0530 Subject: [PATCH 5/5] add inline comment for explanation --- datafusion/optimizer/src/optimize_unions.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/datafusion/optimizer/src/optimize_unions.rs b/datafusion/optimizer/src/optimize_unions.rs index 307ff5a831e3..23a6fe95e579 100644 --- a/datafusion/optimizer/src/optimize_unions.rs +++ b/datafusion/optimizer/src/optimize_unions.rs @@ -105,6 +105,13 @@ fn extract_plans_from_union(plan: Arc) -> Vec { .into_iter() .map(Arc::unwrap_or_clone) .collect::>(), + // While unnesting, unwrap a Projection whose input is a nested Union, + // flatten the inner Union, and push the same Projection down onto + // each of the nested Union’s children. + // + // Example: + // Union { Projection { Union { plan1, plan2 } }, plan3 } + // => Union { Projection { plan1 }, Projection { plan2 }, plan3 } LogicalPlan::Projection(Projection { expr, input,