From db53b91774ce14ffcd681b33ca151dd544476d6a Mon Sep 17 00:00:00 2001 From: liuli Date: Mon, 15 Nov 2021 17:14:23 +0800 Subject: [PATCH 1/5] Extract Projection, Filter, Window in LogicalPlan as independent struct (#1302) --- .../core/src/serde/logical_plan/to_proto.rs | 11 ++- datafusion/src/execution/context.rs | 5 +- datafusion/src/logical_plan/builder.rs | 14 +-- datafusion/src/logical_plan/mod.rs | 2 +- datafusion/src/logical_plan/plan.rs | 99 ++++++++++--------- .../src/optimizer/common_subexpr_eliminate.rs | 27 ++--- datafusion/src/optimizer/filter_push_down.rs | 11 ++- datafusion/src/optimizer/limit_push_down.rs | 9 +- .../src/optimizer/projection_push_down.rs | 17 ++-- datafusion/src/optimizer/utils.rs | 15 +-- datafusion/src/physical_plan/planner.rs | 11 ++- datafusion/tests/custom_sources.rs | 3 +- datafusion/tests/sql.rs | 3 +- 13 files changed, 123 insertions(+), 104 deletions(-) diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs index e4c7656cc8a1c..e62182314bbdb 100644 --- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs @@ -50,6 +50,7 @@ use std::{ boxed, convert::{TryFrom, TryInto}, }; +use datafusion::logical_plan::plan::{FilterPlan, ProjectionPlan, WindowPlan}; impl protobuf::IntervalUnit { pub fn from_arrow_interval_unit(interval_unit: &IntervalUnit) -> Self { @@ -777,9 +778,9 @@ impl TryInto for &LogicalPlan { ))) } } - LogicalPlan::Projection { + LogicalPlan::Projection(ProjectionPlan { expr, input, alias, .. - } => Ok(protobuf::LogicalPlanNode { + }) => Ok(protobuf::LogicalPlanNode { logical_plan_type: Some(LogicalPlanType::Projection(Box::new( protobuf::ProjectionNode { input: Some(Box::new(input.as_ref().try_into()?)), @@ -794,7 +795,7 @@ impl TryInto for &LogicalPlan { }, ))), }), - LogicalPlan::Filter { predicate, input } => { + LogicalPlan::Filter(FilterPlan { predicate, input }) => { let input: protobuf::LogicalPlanNode = input.as_ref().try_into()?; Ok(protobuf::LogicalPlanNode { logical_plan_type: Some(LogicalPlanType::Selection(Box::new( @@ -805,9 +806,9 @@ impl TryInto for &LogicalPlan { ))), }) } - LogicalPlan::Window { + LogicalPlan::Window(WindowPlan { input, window_expr, .. - } => { + }) => { let input: protobuf::LogicalPlanNode = input.as_ref().try_into()?; Ok(protobuf::LogicalPlanNode { logical_plan_type: Some(LogicalPlanType::Window(Box::new( diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 78a7884a8103a..31dc3b93c0ed8 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -1208,6 +1208,7 @@ mod tests { use std::{io::prelude::*, sync::Mutex}; use tempfile::TempDir; use test::*; + use crate::logical_plan::plan::ProjectionPlan; #[test] fn optimize_explain() { @@ -1420,7 +1421,7 @@ mod tests { let optimized_plan = ctx.optimize(&logical_plan)?; match &optimized_plan { - LogicalPlan::Projection { input, .. } => match &**input { + LogicalPlan::Projection(ProjectionPlan { input, .. }) => match &**input { LogicalPlan::TableScan { source, projected_schema, @@ -1493,7 +1494,7 @@ mod tests { let ctx = ExecutionContext::new(); let optimized_plan = ctx.optimize(&plan)?; match &optimized_plan { - LogicalPlan::Projection { input, .. } => match &**input { + LogicalPlan::Projection(ProjectionPlan { input, .. }) => match &**input { LogicalPlan::TableScan { source, projected_schema, diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index 0c7950cb6ea9b..1dbacecd37b2d 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -25,7 +25,7 @@ use crate::datasource::{ MemTable, TableProvider, }; use crate::error::{DataFusionError, Result}; -use crate::logical_plan::plan::ToStringifiedPlan; +use crate::logical_plan::plan::{FilterPlan, ProjectionPlan, ToStringifiedPlan, WindowPlan}; use crate::prelude::*; use crate::scalar::ScalarValue; use arrow::{ @@ -450,10 +450,10 @@ impl LogicalPlanBuilder { /// Apply a filter pub fn filter(&self, expr: impl Into) -> Result { let expr = normalize_col(expr.into(), &self.plan)?; - Ok(Self::from(LogicalPlan::Filter { + Ok(Self::from(LogicalPlan::Filter(FilterPlan { predicate: expr, input: Arc::new(self.plan.clone()), - })) + }))) } /// Apply a limit @@ -657,11 +657,11 @@ impl LogicalPlanBuilder { let mut window_fields: Vec = exprlist_to_fields(all_expr, self.plan.schema())?; window_fields.extend_from_slice(self.plan.schema().fields()); - Ok(Self::from(LogicalPlan::Window { + Ok(Self::from(LogicalPlan::Window(WindowPlan { input: Arc::new(self.plan.clone()), window_expr, schema: Arc::new(DFSchema::new(window_fields)?), - })) + }))) } /// Apply an aggregate: grouping on the `group_expr` expressions @@ -897,12 +897,12 @@ pub fn project_with_alias( Some(ref alias) => input_schema.replace_qualifier(alias.as_str()), None => input_schema, }; - Ok(LogicalPlan::Projection { + Ok(LogicalPlan::Projection(ProjectionPlan { expr: projected_expr, input: Arc::new(plan.clone()), schema: DFSchemaRef::new(schema), alias, - }) + })) } /// Resolves an `Expr::Wildcard` to a collection of `Expr::Column`'s. diff --git a/datafusion/src/logical_plan/mod.rs b/datafusion/src/logical_plan/mod.rs index 8569b35196df9..5db6a99672e64 100644 --- a/datafusion/src/logical_plan/mod.rs +++ b/datafusion/src/logical_plan/mod.rs @@ -27,7 +27,7 @@ mod display; mod expr; mod extension; mod operators; -mod plan; +pub mod plan; mod registry; pub mod window_frames; pub use builder::{ diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs index d1e1678b6617a..43724b7df5f42 100644 --- a/datafusion/src/logical_plan/plan.rs +++ b/datafusion/src/logical_plan/plan.rs @@ -57,6 +57,36 @@ pub enum JoinConstraint { Using, } +#[derive(Clone)] +pub struct ProjectionPlan { + /// The list of expressions + pub expr: Vec, + /// The incoming logical plan + pub input: Arc, + /// The schema description of the output + pub schema: DFSchemaRef, + /// Projection output relation alias + pub alias: Option, +} + +#[derive(Clone)] +pub struct FilterPlan { + /// The predicate expression, which must have Boolean type. + pub predicate: Expr, + /// The incoming logical plan + pub input: Arc, +} + +#[derive(Clone)] +pub struct WindowPlan { + /// The incoming logical plan + pub input: Arc, + /// The window function expression + pub window_expr: Vec, + /// The schema description of the window output + pub schema: DFSchemaRef, +} + /// A LogicalPlan represents the different types of relational /// operators (such as Projection, Filter, etc) and can be created by /// the SQL query planner and the DataFrame API. @@ -69,16 +99,7 @@ pub enum JoinConstraint { pub enum LogicalPlan { /// Evaluates an arbitrary list of expressions (essentially a /// SELECT with an expression list) on its input. - Projection { - /// The list of expressions - expr: Vec, - /// The incoming logical plan - input: Arc, - /// The schema description of the output - schema: DFSchemaRef, - /// Projection output relation alias - alias: Option, - }, + Projection(ProjectionPlan), /// Filters rows from its input that do not match an /// expression (essentially a WHERE clause with a predicate /// expression). @@ -87,21 +108,9 @@ pub enum LogicalPlan { /// If the value of `` is true, the input row is passed to /// the output. If the value of `` is false, the row is /// discarded. - Filter { - /// The predicate expression, which must have Boolean type. - predicate: Expr, - /// The incoming logical plan - input: Arc, - }, + Filter(FilterPlan), /// Window its input based on a set of window spec and window function (e.g. SUM or RANK) - Window { - /// The incoming logical plan - input: Arc, - /// The window function expression - window_expr: Vec, - /// The schema description of the window output - schema: DFSchemaRef, - }, + Window(WindowPlan), /// Aggregates its input based on a set of grouping and aggregate /// expressions (e.g. SUM). Aggregate { @@ -268,9 +277,9 @@ impl LogicalPlan { LogicalPlan::TableScan { projected_schema, .. } => projected_schema, - LogicalPlan::Projection { schema, .. } => schema, - LogicalPlan::Filter { input, .. } => input.schema(), - LogicalPlan::Window { schema, .. } => schema, + LogicalPlan::Projection(ProjectionPlan { schema, .. }) => schema, + LogicalPlan::Filter(FilterPlan { input, .. }) => input.schema(), + LogicalPlan::Window(WindowPlan { schema, .. }) => schema, LogicalPlan::Aggregate { schema, .. } => schema, LogicalPlan::Sort { input, .. } => input.schema(), LogicalPlan::Join { schema, .. } => schema, @@ -294,9 +303,9 @@ impl LogicalPlan { projected_schema, .. } => vec![projected_schema], LogicalPlan::Values { schema, .. } => vec![schema], - LogicalPlan::Window { input, schema, .. } + LogicalPlan::Window(WindowPlan { input, schema, .. }) | LogicalPlan::Aggregate { input, schema, .. } - | LogicalPlan::Projection { input, schema, .. } => { + | LogicalPlan::Projection(ProjectionPlan { input, schema, .. }) => { let mut schemas = input.all_schemas(); schemas.insert(0, schema); schemas @@ -329,7 +338,7 @@ impl LogicalPlan { | LogicalPlan::Repartition { input, .. } | LogicalPlan::Sort { input, .. } | LogicalPlan::CreateMemoryTable { input, .. } - | LogicalPlan::Filter { input, .. } => input.all_schemas(), + | LogicalPlan::Filter(FilterPlan { input, .. }) => input.all_schemas(), LogicalPlan::DropTable { .. } => vec![], } } @@ -347,11 +356,11 @@ impl LogicalPlan { /// children pub fn expressions(self: &LogicalPlan) -> Vec { match self { - LogicalPlan::Projection { expr, .. } => expr.clone(), + LogicalPlan::Projection(ProjectionPlan { expr, .. }) => expr.clone(), LogicalPlan::Values { values, .. } => { values.iter().flatten().cloned().collect() } - LogicalPlan::Filter { predicate, .. } => vec![predicate.clone()], + LogicalPlan::Filter(FilterPlan { predicate, .. }) => vec![predicate.clone()], LogicalPlan::Repartition { partitioning_scheme, .. @@ -359,7 +368,7 @@ impl LogicalPlan { Partitioning::Hash(expr, _) => expr.clone(), _ => vec![], }, - LogicalPlan::Window { window_expr, .. } => window_expr.clone(), + LogicalPlan::Window(WindowPlan { window_expr, .. }) => window_expr.clone(), LogicalPlan::Aggregate { group_expr, aggr_expr, @@ -391,10 +400,10 @@ impl LogicalPlan { /// include inputs to inputs. pub fn inputs(self: &LogicalPlan) -> Vec<&LogicalPlan> { match self { - LogicalPlan::Projection { input, .. } => vec![input], - LogicalPlan::Filter { input, .. } => vec![input], + LogicalPlan::Projection(ProjectionPlan { input, .. }) => vec![input], + LogicalPlan::Filter(FilterPlan { input, .. }) => vec![input], LogicalPlan::Repartition { input, .. } => vec![input], - LogicalPlan::Window { input, .. } => vec![input], + LogicalPlan::Window(WindowPlan { input, .. }) => vec![input], LogicalPlan::Aggregate { input, .. } => vec![input], LogicalPlan::Sort { input, .. } => vec![input], LogicalPlan::Join { left, right, .. } => vec![left, right], @@ -524,10 +533,10 @@ impl LogicalPlan { } let recurse = match self { - LogicalPlan::Projection { input, .. } => input.accept(visitor)?, - LogicalPlan::Filter { input, .. } => input.accept(visitor)?, + LogicalPlan::Projection(ProjectionPlan { input, .. }) => input.accept(visitor)?, + LogicalPlan::Filter(FilterPlan { input, .. }) => input.accept(visitor)?, LogicalPlan::Repartition { input, .. } => input.accept(visitor)?, - LogicalPlan::Window { input, .. } => input.accept(visitor)?, + LogicalPlan::Window(WindowPlan { input, .. }) => input.accept(visitor)?, LogicalPlan::Aggregate { input, .. } => input.accept(visitor)?, LogicalPlan::Sort { input, .. } => input.accept(visitor)?, LogicalPlan::Join { left, right, .. } @@ -788,9 +797,9 @@ impl LogicalPlan { Ok(()) } - LogicalPlan::Projection { + LogicalPlan::Projection(ProjectionPlan { ref expr, alias, .. - } => { + }) => { write!(f, "Projection: ")?; for (i, expr_item) in expr.iter().enumerate() { if i > 0 { @@ -803,13 +812,13 @@ impl LogicalPlan { } Ok(()) } - LogicalPlan::Filter { + LogicalPlan::Filter(FilterPlan { predicate: ref expr, .. - } => write!(f, "Filter: {:?}", expr), - LogicalPlan::Window { + }) => write!(f, "Filter: {:?}", expr), + LogicalPlan::Window(WindowPlan { ref window_expr, .. - } => { + }) => { write!(f, "WindowAggr: windowExpr=[{:?}]", window_expr) } LogicalPlan::Aggregate { diff --git a/datafusion/src/optimizer/common_subexpr_eliminate.rs b/datafusion/src/optimizer/common_subexpr_eliminate.rs index c88032e38b2e7..443d9703090a2 100644 --- a/datafusion/src/optimizer/common_subexpr_eliminate.rs +++ b/datafusion/src/optimizer/common_subexpr_eliminate.rs @@ -28,6 +28,7 @@ use crate::optimizer::utils; use arrow::datatypes::DataType; use std::collections::{HashMap, HashSet}; use std::sync::Arc; +use crate::logical_plan::plan::{FilterPlan, ProjectionPlan, WindowPlan}; /// A map from expression's identifier to tuple including /// - the expression itself (cloned) @@ -77,12 +78,12 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result { + }) => { let arrays = to_arrays(expr, input, &mut expr_set)?; let (mut new_expr, new_input) = rewrite_expr( @@ -94,14 +95,14 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result { + LogicalPlan::Filter(FilterPlan { predicate, input }) => { let schemas = plan.all_schemas(); let all_schema = schemas.into_iter().fold(DFSchema::empty(), |mut lhs, rhs| { @@ -122,16 +123,16 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result { + }) => { let arrays = to_arrays(window_expr, input, &mut expr_set)?; let (mut new_expr, new_input) = rewrite_expr( @@ -143,11 +144,11 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result LogicalPlan { and(acc, (*predicate).to_owned()) }); - LogicalPlan::Filter { + LogicalPlan::Filter(FilterPlan { predicate, input: Arc::new(plan), - } + }) } // remove all filters from `filters` that are in `predicate_columns` @@ -285,7 +286,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { push_down(&state, plan) } LogicalPlan::Analyze { .. } => push_down(&state, plan), - LogicalPlan::Filter { input, predicate } => { + LogicalPlan::Filter(FilterPlan { input, predicate }) => { let mut predicates = vec![]; split_members(predicate, &mut predicates); @@ -314,12 +315,12 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { optimize(input, state) } } - LogicalPlan::Projection { + LogicalPlan::Projection(ProjectionPlan { input, expr, schema, alias: _, - } => { + }) => { // A projection is filter-commutable, but re-writes all predicate expressions // collect projection. let projection = schema diff --git a/datafusion/src/optimizer/limit_push_down.rs b/datafusion/src/optimizer/limit_push_down.rs index d02777c6c609d..12413ee0d3801 100644 --- a/datafusion/src/optimizer/limit_push_down.rs +++ b/datafusion/src/optimizer/limit_push_down.rs @@ -23,6 +23,7 @@ use crate::execution::context::ExecutionProps; use crate::logical_plan::LogicalPlan; use crate::optimizer::optimizer::OptimizerRule; use std::sync::Arc; +use crate::logical_plan::plan::ProjectionPlan; /// Optimization rule that tries pushes down LIMIT n /// where applicable to reduce the amount of scanned / processed data @@ -76,16 +77,16 @@ fn limit_push_down( projected_schema: projected_schema.clone(), }), ( - LogicalPlan::Projection { + LogicalPlan::Projection(ProjectionPlan { expr, input, schema, alias, - }, + }), upper_limit, ) => { // Push down limit directly (projection doesn't change number of rows) - Ok(LogicalPlan::Projection { + Ok(LogicalPlan::Projection(ProjectionPlan { expr: expr.clone(), input: Arc::new(limit_push_down( optimizer, @@ -95,7 +96,7 @@ fn limit_push_down( )?), schema: schema.clone(), alias: alias.clone(), - }) + })) } ( LogicalPlan::Union { diff --git a/datafusion/src/optimizer/projection_push_down.rs b/datafusion/src/optimizer/projection_push_down.rs index 7c087a17a9864..47aad82ce5e3c 100644 --- a/datafusion/src/optimizer/projection_push_down.rs +++ b/datafusion/src/optimizer/projection_push_down.rs @@ -33,6 +33,7 @@ use std::{ collections::{BTreeSet, HashSet}, sync::Arc, }; +use crate::logical_plan::plan::{ProjectionPlan, WindowPlan}; /// Optimizer that removes unused projections and aggregations from plans /// This reduces both scans and @@ -130,12 +131,12 @@ fn optimize_plan( ) -> Result { let mut new_required_columns = required_columns.clone(); match plan { - LogicalPlan::Projection { + LogicalPlan::Projection(ProjectionPlan { input, expr, schema, alias, - } => { + }) => { // projection: // * remove any expression that is not required // * construct the new set of required columns @@ -181,12 +182,12 @@ fn optimize_plan( // no need for an expression at all Ok(new_input) } else { - Ok(LogicalPlan::Projection { + Ok(LogicalPlan::Projection(ProjectionPlan { expr: new_expr, input: Arc::new(new_input), schema: DFSchemaRef::new(DFSchema::new(new_fields)?), alias: alias.clone(), - }) + })) } } LogicalPlan::Join { @@ -235,12 +236,12 @@ fn optimize_plan( null_equals_null: *null_equals_null, }) } - LogicalPlan::Window { + LogicalPlan::Window(WindowPlan { schema, window_expr, input, .. - } => { + }) => { // Gather all columns needed for expressions in this Window let mut new_window_expr = Vec::new(); { @@ -747,12 +748,12 @@ mod tests { let expr = vec![col("a"), col("b")]; let projected_fields = exprlist_to_fields(&expr, input_schema).unwrap(); let projected_schema = DFSchema::new(projected_fields).unwrap(); - let plan = LogicalPlan::Projection { + let plan = LogicalPlan::Projection(ProjectionPlan { expr, input: Arc::new(table_scan), schema: Arc::new(projected_schema), alias: None, - }; + }); assert_fields_eq(&plan, vec!["a", "b"]); diff --git a/datafusion/src/optimizer/utils.rs b/datafusion/src/optimizer/utils.rs index 52beb695529c3..c75bb294b8248 100644 --- a/datafusion/src/optimizer/utils.rs +++ b/datafusion/src/optimizer/utils.rs @@ -36,6 +36,7 @@ use crate::{ logical_plan::ExpressionVisitor, }; use std::{collections::HashSet, sync::Arc}; +use crate::logical_plan::plan::{FilterPlan, ProjectionPlan, WindowPlan}; const CASE_EXPR_MARKER: &str = "__DATAFUSION_CASE_EXPR__"; const CASE_ELSE_MARKER: &str = "__DATAFUSION_CASE_ELSE__"; @@ -143,12 +144,12 @@ pub fn from_plan( inputs: &[LogicalPlan], ) -> Result { match plan { - LogicalPlan::Projection { schema, alias, .. } => Ok(LogicalPlan::Projection { + LogicalPlan::Projection(ProjectionPlan { schema, alias, .. }) => Ok(LogicalPlan::Projection(ProjectionPlan { expr: expr.to_vec(), input: Arc::new(inputs[0].clone()), schema: schema.clone(), alias: alias.clone(), - }), + })), LogicalPlan::Values { schema, .. } => Ok(LogicalPlan::Values { schema: schema.clone(), values: expr @@ -156,10 +157,10 @@ pub fn from_plan( .map(|s| s.to_vec()) .collect::>(), }), - LogicalPlan::Filter { .. } => Ok(LogicalPlan::Filter { + LogicalPlan::Filter { .. } => Ok(LogicalPlan::Filter(FilterPlan { predicate: expr[0].clone(), input: Arc::new(inputs[0].clone()), - }), + })), LogicalPlan::Repartition { partitioning_scheme, .. @@ -173,15 +174,15 @@ pub fn from_plan( input: Arc::new(inputs[0].clone()), }), }, - LogicalPlan::Window { + LogicalPlan::Window(WindowPlan { window_expr, schema, .. - } => Ok(LogicalPlan::Window { + }) => Ok(LogicalPlan::Window(WindowPlan { input: Arc::new(inputs[0].clone()), window_expr: expr[0..window_expr.len()].to_vec(), schema: schema.clone(), - }), + })), LogicalPlan::Aggregate { group_expr, schema, .. } => Ok(LogicalPlan::Aggregate { diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index 402f119e8ea01..718ecb65d3564 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -62,6 +62,7 @@ use futures::future::BoxFuture; use futures::{FutureExt, StreamExt, TryStreamExt}; use log::debug; use std::sync::Arc; +use crate::logical_plan::plan::{FilterPlan, ProjectionPlan, WindowPlan}; fn create_function_physical_name( fun: &str, @@ -370,9 +371,9 @@ impl DefaultPhysicalPlanner { )?; Ok(Arc::new(value_exec)) } - LogicalPlan::Window { + LogicalPlan::Window(WindowPlan { input, window_expr, .. - } => { + }) => { if window_expr.is_empty() { return Err(DataFusionError::Internal( "Impossibly got empty window expression".to_owned(), @@ -576,7 +577,7 @@ impl DefaultPhysicalPlanner { physical_input_schema.clone(), )?) ) } - LogicalPlan::Projection { input, expr, .. } => { + LogicalPlan::Projection(ProjectionPlan { input, expr, .. }) => { let input_exec = self.create_initial_plan(input, ctx_state).await?; let input_schema = input.as_ref().schema(); @@ -628,9 +629,9 @@ impl DefaultPhysicalPlanner { input_exec, )?) ) } - LogicalPlan::Filter { + LogicalPlan::Filter(FilterPlan { input, predicate, .. - } => { + }) => { let physical_input = self.create_initial_plan(input, ctx_state).await?; let input_schema = physical_input.as_ref().schema(); let input_dfschema = input.as_ref().schema(); diff --git a/datafusion/tests/custom_sources.rs b/datafusion/tests/custom_sources.rs index a29a265f68716..03e7e4aa6114e 100644 --- a/datafusion/tests/custom_sources.rs +++ b/datafusion/tests/custom_sources.rs @@ -45,6 +45,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use async_trait::async_trait; +use datafusion::logical_plan::plan::ProjectionPlan; //// Custom source dataframe tests //// @@ -216,7 +217,7 @@ async fn custom_source_dataframe() -> Result<()> { let optimized_plan = ctx.optimize(&logical_plan)?; match &optimized_plan { - LogicalPlan::Projection { input, .. } => match &**input { + LogicalPlan::Projection(ProjectionPlan { input, .. }) => match &**input { LogicalPlan::TableScan { source, projected_schema, diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs index eeb6c10926b14..893da7cad67f8 100644 --- a/datafusion/tests/sql.rs +++ b/datafusion/tests/sql.rs @@ -48,6 +48,7 @@ use datafusion::{ physical_plan::ColumnarValue, }; use datafusion::{execution::context::ExecutionContext, physical_plan::displayable}; +use datafusion::logical_plan::plan::ProjectionPlan; mod common; @@ -90,7 +91,7 @@ async fn nyc() -> Result<()> { let optimized_plan = ctx.optimize(&logical_plan)?; match &optimized_plan { - LogicalPlan::Projection { input, .. } => match input.as_ref() { + LogicalPlan::Projection(ProjectionPlan { input, .. }) => match input.as_ref() { LogicalPlan::Aggregate { input, .. } => match input.as_ref() { LogicalPlan::TableScan { ref projected_schema, From f386f29f1d758422eaad84331daeeb73e6a9734a Mon Sep 17 00:00:00 2001 From: liuli Date: Mon, 15 Nov 2021 23:14:19 +0800 Subject: [PATCH 2/5] Extract Projection, Filter, Window in LogicalPlan as independent struct (#1302) --- .../core/src/serde/logical_plan/to_proto.rs | 2 +- datafusion/src/execution/context.rs | 2 +- datafusion/src/logical_plan/builder.rs | 4 ++- datafusion/src/logical_plan/plan.rs | 29 +++++++++++-------- .../src/optimizer/common_subexpr_eliminate.rs | 2 +- datafusion/src/optimizer/filter_push_down.rs | 2 +- datafusion/src/optimizer/limit_push_down.rs | 2 +- .../src/optimizer/projection_push_down.rs | 2 +- datafusion/src/optimizer/utils.rs | 16 +++++----- datafusion/src/physical_plan/planner.rs | 2 +- datafusion/tests/sql.rs | 2 +- 11 files changed, 37 insertions(+), 28 deletions(-) diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs index e62182314bbdb..dda132cd42050 100644 --- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs @@ -30,6 +30,7 @@ use datafusion::datasource::TableProvider; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::listing::ListingTable; +use datafusion::logical_plan::plan::{FilterPlan, ProjectionPlan, WindowPlan}; use datafusion::logical_plan::{ exprlist_to_fields, window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits}, @@ -50,7 +51,6 @@ use std::{ boxed, convert::{TryFrom, TryInto}, }; -use datafusion::logical_plan::plan::{FilterPlan, ProjectionPlan, WindowPlan}; impl protobuf::IntervalUnit { pub fn from_arrow_interval_unit(interval_unit: &IntervalUnit) -> Self { diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 31dc3b93c0ed8..e81b4c824fb8b 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -1178,6 +1178,7 @@ impl FunctionRegistry for ExecutionContextState { #[cfg(test)] mod tests { use super::*; + use crate::logical_plan::plan::ProjectionPlan; use crate::logical_plan::{binary_expr, lit, Operator}; use crate::physical_plan::functions::{make_scalar_function, Volatility}; use crate::physical_plan::{collect, collect_partitioned}; @@ -1208,7 +1209,6 @@ mod tests { use std::{io::prelude::*, sync::Mutex}; use tempfile::TempDir; use test::*; - use crate::logical_plan::plan::ProjectionPlan; #[test] fn optimize_explain() { diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index 1dbacecd37b2d..191121d720949 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -25,7 +25,9 @@ use crate::datasource::{ MemTable, TableProvider, }; use crate::error::{DataFusionError, Result}; -use crate::logical_plan::plan::{FilterPlan, ProjectionPlan, ToStringifiedPlan, WindowPlan}; +use crate::logical_plan::plan::{ + FilterPlan, ProjectionPlan, ToStringifiedPlan, WindowPlan, +}; use crate::prelude::*; use crate::scalar::ScalarValue; use arrow::{ diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs index 43724b7df5f42..58b3dec234ddd 100644 --- a/datafusion/src/logical_plan/plan.rs +++ b/datafusion/src/logical_plan/plan.rs @@ -57,6 +57,8 @@ pub enum JoinConstraint { Using, } +/// Evaluates an arbitrary list of expressions (essentially a +/// SELECT with an expression list) on its input. #[derive(Clone)] pub struct ProjectionPlan { /// The list of expressions @@ -69,6 +71,14 @@ pub struct ProjectionPlan { pub alias: Option, } +/// Filters rows from its input that do not match an +/// expression (essentially a WHERE clause with a predicate +/// expression). +/// +/// Semantically, `` is evaluated for each row of the input; +/// If the value of `` is true, the input row is passed to +/// the output. If the value of `` is false, the row is +/// discarded. #[derive(Clone)] pub struct FilterPlan { /// The predicate expression, which must have Boolean type. @@ -77,6 +87,7 @@ pub struct FilterPlan { pub input: Arc, } +/// Window its input based on a set of window spec and window function (e.g. SUM or RANK) #[derive(Clone)] pub struct WindowPlan { /// The incoming logical plan @@ -97,19 +108,11 @@ pub struct WindowPlan { /// from leaves up to the root to produce the query result. #[derive(Clone)] pub enum LogicalPlan { - /// Evaluates an arbitrary list of expressions (essentially a - /// SELECT with an expression list) on its input. + /// the ProjectionPlan Projection(ProjectionPlan), - /// Filters rows from its input that do not match an - /// expression (essentially a WHERE clause with a predicate - /// expression). - /// - /// Semantically, `` is evaluated for each row of the input; - /// If the value of `` is true, the input row is passed to - /// the output. If the value of `` is false, the row is - /// discarded. + /// the FilterPlan Filter(FilterPlan), - /// Window its input based on a set of window spec and window function (e.g. SUM or RANK) + /// the WindowPlan Window(WindowPlan), /// Aggregates its input based on a set of grouping and aggregate /// expressions (e.g. SUM). @@ -533,7 +536,9 @@ impl LogicalPlan { } let recurse = match self { - LogicalPlan::Projection(ProjectionPlan { input, .. }) => input.accept(visitor)?, + LogicalPlan::Projection(ProjectionPlan { input, .. }) => { + input.accept(visitor)? + } LogicalPlan::Filter(FilterPlan { input, .. }) => input.accept(visitor)?, LogicalPlan::Repartition { input, .. } => input.accept(visitor)?, LogicalPlan::Window(WindowPlan { input, .. }) => input.accept(visitor)?, diff --git a/datafusion/src/optimizer/common_subexpr_eliminate.rs b/datafusion/src/optimizer/common_subexpr_eliminate.rs index 443d9703090a2..ef17bf4625dc4 100644 --- a/datafusion/src/optimizer/common_subexpr_eliminate.rs +++ b/datafusion/src/optimizer/common_subexpr_eliminate.rs @@ -19,6 +19,7 @@ use crate::error::Result; use crate::execution::context::ExecutionProps; +use crate::logical_plan::plan::{FilterPlan, ProjectionPlan, WindowPlan}; use crate::logical_plan::{ col, DFField, DFSchema, Expr, ExprRewriter, ExpressionVisitor, LogicalPlan, Recursion, RewriteRecursion, @@ -28,7 +29,6 @@ use crate::optimizer::utils; use arrow::datatypes::DataType; use std::collections::{HashMap, HashSet}; use std::sync::Arc; -use crate::logical_plan::plan::{FilterPlan, ProjectionPlan, WindowPlan}; /// A map from expression's identifier to tuple including /// - the expression itself (cloned) diff --git a/datafusion/src/optimizer/filter_push_down.rs b/datafusion/src/optimizer/filter_push_down.rs index e6bc69c9227b9..f486e90ec6e11 100644 --- a/datafusion/src/optimizer/filter_push_down.rs +++ b/datafusion/src/optimizer/filter_push_down.rs @@ -16,6 +16,7 @@ use crate::datasource::datasource::TableProviderFilterPushDown; use crate::execution::context::ExecutionProps; +use crate::logical_plan::plan::{FilterPlan, ProjectionPlan}; use crate::logical_plan::{and, replace_col, Column, LogicalPlan}; use crate::logical_plan::{DFSchema, Expr}; use crate::optimizer::optimizer::OptimizerRule; @@ -25,7 +26,6 @@ use std::{ collections::{HashMap, HashSet}, sync::Arc, }; -use crate::logical_plan::plan::{FilterPlan, ProjectionPlan}; /// Filter Push Down optimizer rule pushes filter clauses down the plan /// # Introduction diff --git a/datafusion/src/optimizer/limit_push_down.rs b/datafusion/src/optimizer/limit_push_down.rs index 12413ee0d3801..1100c7423e5bb 100644 --- a/datafusion/src/optimizer/limit_push_down.rs +++ b/datafusion/src/optimizer/limit_push_down.rs @@ -20,10 +20,10 @@ use super::utils; use crate::error::Result; use crate::execution::context::ExecutionProps; +use crate::logical_plan::plan::ProjectionPlan; use crate::logical_plan::LogicalPlan; use crate::optimizer::optimizer::OptimizerRule; use std::sync::Arc; -use crate::logical_plan::plan::ProjectionPlan; /// Optimization rule that tries pushes down LIMIT n /// where applicable to reduce the amount of scanned / processed data diff --git a/datafusion/src/optimizer/projection_push_down.rs b/datafusion/src/optimizer/projection_push_down.rs index 47aad82ce5e3c..91908e740eda3 100644 --- a/datafusion/src/optimizer/projection_push_down.rs +++ b/datafusion/src/optimizer/projection_push_down.rs @@ -20,6 +20,7 @@ use crate::error::{DataFusionError, Result}; use crate::execution::context::ExecutionProps; +use crate::logical_plan::plan::{ProjectionPlan, WindowPlan}; use crate::logical_plan::{ build_join_schema, Column, DFField, DFSchema, DFSchemaRef, LogicalPlan, LogicalPlanBuilder, ToDFSchema, @@ -33,7 +34,6 @@ use std::{ collections::{BTreeSet, HashSet}, sync::Arc, }; -use crate::logical_plan::plan::{ProjectionPlan, WindowPlan}; /// Optimizer that removes unused projections and aggregations from plans /// This reduces both scans and diff --git a/datafusion/src/optimizer/utils.rs b/datafusion/src/optimizer/utils.rs index c75bb294b8248..460e9ce2855c5 100644 --- a/datafusion/src/optimizer/utils.rs +++ b/datafusion/src/optimizer/utils.rs @@ -23,6 +23,7 @@ use arrow::record_batch::RecordBatch; use super::optimizer::OptimizerRule; use crate::execution::context::{ExecutionContextState, ExecutionProps}; +use crate::logical_plan::plan::{FilterPlan, ProjectionPlan, WindowPlan}; use crate::logical_plan::{ build_join_schema, Column, DFSchema, DFSchemaRef, Expr, ExprRewriter, LogicalPlan, LogicalPlanBuilder, Operator, Partitioning, Recursion, RewriteRecursion, @@ -36,7 +37,6 @@ use crate::{ logical_plan::ExpressionVisitor, }; use std::{collections::HashSet, sync::Arc}; -use crate::logical_plan::plan::{FilterPlan, ProjectionPlan, WindowPlan}; const CASE_EXPR_MARKER: &str = "__DATAFUSION_CASE_EXPR__"; const CASE_ELSE_MARKER: &str = "__DATAFUSION_CASE_ELSE__"; @@ -144,12 +144,14 @@ pub fn from_plan( inputs: &[LogicalPlan], ) -> Result { match plan { - LogicalPlan::Projection(ProjectionPlan { schema, alias, .. }) => Ok(LogicalPlan::Projection(ProjectionPlan { - expr: expr.to_vec(), - input: Arc::new(inputs[0].clone()), - schema: schema.clone(), - alias: alias.clone(), - })), + LogicalPlan::Projection(ProjectionPlan { schema, alias, .. }) => { + Ok(LogicalPlan::Projection(ProjectionPlan { + expr: expr.to_vec(), + input: Arc::new(inputs[0].clone()), + schema: schema.clone(), + alias: alias.clone(), + })) + } LogicalPlan::Values { schema, .. } => Ok(LogicalPlan::Values { schema: schema.clone(), values: expr diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index 718ecb65d3564..50a67dfc6842b 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -23,6 +23,7 @@ use super::{ hash_join::PartitionMode, udaf, union::UnionExec, values::ValuesExec, windows, }; use crate::execution::context::ExecutionContextState; +use crate::logical_plan::plan::{FilterPlan, ProjectionPlan, WindowPlan}; use crate::logical_plan::{ unnormalize_cols, DFSchema, Expr, LogicalPlan, Operator, Partitioning as LogicalPartitioning, PlanType, ToStringifiedPlan, @@ -62,7 +63,6 @@ use futures::future::BoxFuture; use futures::{FutureExt, StreamExt, TryStreamExt}; use log::debug; use std::sync::Arc; -use crate::logical_plan::plan::{FilterPlan, ProjectionPlan, WindowPlan}; fn create_function_physical_name( fun: &str, diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs index 893da7cad67f8..6bf59decd9f76 100644 --- a/datafusion/tests/sql.rs +++ b/datafusion/tests/sql.rs @@ -36,6 +36,7 @@ use datafusion::assert_batches_eq; use datafusion::assert_batches_sorted_eq; use datafusion::assert_contains; use datafusion::assert_not_contains; +use datafusion::logical_plan::plan::ProjectionPlan; use datafusion::logical_plan::LogicalPlan; use datafusion::physical_plan::functions::Volatility; use datafusion::physical_plan::metrics::MetricValue; @@ -48,7 +49,6 @@ use datafusion::{ physical_plan::ColumnarValue, }; use datafusion::{execution::context::ExecutionContext, physical_plan::displayable}; -use datafusion::logical_plan::plan::ProjectionPlan; mod common; From e30343b5c0c8a2f5cebf3eed83dec1d3a4b74858 Mon Sep 17 00:00:00 2001 From: liuli Date: Fri, 19 Nov 2021 21:59:01 +0800 Subject: [PATCH 3/5] solve conflicts --- .../core/src/serde/logical_plan/to_proto.rs | 36 +++--- datafusion/src/execution/context.rs | 5 +- datafusion/src/logical_plan/builder.rs | 15 +-- datafusion/src/logical_plan/plan.rs | 112 +++++++++++------- .../src/optimizer/common_subexpr_eliminate.rs | 54 ++++----- datafusion/src/optimizer/filter_push_down.rs | 20 ++-- datafusion/src/optimizer/limit_push_down.rs | 23 ++-- .../src/optimizer/projection_push_down.rs | 35 +++--- .../optimizer/single_distinct_to_groupby.rs | 5 +- datafusion/src/optimizer/utils.rs | 34 +++--- datafusion/src/physical_plan/planner.rs | 29 +++-- datafusion/tests/custom_sources.rs | 4 +- 12 files changed, 196 insertions(+), 176 deletions(-) diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs index a56a71bc035d5..505330618ec1f 100644 --- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs @@ -779,43 +779,43 @@ impl TryInto for &LogicalPlan { ))) } } - LogicalPlan::Projection(ProjectionPlan { - expr, input, alias, .. - }) => Ok(protobuf::LogicalPlanNode { + LogicalPlan::Projection(projection_plan) => Ok(protobuf::LogicalPlanNode { logical_plan_type: Some(LogicalPlanType::Projection(Box::new( protobuf::ProjectionNode { - input: Some(Box::new(input.as_ref().try_into()?)), - expr: expr.iter().map(|expr| expr.try_into()).collect::, - BallistaError, - >>( - )?, - optional_alias: alias + input: Some(Box::new(projection_plan.input.as_ref().try_into()?)), + expr: projection_plan + .expr + .iter() + .map(|expr| expr.try_into()) + .collect::, BallistaError>>()?, + optional_alias: projection_plan + .alias .clone() .map(protobuf::projection_node::OptionalAlias::Alias), }, ))), }), - LogicalPlan::Filter(FilterPlan { predicate, input }) => { - let input: protobuf::LogicalPlanNode = input.as_ref().try_into()?; + LogicalPlan::Filter(filter_plan) => { + let input: protobuf::LogicalPlanNode = + filter_plan.input.as_ref().try_into()?; Ok(protobuf::LogicalPlanNode { logical_plan_type: Some(LogicalPlanType::Selection(Box::new( protobuf::SelectionNode { input: Some(Box::new(input)), - expr: Some(predicate.try_into()?), + expr: Some((&filter_plan.predicate).try_into()?), }, ))), }) } - LogicalPlan::Window(WindowPlan { - input, window_expr, .. - }) => { - let input: protobuf::LogicalPlanNode = input.as_ref().try_into()?; + LogicalPlan::Window(window_plan) => { + let input: protobuf::LogicalPlanNode = + window_plan.input.as_ref().try_into()?; Ok(protobuf::LogicalPlanNode { logical_plan_type: Some(LogicalPlanType::Window(Box::new( protobuf::WindowNode { input: Some(Box::new(input)), - window_expr: window_expr + window_expr: window_plan + .window_expr .iter() .map(|expr| expr.try_into()) .collect::, _>>()?, diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 5f77b2bfaa6bc..ee501c4c57a0b 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -1177,6 +1177,7 @@ impl FunctionRegistry for ExecutionContextState { #[cfg(test)] mod tests { use super::*; + use crate::logical_plan::plan::ProjectionPlan; use crate::logical_plan::TableScanPlan; use crate::logical_plan::{binary_expr, lit, Operator}; use crate::physical_plan::functions::{make_scalar_function, Volatility}; @@ -1415,7 +1416,7 @@ mod tests { let optimized_plan = ctx.optimize(&logical_plan)?; match &optimized_plan { - LogicalPlan::Projection { input, .. } => match &**input { + LogicalPlan::Projection(ProjectionPlan { input, .. }) => match &**input { LogicalPlan::TableScan(TableScanPlan { source, projected_schema, @@ -1488,7 +1489,7 @@ mod tests { let ctx = ExecutionContext::new(); let optimized_plan = ctx.optimize(&plan)?; match &optimized_plan { - LogicalPlan::Projection { input, .. } => match &**input { + LogicalPlan::Projection(ProjectionPlan { input, .. }) => match &**input { LogicalPlan::TableScan(TableScanPlan { source, projected_schema, diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index 8c3057926ce9e..fdf06f39e6503 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -26,7 +26,8 @@ use crate::datasource::{ }; use crate::error::{DataFusionError, Result}; use crate::logical_plan::plan::{ - AnalyzePlan, ExplainPlan, TableScanPlan, ToStringifiedPlan, Union, + AnalyzePlan, ExplainPlan, FilterPlan, ProjectionPlan, TableScanPlan, + ToStringifiedPlan, Union, WindowPlan, }; use crate::prelude::*; use crate::scalar::ScalarValue; @@ -451,10 +452,10 @@ impl LogicalPlanBuilder { /// Apply a filter pub fn filter(&self, expr: impl Into) -> Result { let expr = normalize_col(expr.into(), &self.plan)?; - Ok(Self::from(LogicalPlan::Filter { + Ok(Self::from(LogicalPlan::Filter(FilterPlan { predicate: expr, input: Arc::new(self.plan.clone()), - })) + }))) } /// Apply a limit @@ -658,11 +659,11 @@ impl LogicalPlanBuilder { let mut window_fields: Vec = exprlist_to_fields(all_expr, self.plan.schema())?; window_fields.extend_from_slice(self.plan.schema().fields()); - Ok(Self::from(LogicalPlan::Window { + Ok(Self::from(LogicalPlan::Window(WindowPlan { input: Arc::new(self.plan.clone()), window_expr, schema: Arc::new(DFSchema::new(window_fields)?), - })) + }))) } /// Apply an aggregate: grouping on the `group_expr` expressions @@ -898,12 +899,12 @@ pub fn project_with_alias( Some(ref alias) => input_schema.replace_qualifier(alias.as_str()), None => input_schema, }; - Ok(LogicalPlan::Projection { + Ok(LogicalPlan::Projection(ProjectionPlan { expr: projected_expr, input: Arc::new(plan.clone()), schema: DFSchemaRef::new(schema), alias, - }) + })) } /// Resolves an `Expr::Wildcard` to a collection of `Expr::Column`'s. diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs index c12edd98fc01e..3b1260447841f 100644 --- a/datafusion/src/logical_plan/plan.rs +++ b/datafusion/src/logical_plan/plan.rs @@ -57,6 +57,47 @@ pub enum JoinConstraint { Using, } +/// Evaluates an arbitrary list of expressions (essentially a +/// SELECT with an expression list) on its input. +#[derive(Clone)] +pub struct ProjectionPlan { + /// The list of expressions + pub expr: Vec, + /// The incoming logical plan + pub input: Arc, + /// The schema description of the output + pub schema: DFSchemaRef, + /// Projection output relation alias + pub alias: Option, +} + +/// Filters rows from its input that do not match an +/// expression (essentially a WHERE clause with a predicate +/// expression). +/// +/// Semantically, `` is evaluated for each row of the input; +/// If the value of `` is true, the input row is passed to +/// the output. If the value of `` is false, the row is +/// discarded. +#[derive(Clone)] +pub struct FilterPlan { + /// The predicate expression, which must have Boolean type. + pub predicate: Expr, + /// The incoming logical plan + pub input: Arc, +} + +/// Window its input based on a set of window spec and window function (e.g. SUM or RANK) +#[derive(Clone)] +pub struct WindowPlan { + /// The incoming logical plan + pub input: Arc, + /// The window function expression + pub window_expr: Vec, + /// The schema description of the window output + pub schema: DFSchemaRef, +} + /// Produces rows from a table provider by reference or from the context #[derive(Clone)] pub struct TableScanPlan { @@ -185,16 +226,7 @@ pub struct ExtensionPlan { pub enum LogicalPlan { /// Evaluates an arbitrary list of expressions (essentially a /// SELECT with an expression list) on its input. - Projection { - /// The list of expressions - expr: Vec, - /// The incoming logical plan - input: Arc, - /// The schema description of the output - schema: DFSchemaRef, - /// Projection output relation alias - alias: Option, - }, + Projection(ProjectionPlan), /// Filters rows from its input that do not match an /// expression (essentially a WHERE clause with a predicate /// expression). @@ -203,21 +235,9 @@ pub enum LogicalPlan { /// If the value of `` is true, the input row is passed to /// the output. If the value of `` is false, the row is /// discarded. - Filter { - /// The predicate expression, which must have Boolean type. - predicate: Expr, - /// The incoming logical plan - input: Arc, - }, + Filter(FilterPlan), /// Window its input based on a set of window spec and window function (e.g. SUM or RANK) - Window { - /// The incoming logical plan - input: Arc, - /// The window function expression - window_expr: Vec, - /// The schema description of the window output - schema: DFSchemaRef, - }, + Window(WindowPlan), /// Aggregates its input based on a set of grouping and aggregate /// expressions (e.g. SUM). Aggregate { @@ -310,9 +330,9 @@ impl LogicalPlan { LogicalPlan::TableScan(TableScanPlan { projected_schema, .. }) => projected_schema, - LogicalPlan::Projection { schema, .. } => schema, - LogicalPlan::Filter { input, .. } => input.schema(), - LogicalPlan::Window { schema, .. } => schema, + LogicalPlan::Projection(ProjectionPlan { schema, .. }) => schema, + LogicalPlan::Filter(FilterPlan { input, .. }) => input.schema(), + LogicalPlan::Window(WindowPlan { schema, .. }) => schema, LogicalPlan::Aggregate { schema, .. } => schema, LogicalPlan::Sort { input, .. } => input.schema(), LogicalPlan::Join { schema, .. } => schema, @@ -340,9 +360,9 @@ impl LogicalPlan { projected_schema, .. }) => vec![projected_schema], LogicalPlan::Values { schema, .. } => vec![schema], - LogicalPlan::Window { input, schema, .. } + LogicalPlan::Window(WindowPlan { input, schema, .. }) | LogicalPlan::Aggregate { input, schema, .. } - | LogicalPlan::Projection { input, schema, .. } => { + | LogicalPlan::Projection(ProjectionPlan { input, schema, .. }) => { let mut schemas = input.all_schemas(); schemas.insert(0, schema); schemas @@ -377,7 +397,7 @@ impl LogicalPlan { | LogicalPlan::Repartition(Repartition { input, .. }) | LogicalPlan::Sort { input, .. } | LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) - | LogicalPlan::Filter { input, .. } => input.all_schemas(), + | LogicalPlan::Filter(FilterPlan { input, .. }) => input.all_schemas(), LogicalPlan::DropTable(_) => vec![], } } @@ -395,11 +415,11 @@ impl LogicalPlan { /// children pub fn expressions(self: &LogicalPlan) -> Vec { match self { - LogicalPlan::Projection { expr, .. } => expr.clone(), + LogicalPlan::Projection(ProjectionPlan { expr, .. }) => expr.clone(), LogicalPlan::Values { values, .. } => { values.iter().flatten().cloned().collect() } - LogicalPlan::Filter { predicate, .. } => vec![predicate.clone()], + LogicalPlan::Filter(FilterPlan { predicate, .. }) => vec![predicate.clone()], LogicalPlan::Repartition(Repartition { partitioning_scheme, .. @@ -407,7 +427,7 @@ impl LogicalPlan { Partitioning::Hash(expr, _) => expr.clone(), _ => vec![], }, - LogicalPlan::Window { window_expr, .. } => window_expr.clone(), + LogicalPlan::Window(WindowPlan { window_expr, .. }) => window_expr.clone(), LogicalPlan::Aggregate { group_expr, aggr_expr, @@ -439,10 +459,10 @@ impl LogicalPlan { /// include inputs to inputs. pub fn inputs(self: &LogicalPlan) -> Vec<&LogicalPlan> { match self { - LogicalPlan::Projection { input, .. } => vec![input], - LogicalPlan::Filter { input, .. } => vec![input], + LogicalPlan::Projection(ProjectionPlan { input, .. }) => vec![input], + LogicalPlan::Filter(FilterPlan { input, .. }) => vec![input], LogicalPlan::Repartition(Repartition { input, .. }) => vec![input], - LogicalPlan::Window { input, .. } => vec![input], + LogicalPlan::Window(WindowPlan { input, .. }) => vec![input], LogicalPlan::Aggregate { input, .. } => vec![input], LogicalPlan::Sort { input, .. } => vec![input], LogicalPlan::Join { left, right, .. } => vec![left, right], @@ -574,12 +594,14 @@ impl LogicalPlan { } let recurse = match self { - LogicalPlan::Projection { input, .. } => input.accept(visitor)?, - LogicalPlan::Filter { input, .. } => input.accept(visitor)?, + LogicalPlan::Projection(ProjectionPlan { input, .. }) => { + input.accept(visitor)? + } + LogicalPlan::Filter(FilterPlan { input, .. }) => input.accept(visitor)?, LogicalPlan::Repartition(Repartition { input, .. }) => { input.accept(visitor)? } - LogicalPlan::Window { input, .. } => input.accept(visitor)?, + LogicalPlan::Window(WindowPlan { input, .. }) => input.accept(visitor)?, LogicalPlan::Aggregate { input, .. } => input.accept(visitor)?, LogicalPlan::Sort { input, .. } => input.accept(visitor)?, LogicalPlan::Join { left, right, .. } @@ -842,9 +864,9 @@ impl LogicalPlan { Ok(()) } - LogicalPlan::Projection { + LogicalPlan::Projection(ProjectionPlan { ref expr, alias, .. - } => { + }) => { write!(f, "Projection: ")?; for (i, expr_item) in expr.iter().enumerate() { if i > 0 { @@ -857,13 +879,13 @@ impl LogicalPlan { } Ok(()) } - LogicalPlan::Filter { + LogicalPlan::Filter(FilterPlan { predicate: ref expr, .. - } => write!(f, "Filter: {:?}", expr), - LogicalPlan::Window { + }) => write!(f, "Filter: {:?}", expr), + LogicalPlan::Window(WindowPlan { ref window_expr, .. - } => { + }) => { write!(f, "WindowAggr: windowExpr=[{:?}]", window_expr) } LogicalPlan::Aggregate { diff --git a/datafusion/src/optimizer/common_subexpr_eliminate.rs b/datafusion/src/optimizer/common_subexpr_eliminate.rs index 8788c411ef2d1..72fccba9318d6 100644 --- a/datafusion/src/optimizer/common_subexpr_eliminate.rs +++ b/datafusion/src/optimizer/common_subexpr_eliminate.rs @@ -78,48 +78,49 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result { - let arrays = to_arrays(expr, input, &mut expr_set)?; + LogicalPlan::Projection(projection_plan) => { + let arrays = + to_arrays(&projection_plan.expr, &projection_plan.input, &mut expr_set)?; let (mut new_expr, new_input) = rewrite_expr( - &[expr], + &[&projection_plan.expr], &[&arrays], - input, + &projection_plan.input, &mut expr_set, - schema, + &projection_plan.schema, execution_props, )?; Ok(LogicalPlan::Projection(ProjectionPlan { expr: new_expr.pop().unwrap(), input: Arc::new(new_input), - schema: schema.clone(), - alias: alias.clone(), + schema: projection_plan.schema.clone(), + alias: projection_plan.alias.clone(), })) } - LogicalPlan::Filter(FilterPlan { predicate, input }) => { + LogicalPlan::Filter(filter_plan) => { let schemas = plan.all_schemas(); let all_schema = schemas.into_iter().fold(DFSchema::empty(), |mut lhs, rhs| { lhs.merge(rhs); lhs }); - let data_type = predicate.get_type(&all_schema)?; + let data_type = filter_plan.predicate.get_type(&all_schema)?; let mut id_array = vec![]; - expr_to_identifier(predicate, &mut expr_set, &mut id_array, data_type)?; + expr_to_identifier( + &filter_plan.predicate, + &mut expr_set, + &mut id_array, + data_type, + )?; let (mut new_expr, new_input) = rewrite_expr( - &[&[predicate.clone()]], + &[&[filter_plan.predicate.clone()]], &[&[id_array]], - input, + &filter_plan.input, &mut expr_set, - input.schema(), + filter_plan.input.schema(), execution_props, )?; @@ -128,26 +129,23 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result { - let arrays = to_arrays(window_expr, input, &mut expr_set)?; + LogicalPlan::Window(window_plan) => { + let arrays = + to_arrays(&window_plan.window_expr, &window_plan.input, &mut expr_set)?; let (mut new_expr, new_input) = rewrite_expr( - &[window_expr], + &[&window_plan.window_expr], &[&arrays], - input, + &window_plan.input, &mut expr_set, - schema, + &window_plan.schema, execution_props, )?; Ok(LogicalPlan::Window(WindowPlan { input: Arc::new(new_input), window_expr: new_expr.pop().unwrap(), - schema: schema.clone(), + schema: window_plan.schema.clone(), })) } LogicalPlan::Aggregate { diff --git a/datafusion/src/optimizer/filter_push_down.rs b/datafusion/src/optimizer/filter_push_down.rs index cc8f01193048a..06ad92b2047af 100644 --- a/datafusion/src/optimizer/filter_push_down.rs +++ b/datafusion/src/optimizer/filter_push_down.rs @@ -16,6 +16,7 @@ use crate::datasource::datasource::TableProviderFilterPushDown; use crate::execution::context::ExecutionProps; +use crate::logical_plan::plan::{FilterPlan, ProjectionPlan}; use crate::logical_plan::{ and, replace_col, Column, CrossJoin, LogicalPlan, TableScanPlan, }; @@ -181,10 +182,10 @@ fn add_filter(plan: LogicalPlan, predicates: &[&Expr]) -> LogicalPlan { and(acc, (*predicate).to_owned()) }); - LogicalPlan::Filter { + LogicalPlan::Filter(FilterPlan { predicate, input: Arc::new(plan), - } + }) } // remove all filters from `filters` that are in `predicate_columns` @@ -287,9 +288,9 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { push_down(&state, plan) } LogicalPlan::Analyze { .. } => push_down(&state, plan), - LogicalPlan::Filter { input, predicate } => { + LogicalPlan::Filter(filter_plan) => { let mut predicates = vec![]; - split_members(predicate, &mut predicates); + split_members(&filter_plan.predicate, &mut predicates); // Predicates without referencing columns (WHERE FALSE, WHERE 1=1, etc.) let mut no_col_predicates = vec![]; @@ -311,17 +312,20 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { // As those contain only literals, they could be optimized using constant folding // and removal of WHERE TRUE / WHERE FALSE if !no_col_predicates.is_empty() { - Ok(add_filter(optimize(input, state)?, &no_col_predicates)) + Ok(add_filter( + optimize(&filter_plan.input, state)?, + &no_col_predicates, + )) } else { - optimize(input, state) + optimize(&filter_plan.input, state) } } - LogicalPlan::Projection { + LogicalPlan::Projection(ProjectionPlan { input, expr, schema, alias: _, - } => { + }) => { // A projection is filter-commutable, but re-writes all predicate expressions // collect projection. let projection = schema diff --git a/datafusion/src/optimizer/limit_push_down.rs b/datafusion/src/optimizer/limit_push_down.rs index ce8b5bcf3418d..f92f1214c087a 100644 --- a/datafusion/src/optimizer/limit_push_down.rs +++ b/datafusion/src/optimizer/limit_push_down.rs @@ -20,6 +20,7 @@ use super::utils; use crate::error::Result; use crate::execution::context::ExecutionProps; +use crate::logical_plan::plan::ProjectionPlan; use crate::logical_plan::TableScanPlan; use crate::logical_plan::{LogicalPlan, Union}; use crate::optimizer::optimizer::OptimizerRule; @@ -76,27 +77,19 @@ fn limit_push_down( .or(Some(upper_limit)), projected_schema: projected_schema.clone(), })), - ( - LogicalPlan::Projection { - expr, - input, - schema, - alias, - }, - upper_limit, - ) => { + (LogicalPlan::Projection(projection_plan), upper_limit) => { // Push down limit directly (projection doesn't change number of rows) - Ok(LogicalPlan::Projection { - expr: expr.clone(), + Ok(LogicalPlan::Projection(ProjectionPlan { + expr: projection_plan.expr.clone(), input: Arc::new(limit_push_down( optimizer, upper_limit, - input.as_ref(), + projection_plan.input.as_ref(), execution_props, )?), - schema: schema.clone(), - alias: alias.clone(), - }) + schema: projection_plan.schema.clone(), + alias: projection_plan.alias.clone(), + })) } ( LogicalPlan::Union(Union { diff --git a/datafusion/src/optimizer/projection_push_down.rs b/datafusion/src/optimizer/projection_push_down.rs index b725b1d124119..3b4105bde8f0b 100644 --- a/datafusion/src/optimizer/projection_push_down.rs +++ b/datafusion/src/optimizer/projection_push_down.rs @@ -20,7 +20,7 @@ use crate::error::{DataFusionError, Result}; use crate::execution::context::ExecutionProps; -use crate::logical_plan::plan::{AnalyzePlan, TableScanPlan}; +use crate::logical_plan::plan::{AnalyzePlan, ProjectionPlan, TableScanPlan, WindowPlan}; use crate::logical_plan::{ build_join_schema, Column, DFField, DFSchema, DFSchemaRef, LogicalPlan, LogicalPlanBuilder, ToDFSchema, Union, @@ -131,12 +131,7 @@ fn optimize_plan( ) -> Result { let mut new_required_columns = required_columns.clone(); match plan { - LogicalPlan::Projection { - input, - expr, - schema, - alias, - } => { + LogicalPlan::Projection(projection_plan) => { // projection: // * remove any expression that is not required // * construct the new set of required columns @@ -145,17 +140,21 @@ fn optimize_plan( let mut new_fields = Vec::new(); // Gather all columns needed for expressions in this Projection - schema + projection_plan + .schema .fields() .iter() .enumerate() .try_for_each(|(i, field)| { if required_columns.contains(&field.qualified_column()) { - new_expr.push(expr[i].clone()); + new_expr.push(projection_plan.expr[i].clone()); new_fields.push(field.clone()); // gather the new set of required columns - utils::expr_to_columns(&expr[i], &mut new_required_columns) + utils::expr_to_columns( + &projection_plan.expr[i], + &mut new_required_columns, + ) } else { Ok(()) } @@ -163,7 +162,7 @@ fn optimize_plan( let new_input = optimize_plan( optimizer, - input, + &projection_plan.input, &new_required_columns, true, execution_props, @@ -182,12 +181,12 @@ fn optimize_plan( // no need for an expression at all Ok(new_input) } else { - Ok(LogicalPlan::Projection { + Ok(LogicalPlan::Projection(ProjectionPlan { expr: new_expr, input: Arc::new(new_input), schema: DFSchemaRef::new(DFSchema::new(new_fields)?), - alias: alias.clone(), - }) + alias: projection_plan.alias.clone(), + })) } } LogicalPlan::Join { @@ -236,12 +235,12 @@ fn optimize_plan( null_equals_null: *null_equals_null, }) } - LogicalPlan::Window { + LogicalPlan::Window(WindowPlan { schema, window_expr, input, .. - } => { + }) => { // Gather all columns needed for expressions in this Window let mut new_window_expr = Vec::new(); { @@ -745,12 +744,12 @@ mod tests { let expr = vec![col("a"), col("b")]; let projected_fields = exprlist_to_fields(&expr, input_schema).unwrap(); let projected_schema = DFSchema::new(projected_fields).unwrap(); - let plan = LogicalPlan::Projection { + let plan = LogicalPlan::Projection(ProjectionPlan { expr, input: Arc::new(table_scan), schema: Arc::new(projected_schema), alias: None, - }; + }); assert_fields_eq(&plan, vec!["a", "b"]); diff --git a/datafusion/src/optimizer/single_distinct_to_groupby.rs b/datafusion/src/optimizer/single_distinct_to_groupby.rs index f6178a2fe9b2f..0a7cff423bf8d 100644 --- a/datafusion/src/optimizer/single_distinct_to_groupby.rs +++ b/datafusion/src/optimizer/single_distinct_to_groupby.rs @@ -19,6 +19,7 @@ use crate::error::Result; use crate::execution::context::ExecutionProps; +use crate::logical_plan::plan::ProjectionPlan; use crate::logical_plan::{columnize_expr, DFSchema, Expr, LogicalPlan}; use crate::optimizer::optimizer::OptimizerRule; use crate::optimizer::utils; @@ -124,12 +125,12 @@ fn optimize(plan: &LogicalPlan) -> Result { )); }); - Ok(LogicalPlan::Projection { + Ok(LogicalPlan::Projection(ProjectionPlan { expr: alias_expr, input: Arc::new(final_agg), schema: schema.clone(), alias: Option::None, - }) + })) } else { optimize_children(plan) } diff --git a/datafusion/src/optimizer/utils.rs b/datafusion/src/optimizer/utils.rs index b9a3b996377cb..eaaa49383db0b 100644 --- a/datafusion/src/optimizer/utils.rs +++ b/datafusion/src/optimizer/utils.rs @@ -23,7 +23,9 @@ use arrow::record_batch::RecordBatch; use super::optimizer::OptimizerRule; use crate::execution::context::{ExecutionContextState, ExecutionProps}; -use crate::logical_plan::plan::{AnalyzePlan, ExtensionPlan}; +use crate::logical_plan::plan::{ + AnalyzePlan, ExtensionPlan, FilterPlan, ProjectionPlan, WindowPlan, +}; use crate::logical_plan::{ build_join_schema, Column, CreateMemoryTable, DFSchema, DFSchemaRef, Expr, ExprRewriter, LogicalPlan, LogicalPlanBuilder, Operator, Partitioning, Recursion, @@ -145,12 +147,14 @@ pub fn from_plan( inputs: &[LogicalPlan], ) -> Result { match plan { - LogicalPlan::Projection { schema, alias, .. } => Ok(LogicalPlan::Projection { - expr: expr.to_vec(), - input: Arc::new(inputs[0].clone()), - schema: schema.clone(), - alias: alias.clone(), - }), + LogicalPlan::Projection(projection_plan) => { + Ok(LogicalPlan::Projection(ProjectionPlan { + expr: expr.to_vec(), + input: Arc::new(inputs[0].clone()), + schema: projection_plan.schema.clone(), + alias: projection_plan.alias.clone(), + })) + } LogicalPlan::Values { schema, .. } => Ok(LogicalPlan::Values { schema: schema.clone(), values: expr @@ -158,10 +162,10 @@ pub fn from_plan( .map(|s| s.to_vec()) .collect::>(), }), - LogicalPlan::Filter { .. } => Ok(LogicalPlan::Filter { + LogicalPlan::Filter(_) => Ok(LogicalPlan::Filter(FilterPlan { predicate: expr[0].clone(), input: Arc::new(inputs[0].clone()), - }), + })), LogicalPlan::Repartition(Repartition { partitioning_scheme, .. @@ -177,15 +181,11 @@ pub fn from_plan( input: Arc::new(inputs[0].clone()), })), }, - LogicalPlan::Window { - window_expr, - schema, - .. - } => Ok(LogicalPlan::Window { + LogicalPlan::Window(window_plan) => Ok(LogicalPlan::Window(WindowPlan { input: Arc::new(inputs[0].clone()), - window_expr: expr[0..window_expr.len()].to_vec(), - schema: schema.clone(), - }), + window_expr: expr[0..window_plan.window_expr.len()].to_vec(), + schema: window_plan.schema.clone(), + })), LogicalPlan::Aggregate { group_expr, schema, .. } => Ok(LogicalPlan::Aggregate { diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index ad858be84171d..10283f5652b2a 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -23,6 +23,7 @@ use super::{ hash_join::PartitionMode, udaf, union::UnionExec, values::ValuesExec, windows, }; use crate::execution::context::ExecutionContextState; +use crate::logical_plan::plan::{FilterPlan, ProjectionPlan}; use crate::logical_plan::TableScanPlan; use crate::logical_plan::{ unnormalize_cols, CrossJoin, DFSchema, Expr, LogicalPlan, Operator, @@ -371,20 +372,18 @@ impl DefaultPhysicalPlanner { )?; Ok(Arc::new(value_exec)) } - LogicalPlan::Window { - input, window_expr, .. - } => { - if window_expr.is_empty() { + LogicalPlan::Window(window_plan) => { + if window_plan.window_expr.is_empty() { return Err(DataFusionError::Internal( "Impossibly got empty window expression".to_owned(), )); } - let input_exec = self.create_initial_plan(input, ctx_state).await?; + let input_exec = self.create_initial_plan(&window_plan.input, ctx_state).await?; // at this moment we are guaranteed by the logical planner // to have all the window_expr to have equal sort key - let partition_keys = window_expr_common_partition_keys(window_expr)?; + let partition_keys = window_expr_common_partition_keys(&window_plan.window_expr)?; let can_repartition = !partition_keys.is_empty() && ctx_state.config.target_partitions > 1 @@ -396,7 +395,7 @@ impl DefaultPhysicalPlanner { .map(|e| { self.create_physical_expr( e, - input.schema(), + window_plan.input.schema(), &input_exec.schema(), ctx_state, ) @@ -422,17 +421,17 @@ impl DefaultPhysicalPlanner { } => generate_sort_key(partition_by, order_by), _ => unreachable!(), }; - let sort_keys = get_sort_keys(&window_expr[0]); - if window_expr.len() > 1 { + let sort_keys = get_sort_keys(&window_plan.window_expr[0]); + if window_plan.window_expr.len() > 1 { debug_assert!( - window_expr[1..] + window_plan.window_expr[1..] .iter() .all(|expr| get_sort_keys(expr) == sort_keys), "all window expressions shall have the same sort keys, as guaranteed by logical planning" ); } - let logical_input_schema = input.schema(); + let logical_input_schema = window_plan.input.schema(); let input_exec = if sort_keys.is_empty() { input_exec @@ -466,7 +465,7 @@ impl DefaultPhysicalPlanner { }; let physical_input_schema = input_exec.schema(); - let window_expr = window_expr + let window_expr = window_plan.window_expr .iter() .map(|e| { self.create_window_expr( @@ -577,7 +576,7 @@ impl DefaultPhysicalPlanner { physical_input_schema.clone(), )?) ) } - LogicalPlan::Projection { input, expr, .. } => { + LogicalPlan::Projection(ProjectionPlan { input, expr, .. }) => { let input_exec = self.create_initial_plan(input, ctx_state).await?; let input_schema = input.as_ref().schema(); @@ -629,9 +628,9 @@ impl DefaultPhysicalPlanner { input_exec, )?) ) } - LogicalPlan::Filter { + LogicalPlan::Filter(FilterPlan { input, predicate, .. - } => { + }) => { let physical_input = self.create_initial_plan(input, ctx_state).await?; let input_schema = physical_input.as_ref().schema(); let input_dfschema = input.as_ref().schema(); diff --git a/datafusion/tests/custom_sources.rs b/datafusion/tests/custom_sources.rs index 25040a6ca6db7..c95b884e7941e 100644 --- a/datafusion/tests/custom_sources.rs +++ b/datafusion/tests/custom_sources.rs @@ -45,6 +45,8 @@ use std::sync::Arc; use std::task::{Context, Poll}; use async_trait::async_trait; +use datafusion::logical_plan::plan::ProjectionPlan; +use datafusion::logical_plan::LogicalPlan::Projection; //// Custom source dataframe tests //// @@ -216,7 +218,7 @@ async fn custom_source_dataframe() -> Result<()> { let optimized_plan = ctx.optimize(&logical_plan)?; match &optimized_plan { - LogicalPlan::Projection { input, .. } => match &**input { + LogicalPlan::Projection(ProjectionPlan { input, .. }) => match &**input { LogicalPlan::TableScan(TableScanPlan { source, projected_schema, From eb077391ef5aa101598c61a296d419cc0ee4999a Mon Sep 17 00:00:00 2001 From: liuli Date: Sat, 20 Nov 2021 00:00:34 +0800 Subject: [PATCH 4/5] solve conflicts --- datafusion/tests/custom_sources.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/tests/custom_sources.rs b/datafusion/tests/custom_sources.rs index c95b884e7941e..a33cf5c19fe45 100644 --- a/datafusion/tests/custom_sources.rs +++ b/datafusion/tests/custom_sources.rs @@ -46,7 +46,6 @@ use std::task::{Context, Poll}; use async_trait::async_trait; use datafusion::logical_plan::plan::ProjectionPlan; -use datafusion::logical_plan::LogicalPlan::Projection; //// Custom source dataframe tests //// From 4baf6153f5334f64f4c909edbb4fea741e5fb58b Mon Sep 17 00:00:00 2001 From: liuli Date: Sun, 21 Nov 2021 01:39:59 +0800 Subject: [PATCH 5/5] solve conflicts --- .../core/src/serde/logical_plan/to_proto.rs | 12 +- datafusion/src/execution/context.rs | 6 +- datafusion/src/logical_plan/builder.rs | 15 +-- datafusion/src/logical_plan/plan.rs | 110 +++++++++++------- .../src/optimizer/common_subexpr_eliminate.rs | 64 +++++----- datafusion/src/optimizer/filter_push_down.rs | 17 ++- datafusion/src/optimizer/limit_push_down.rs | 9 +- .../src/optimizer/projection_push_down.rs | 29 ++--- .../optimizer/single_distinct_to_groupby.rs | 4 +- datafusion/src/optimizer/utils.rs | 26 +++-- datafusion/src/physical_plan/planner.rs | 12 +- datafusion/tests/custom_sources.rs | 4 +- datafusion/tests/sql.rs | 4 +- 13 files changed, 168 insertions(+), 144 deletions(-) diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs index 9c138101503ca..0e0b989a91fa6 100644 --- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs @@ -30,7 +30,7 @@ use datafusion::datasource::TableProvider; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::listing::ListingTable; -use datafusion::logical_plan::plan::EmptyRelation; +use datafusion::logical_plan::plan::{EmptyRelation, Filter, Projection, Window}; use datafusion::logical_plan::{ exprlist_to_fields, window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits}, @@ -779,9 +779,9 @@ impl TryInto for &LogicalPlan { ))) } } - LogicalPlan::Projection { + LogicalPlan::Projection(Projection { expr, input, alias, .. - } => Ok(protobuf::LogicalPlanNode { + }) => Ok(protobuf::LogicalPlanNode { logical_plan_type: Some(LogicalPlanType::Projection(Box::new( protobuf::ProjectionNode { input: Some(Box::new(input.as_ref().try_into()?)), @@ -796,7 +796,7 @@ impl TryInto for &LogicalPlan { }, ))), }), - LogicalPlan::Filter { predicate, input } => { + LogicalPlan::Filter(Filter { predicate, input }) => { let input: protobuf::LogicalPlanNode = input.as_ref().try_into()?; Ok(protobuf::LogicalPlanNode { logical_plan_type: Some(LogicalPlanType::Selection(Box::new( @@ -807,9 +807,9 @@ impl TryInto for &LogicalPlan { ))), }) } - LogicalPlan::Window { + LogicalPlan::Window(Window { input, window_expr, .. - } => { + }) => { let input: protobuf::LogicalPlanNode = input.as_ref().try_into()?; Ok(protobuf::LogicalPlanNode { logical_plan_type: Some(LogicalPlanType::Window(Box::new( diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index ee501c4c57a0b..9e5ac1f196789 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -1177,7 +1177,7 @@ impl FunctionRegistry for ExecutionContextState { #[cfg(test)] mod tests { use super::*; - use crate::logical_plan::plan::ProjectionPlan; + use crate::logical_plan::plan::Projection; use crate::logical_plan::TableScanPlan; use crate::logical_plan::{binary_expr, lit, Operator}; use crate::physical_plan::functions::{make_scalar_function, Volatility}; @@ -1416,7 +1416,7 @@ mod tests { let optimized_plan = ctx.optimize(&logical_plan)?; match &optimized_plan { - LogicalPlan::Projection(ProjectionPlan { input, .. }) => match &**input { + LogicalPlan::Projection(Projection { input, .. }) => match &**input { LogicalPlan::TableScan(TableScanPlan { source, projected_schema, @@ -1489,7 +1489,7 @@ mod tests { let ctx = ExecutionContext::new(); let optimized_plan = ctx.optimize(&plan)?; match &optimized_plan { - LogicalPlan::Projection(ProjectionPlan { input, .. }) => match &**input { + LogicalPlan::Projection(Projection { input, .. }) => match &**input { LogicalPlan::TableScan(TableScanPlan { source, projected_schema, diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index 94e53d63bff8b..2eb0091389f71 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -26,7 +26,8 @@ use crate::datasource::{ }; use crate::error::{DataFusionError, Result}; use crate::logical_plan::plan::{ - AnalyzePlan, EmptyRelation, ExplainPlan, TableScanPlan, ToStringifiedPlan, Union, + AnalyzePlan, EmptyRelation, ExplainPlan, Filter, Projection, TableScanPlan, + ToStringifiedPlan, Union, Window, }; use crate::prelude::*; use crate::scalar::ScalarValue; @@ -451,10 +452,10 @@ impl LogicalPlanBuilder { /// Apply a filter pub fn filter(&self, expr: impl Into) -> Result { let expr = normalize_col(expr.into(), &self.plan)?; - Ok(Self::from(LogicalPlan::Filter { + Ok(Self::from(LogicalPlan::Filter(Filter { predicate: expr, input: Arc::new(self.plan.clone()), - })) + }))) } /// Apply a limit @@ -658,11 +659,11 @@ impl LogicalPlanBuilder { let mut window_fields: Vec = exprlist_to_fields(all_expr, self.plan.schema())?; window_fields.extend_from_slice(self.plan.schema().fields()); - Ok(Self::from(LogicalPlan::Window { + Ok(Self::from(LogicalPlan::Window(Window { input: Arc::new(self.plan.clone()), window_expr, schema: Arc::new(DFSchema::new(window_fields)?), - })) + }))) } /// Apply an aggregate: grouping on the `group_expr` expressions @@ -898,12 +899,12 @@ pub fn project_with_alias( Some(ref alias) => input_schema.replace_qualifier(alias.as_str()), None => input_schema, }; - Ok(LogicalPlan::Projection { + Ok(LogicalPlan::Projection(Projection { expr: projected_expr, input: Arc::new(plan.clone()), schema: DFSchemaRef::new(schema), alias, - }) + })) } /// Resolves an `Expr::Wildcard` to a collection of `Expr::Column`'s. diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs index 31de255b3d73b..a7aac90391208 100644 --- a/datafusion/src/logical_plan/plan.rs +++ b/datafusion/src/logical_plan/plan.rs @@ -57,6 +57,47 @@ pub enum JoinConstraint { Using, } +/// Evaluates an arbitrary list of expressions (essentially a +/// SELECT with an expression list) on its input. +#[derive(Clone)] +pub struct Projection { + /// The list of expressions + pub expr: Vec, + /// The incoming logical plan + pub input: Arc, + /// The schema description of the output + pub schema: DFSchemaRef, + /// Projection output relation alias + pub alias: Option, +} + +/// Filters rows from its input that do not match an +/// expression (essentially a WHERE clause with a predicate +/// expression). +/// +/// Semantically, `` is evaluated for each row of the input; +/// If the value of `` is true, the input row is passed to +/// the output. If the value of `` is false, the row is +/// discarded. +#[derive(Clone)] +pub struct Filter { + /// The predicate expression, which must have Boolean type. + pub predicate: Expr, + /// The incoming logical plan + pub input: Arc, +} + +/// Window its input based on a set of window spec and window function (e.g. SUM or RANK) +#[derive(Clone)] +pub struct Window { + /// The incoming logical plan + pub input: Arc, + /// The window function expression + pub window_expr: Vec, + /// The schema description of the window output + pub schema: DFSchemaRef, +} + /// Produces rows from a table provider by reference or from the context #[derive(Clone)] pub struct TableScanPlan { @@ -214,16 +255,7 @@ pub struct Values { pub enum LogicalPlan { /// Evaluates an arbitrary list of expressions (essentially a /// SELECT with an expression list) on its input. - Projection { - /// The list of expressions - expr: Vec, - /// The incoming logical plan - input: Arc, - /// The schema description of the output - schema: DFSchemaRef, - /// Projection output relation alias - alias: Option, - }, + Projection(Projection), /// Filters rows from its input that do not match an /// expression (essentially a WHERE clause with a predicate /// expression). @@ -232,21 +264,9 @@ pub enum LogicalPlan { /// If the value of `` is true, the input row is passed to /// the output. If the value of `` is false, the row is /// discarded. - Filter { - /// The predicate expression, which must have Boolean type. - predicate: Expr, - /// The incoming logical plan - input: Arc, - }, + Filter(Filter), /// Window its input based on a set of window spec and window function (e.g. SUM or RANK) - Window { - /// The incoming logical plan - input: Arc, - /// The window function expression - window_expr: Vec, - /// The schema description of the window output - schema: DFSchemaRef, - }, + Window(Window), /// Aggregates its input based on a set of grouping and aggregate /// expressions (e.g. SUM). Aggregate { @@ -324,9 +344,9 @@ impl LogicalPlan { LogicalPlan::TableScan(TableScanPlan { projected_schema, .. }) => projected_schema, - LogicalPlan::Projection { schema, .. } => schema, - LogicalPlan::Filter { input, .. } => input.schema(), - LogicalPlan::Window { schema, .. } => schema, + LogicalPlan::Projection(Projection { schema, .. }) => schema, + LogicalPlan::Filter(Filter { input, .. }) => input.schema(), + LogicalPlan::Window(Window { schema, .. }) => schema, LogicalPlan::Aggregate { schema, .. } => schema, LogicalPlan::Sort { input, .. } => input.schema(), LogicalPlan::Join { schema, .. } => schema, @@ -354,9 +374,9 @@ impl LogicalPlan { projected_schema, .. }) => vec![projected_schema], LogicalPlan::Values(Values { schema, .. }) => vec![schema], - LogicalPlan::Window { input, schema, .. } + LogicalPlan::Window(Window { input, schema, .. }) | LogicalPlan::Aggregate { input, schema, .. } - | LogicalPlan::Projection { input, schema, .. } => { + | LogicalPlan::Projection(Projection { input, schema, .. }) => { let mut schemas = input.all_schemas(); schemas.insert(0, schema); schemas @@ -391,7 +411,7 @@ impl LogicalPlan { | LogicalPlan::Repartition(Repartition { input, .. }) | LogicalPlan::Sort { input, .. } | LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) - | LogicalPlan::Filter { input, .. } => input.all_schemas(), + | LogicalPlan::Filter(Filter { input, .. }) => input.all_schemas(), LogicalPlan::DropTable(_) => vec![], } } @@ -409,11 +429,11 @@ impl LogicalPlan { /// children pub fn expressions(self: &LogicalPlan) -> Vec { match self { - LogicalPlan::Projection { expr, .. } => expr.clone(), + LogicalPlan::Projection(Projection { expr, .. }) => expr.clone(), LogicalPlan::Values(Values { values, .. }) => { values.iter().flatten().cloned().collect() } - LogicalPlan::Filter { predicate, .. } => vec![predicate.clone()], + LogicalPlan::Filter(Filter { predicate, .. }) => vec![predicate.clone()], LogicalPlan::Repartition(Repartition { partitioning_scheme, .. @@ -421,7 +441,7 @@ impl LogicalPlan { Partitioning::Hash(expr, _) => expr.clone(), _ => vec![], }, - LogicalPlan::Window { window_expr, .. } => window_expr.clone(), + LogicalPlan::Window(Window { window_expr, .. }) => window_expr.clone(), LogicalPlan::Aggregate { group_expr, aggr_expr, @@ -453,10 +473,10 @@ impl LogicalPlan { /// include inputs to inputs. pub fn inputs(self: &LogicalPlan) -> Vec<&LogicalPlan> { match self { - LogicalPlan::Projection { input, .. } => vec![input], - LogicalPlan::Filter { input, .. } => vec![input], + LogicalPlan::Projection(Projection { input, .. }) => vec![input], + LogicalPlan::Filter(Filter { input, .. }) => vec![input], LogicalPlan::Repartition(Repartition { input, .. }) => vec![input], - LogicalPlan::Window { input, .. } => vec![input], + LogicalPlan::Window(Window { input, .. }) => vec![input], LogicalPlan::Aggregate { input, .. } => vec![input], LogicalPlan::Sort { input, .. } => vec![input], LogicalPlan::Join { left, right, .. } => vec![left, right], @@ -588,12 +608,12 @@ impl LogicalPlan { } let recurse = match self { - LogicalPlan::Projection { input, .. } => input.accept(visitor)?, - LogicalPlan::Filter { input, .. } => input.accept(visitor)?, + LogicalPlan::Projection(Projection { input, .. }) => input.accept(visitor)?, + LogicalPlan::Filter(Filter { input, .. }) => input.accept(visitor)?, LogicalPlan::Repartition(Repartition { input, .. }) => { input.accept(visitor)? } - LogicalPlan::Window { input, .. } => input.accept(visitor)?, + LogicalPlan::Window(Window { input, .. }) => input.accept(visitor)?, LogicalPlan::Aggregate { input, .. } => input.accept(visitor)?, LogicalPlan::Sort { input, .. } => input.accept(visitor)?, LogicalPlan::Join { left, right, .. } @@ -856,9 +876,9 @@ impl LogicalPlan { Ok(()) } - LogicalPlan::Projection { + LogicalPlan::Projection(Projection { ref expr, alias, .. - } => { + }) => { write!(f, "Projection: ")?; for (i, expr_item) in expr.iter().enumerate() { if i > 0 { @@ -871,13 +891,13 @@ impl LogicalPlan { } Ok(()) } - LogicalPlan::Filter { + LogicalPlan::Filter(Filter { predicate: ref expr, .. - } => write!(f, "Filter: {:?}", expr), - LogicalPlan::Window { + }) => write!(f, "Filter: {:?}", expr), + LogicalPlan::Window(Window { ref window_expr, .. - } => { + }) => { write!(f, "WindowAggr: windowExpr=[{:?}]", window_expr) } LogicalPlan::Aggregate { diff --git a/datafusion/src/optimizer/common_subexpr_eliminate.rs b/datafusion/src/optimizer/common_subexpr_eliminate.rs index c348a7011a277..2572f10dc22d3 100644 --- a/datafusion/src/optimizer/common_subexpr_eliminate.rs +++ b/datafusion/src/optimizer/common_subexpr_eliminate.rs @@ -19,7 +19,7 @@ use crate::error::Result; use crate::execution::context::ExecutionProps; -use crate::logical_plan::plan::{FilterPlan, ProjectionPlan, WindowPlan}; +use crate::logical_plan::plan::{Filter, Projection, Window}; use crate::logical_plan::{ col, DFField, DFSchema, Expr, ExprRewriter, ExpressionVisitor, LogicalPlan, Recursion, RewriteRecursion, @@ -78,74 +78,76 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result { - let arrays = - to_arrays(&projection_plan.expr, &projection_plan.input, &mut expr_set)?; + LogicalPlan::Projection(Projection { + expr, + input, + schema, + alias, + }) => { + let arrays = to_arrays(expr, input, &mut expr_set)?; let (mut new_expr, new_input) = rewrite_expr( - &[&projection_plan.expr], + &[expr], &[&arrays], - &projection_plan.input, + input, &mut expr_set, - &projection_plan.schema, + schema, execution_props, )?; - Ok(LogicalPlan::Projection(ProjectionPlan { + Ok(LogicalPlan::Projection(Projection { expr: new_expr.pop().unwrap(), input: Arc::new(new_input), - schema: projection_plan.schema.clone(), - alias: projection_plan.alias.clone(), + schema: schema.clone(), + alias: alias.clone(), })) } - LogicalPlan::Filter(filter_plan) => { + LogicalPlan::Filter(Filter { predicate, input }) => { let schemas = plan.all_schemas(); let all_schema = schemas.into_iter().fold(DFSchema::empty(), |mut lhs, rhs| { lhs.merge(rhs); lhs }); - let data_type = filter_plan.predicate.get_type(&all_schema)?; + let data_type = predicate.get_type(&all_schema)?; let mut id_array = vec![]; - expr_to_identifier( - &filter_plan.predicate, - &mut expr_set, - &mut id_array, - data_type, - )?; + expr_to_identifier(predicate, &mut expr_set, &mut id_array, data_type)?; let (mut new_expr, new_input) = rewrite_expr( - &[&[filter_plan.predicate.clone()]], + &[&[predicate.clone()]], &[&[id_array]], - &filter_plan.input, + input, &mut expr_set, - filter_plan.input.schema(), + input.schema(), execution_props, )?; - Ok(LogicalPlan::Filter(FilterPlan { + Ok(LogicalPlan::Filter(Filter { predicate: new_expr.pop().unwrap().pop().unwrap(), input: Arc::new(new_input), })) } - LogicalPlan::Window(window_plan) => { - let arrays = - to_arrays(&window_plan.window_expr, &window_plan.input, &mut expr_set)?; + LogicalPlan::Window(Window { + input, + window_expr, + schema, + }) => { + let arrays = to_arrays(window_expr, input, &mut expr_set)?; let (mut new_expr, new_input) = rewrite_expr( - &[&window_plan.window_expr], + &[window_expr], &[&arrays], - &window_plan.input, + input, &mut expr_set, - &window_plan.schema, + schema, execution_props, )?; - Ok(LogicalPlan::Window(WindowPlan { + Ok(LogicalPlan::Window(Window { input: Arc::new(new_input), window_expr: new_expr.pop().unwrap(), - schema: window_plan.schema.clone(), + schema: schema.clone(), })) } LogicalPlan::Aggregate { @@ -264,7 +266,7 @@ fn build_project_plan( let mut schema = DFSchema::new(fields)?; schema.merge(input.schema()); - Ok(LogicalPlan::Projection(ProjectionPlan { + Ok(LogicalPlan::Projection(Projection { expr: project_exprs, input: Arc::new(input), schema: Arc::new(schema), diff --git a/datafusion/src/optimizer/filter_push_down.rs b/datafusion/src/optimizer/filter_push_down.rs index a26bd37401ba8..4214a576463e5 100644 --- a/datafusion/src/optimizer/filter_push_down.rs +++ b/datafusion/src/optimizer/filter_push_down.rs @@ -16,7 +16,7 @@ use crate::datasource::datasource::TableProviderFilterPushDown; use crate::execution::context::ExecutionProps; -use crate::logical_plan::plan::{FilterPlan, ProjectionPlan}; +use crate::logical_plan::plan::{Filter, Projection}; use crate::logical_plan::{ and, replace_col, Column, CrossJoin, Limit, LogicalPlan, TableScanPlan, }; @@ -182,7 +182,7 @@ fn add_filter(plan: LogicalPlan, predicates: &[&Expr]) -> LogicalPlan { and(acc, (*predicate).to_owned()) }); - LogicalPlan::Filter(FilterPlan { + LogicalPlan::Filter(Filter { predicate, input: Arc::new(plan), }) @@ -288,9 +288,9 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { push_down(&state, plan) } LogicalPlan::Analyze { .. } => push_down(&state, plan), - LogicalPlan::Filter(filter_plan) => { + LogicalPlan::Filter(Filter { input, predicate }) => { let mut predicates = vec![]; - split_members(&filter_plan.predicate, &mut predicates); + split_members(predicate, &mut predicates); // Predicates without referencing columns (WHERE FALSE, WHERE 1=1, etc.) let mut no_col_predicates = vec![]; @@ -312,15 +312,12 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { // As those contain only literals, they could be optimized using constant folding // and removal of WHERE TRUE / WHERE FALSE if !no_col_predicates.is_empty() { - Ok(add_filter( - optimize(&filter_plan.input, state)?, - &no_col_predicates, - )) + Ok(add_filter(optimize(input, state)?, &no_col_predicates)) } else { - optimize(&filter_plan.input, state) + optimize(input, state) } } - LogicalPlan::Projection(ProjectionPlan { + LogicalPlan::Projection(Projection { input, expr, schema, diff --git a/datafusion/src/optimizer/limit_push_down.rs b/datafusion/src/optimizer/limit_push_down.rs index 320e1c7695710..e2c65de13f9f0 100644 --- a/datafusion/src/optimizer/limit_push_down.rs +++ b/datafusion/src/optimizer/limit_push_down.rs @@ -20,6 +20,7 @@ use super::utils; use crate::error::Result; use crate::execution::context::ExecutionProps; +use crate::logical_plan::plan::Projection; use crate::logical_plan::{Limit, TableScanPlan}; use crate::logical_plan::{LogicalPlan, Union}; use crate::optimizer::optimizer::OptimizerRule; @@ -77,16 +78,16 @@ fn limit_push_down( projected_schema: projected_schema.clone(), })), ( - LogicalPlan::Projection { + LogicalPlan::Projection(Projection { expr, input, schema, alias, - }, + }), upper_limit, ) => { // Push down limit directly (projection doesn't change number of rows) - Ok(LogicalPlan::Projection { + Ok(LogicalPlan::Projection(Projection { expr: expr.clone(), input: Arc::new(limit_push_down( optimizer, @@ -96,7 +97,7 @@ fn limit_push_down( )?), schema: schema.clone(), alias: alias.clone(), - }) + })) } ( LogicalPlan::Union(Union { diff --git a/datafusion/src/optimizer/projection_push_down.rs b/datafusion/src/optimizer/projection_push_down.rs index 61a7c196aedbe..5b1f3ea1e5dc3 100644 --- a/datafusion/src/optimizer/projection_push_down.rs +++ b/datafusion/src/optimizer/projection_push_down.rs @@ -20,7 +20,7 @@ use crate::error::{DataFusionError, Result}; use crate::execution::context::ExecutionProps; -use crate::logical_plan::plan::{AnalyzePlan, ProjectionPlan, TableScanPlan, WindowPlan}; +use crate::logical_plan::plan::{AnalyzePlan, Projection, TableScanPlan, Window}; use crate::logical_plan::{ build_join_schema, Column, DFField, DFSchema, DFSchemaRef, LogicalPlan, LogicalPlanBuilder, ToDFSchema, Union, @@ -131,7 +131,12 @@ fn optimize_plan( ) -> Result { let mut new_required_columns = required_columns.clone(); match plan { - LogicalPlan::Projection(projection_plan) => { + LogicalPlan::Projection(Projection { + input, + expr, + schema, + alias, + }) => { // projection: // * remove any expression that is not required // * construct the new set of required columns @@ -140,21 +145,17 @@ fn optimize_plan( let mut new_fields = Vec::new(); // Gather all columns needed for expressions in this Projection - projection_plan - .schema + schema .fields() .iter() .enumerate() .try_for_each(|(i, field)| { if required_columns.contains(&field.qualified_column()) { - new_expr.push(projection_plan.expr[i].clone()); + new_expr.push(expr[i].clone()); new_fields.push(field.clone()); // gather the new set of required columns - utils::expr_to_columns( - &projection_plan.expr[i], - &mut new_required_columns, - ) + utils::expr_to_columns(&expr[i], &mut new_required_columns) } else { Ok(()) } @@ -162,7 +163,7 @@ fn optimize_plan( let new_input = optimize_plan( optimizer, - &projection_plan.input, + input, &new_required_columns, true, execution_props, @@ -181,11 +182,11 @@ fn optimize_plan( // no need for an expression at all Ok(new_input) } else { - Ok(LogicalPlan::Projection(ProjectionPlan { + Ok(LogicalPlan::Projection(Projection { expr: new_expr, input: Arc::new(new_input), schema: DFSchemaRef::new(DFSchema::new(new_fields)?), - alias: projection_plan.alias.clone(), + alias: alias.clone(), })) } } @@ -235,7 +236,7 @@ fn optimize_plan( null_equals_null: *null_equals_null, }) } - LogicalPlan::Window(WindowPlan { + LogicalPlan::Window(Window { schema, window_expr, input, @@ -744,7 +745,7 @@ mod tests { let expr = vec![col("a"), col("b")]; let projected_fields = exprlist_to_fields(&expr, input_schema).unwrap(); let projected_schema = DFSchema::new(projected_fields).unwrap(); - let plan = LogicalPlan::Projection(ProjectionPlan { + let plan = LogicalPlan::Projection(Projection { expr, input: Arc::new(table_scan), schema: Arc::new(projected_schema), diff --git a/datafusion/src/optimizer/single_distinct_to_groupby.rs b/datafusion/src/optimizer/single_distinct_to_groupby.rs index 0a7cff423bf8d..358444d9c9a63 100644 --- a/datafusion/src/optimizer/single_distinct_to_groupby.rs +++ b/datafusion/src/optimizer/single_distinct_to_groupby.rs @@ -19,7 +19,7 @@ use crate::error::Result; use crate::execution::context::ExecutionProps; -use crate::logical_plan::plan::ProjectionPlan; +use crate::logical_plan::plan::Projection; use crate::logical_plan::{columnize_expr, DFSchema, Expr, LogicalPlan}; use crate::optimizer::optimizer::OptimizerRule; use crate::optimizer::utils; @@ -125,7 +125,7 @@ fn optimize(plan: &LogicalPlan) -> Result { )); }); - Ok(LogicalPlan::Projection(ProjectionPlan { + Ok(LogicalPlan::Projection(Projection { expr: alias_expr, input: Arc::new(final_agg), schema: schema.clone(), diff --git a/datafusion/src/optimizer/utils.rs b/datafusion/src/optimizer/utils.rs index 05c362988570d..aa559cd84cef5 100644 --- a/datafusion/src/optimizer/utils.rs +++ b/datafusion/src/optimizer/utils.rs @@ -23,7 +23,7 @@ use arrow::record_batch::RecordBatch; use super::optimizer::OptimizerRule; use crate::execution::context::{ExecutionContextState, ExecutionProps}; -use crate::logical_plan::plan::{AnalyzePlan, ExtensionPlan}; +use crate::logical_plan::plan::{AnalyzePlan, ExtensionPlan, Filter, Projection, Window}; use crate::logical_plan::{ build_join_schema, Column, CreateMemoryTable, DFSchema, DFSchemaRef, Expr, ExprRewriter, Limit, LogicalPlan, LogicalPlanBuilder, Operator, Partitioning, @@ -145,12 +145,14 @@ pub fn from_plan( inputs: &[LogicalPlan], ) -> Result { match plan { - LogicalPlan::Projection { schema, alias, .. } => Ok(LogicalPlan::Projection { - expr: expr.to_vec(), - input: Arc::new(inputs[0].clone()), - schema: schema.clone(), - alias: alias.clone(), - }), + LogicalPlan::Projection(Projection { schema, alias, .. }) => { + Ok(LogicalPlan::Projection(Projection { + expr: expr.to_vec(), + input: Arc::new(inputs[0].clone()), + schema: schema.clone(), + alias: alias.clone(), + })) + } LogicalPlan::Values(Values { schema, .. }) => Ok(LogicalPlan::Values(Values { schema: schema.clone(), values: expr @@ -158,10 +160,10 @@ pub fn from_plan( .map(|s| s.to_vec()) .collect::>(), })), - LogicalPlan::Filter { .. } => Ok(LogicalPlan::Filter { + LogicalPlan::Filter { .. } => Ok(LogicalPlan::Filter(Filter { predicate: expr[0].clone(), input: Arc::new(inputs[0].clone()), - }), + })), LogicalPlan::Repartition(Repartition { partitioning_scheme, .. @@ -177,15 +179,15 @@ pub fn from_plan( input: Arc::new(inputs[0].clone()), })), }, - LogicalPlan::Window { + LogicalPlan::Window(Window { window_expr, schema, .. - } => Ok(LogicalPlan::Window { + }) => Ok(LogicalPlan::Window(Window { input: Arc::new(inputs[0].clone()), window_expr: expr[0..window_expr.len()].to_vec(), schema: schema.clone(), - }), + })), LogicalPlan::Aggregate { group_expr, schema, .. } => Ok(LogicalPlan::Aggregate { diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index 44bd4b16bb5c6..594a0dda9d031 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -23,7 +23,7 @@ use super::{ hash_join::PartitionMode, udaf, union::UnionExec, values::ValuesExec, windows, }; use crate::execution::context::ExecutionContextState; -use crate::logical_plan::plan::EmptyRelation; +use crate::logical_plan::plan::{EmptyRelation, Filter, Projection, Window}; use crate::logical_plan::{ unnormalize_cols, CrossJoin, DFSchema, Expr, LogicalPlan, Operator, Partitioning as LogicalPartitioning, PlanType, Repartition, ToStringifiedPlan, Union, @@ -372,9 +372,9 @@ impl DefaultPhysicalPlanner { )?; Ok(Arc::new(value_exec)) } - LogicalPlan::Window { + LogicalPlan::Window(Window { input, window_expr, .. - } => { + }) => { if window_expr.is_empty() { return Err(DataFusionError::Internal( "Impossibly got empty window expression".to_owned(), @@ -578,7 +578,7 @@ impl DefaultPhysicalPlanner { physical_input_schema.clone(), )?) ) } - LogicalPlan::Projection { input, expr, .. } => { + LogicalPlan::Projection(Projection { input, expr, .. }) => { let input_exec = self.create_initial_plan(input, ctx_state).await?; let input_schema = input.as_ref().schema(); @@ -630,9 +630,9 @@ impl DefaultPhysicalPlanner { input_exec, )?) ) } - LogicalPlan::Filter { + LogicalPlan::Filter(Filter { input, predicate, .. - } => { + }) => { let physical_input = self.create_initial_plan(input, ctx_state).await?; let input_schema = physical_input.as_ref().schema(); let input_dfschema = input.as_ref().schema(); diff --git a/datafusion/tests/custom_sources.rs b/datafusion/tests/custom_sources.rs index a33cf5c19fe45..a145ca3fdf880 100644 --- a/datafusion/tests/custom_sources.rs +++ b/datafusion/tests/custom_sources.rs @@ -45,7 +45,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use async_trait::async_trait; -use datafusion::logical_plan::plan::ProjectionPlan; +use datafusion::logical_plan::plan::Projection; //// Custom source dataframe tests //// @@ -217,7 +217,7 @@ async fn custom_source_dataframe() -> Result<()> { let optimized_plan = ctx.optimize(&logical_plan)?; match &optimized_plan { - LogicalPlan::Projection(ProjectionPlan { input, .. }) => match &**input { + LogicalPlan::Projection(Projection { input, .. }) => match &**input { LogicalPlan::TableScan(TableScanPlan { source, projected_schema, diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs index 5970692cedc92..42763b07f3167 100644 --- a/datafusion/tests/sql.rs +++ b/datafusion/tests/sql.rs @@ -36,7 +36,7 @@ use datafusion::assert_batches_eq; use datafusion::assert_batches_sorted_eq; use datafusion::assert_contains; use datafusion::assert_not_contains; -use datafusion::logical_plan::plan::ProjectionPlan; +use datafusion::logical_plan::plan::Projection; use datafusion::logical_plan::LogicalPlan; use datafusion::logical_plan::TableScanPlan; use datafusion::physical_plan::functions::Volatility; @@ -91,7 +91,7 @@ async fn nyc() -> Result<()> { let optimized_plan = ctx.optimize(&logical_plan)?; match &optimized_plan { - LogicalPlan::Projection(ProjectionPlan { input, .. }) => match input.as_ref() { + LogicalPlan::Projection(Projection { input, .. }) => match input.as_ref() { LogicalPlan::Aggregate { input, .. } => match input.as_ref() { LogicalPlan::TableScan(TableScanPlan { ref projected_schema,