diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs index 9c138101503ca..0e0b989a91fa6 100644 --- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs @@ -30,7 +30,7 @@ use datafusion::datasource::TableProvider; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::listing::ListingTable; -use datafusion::logical_plan::plan::EmptyRelation; +use datafusion::logical_plan::plan::{EmptyRelation, Filter, Projection, Window}; use datafusion::logical_plan::{ exprlist_to_fields, window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits}, @@ -779,9 +779,9 @@ impl TryInto for &LogicalPlan { ))) } } - LogicalPlan::Projection { + LogicalPlan::Projection(Projection { expr, input, alias, .. - } => Ok(protobuf::LogicalPlanNode { + }) => Ok(protobuf::LogicalPlanNode { logical_plan_type: Some(LogicalPlanType::Projection(Box::new( protobuf::ProjectionNode { input: Some(Box::new(input.as_ref().try_into()?)), @@ -796,7 +796,7 @@ impl TryInto for &LogicalPlan { }, ))), }), - LogicalPlan::Filter { predicate, input } => { + LogicalPlan::Filter(Filter { predicate, input }) => { let input: protobuf::LogicalPlanNode = input.as_ref().try_into()?; Ok(protobuf::LogicalPlanNode { logical_plan_type: Some(LogicalPlanType::Selection(Box::new( @@ -807,9 +807,9 @@ impl TryInto for &LogicalPlan { ))), }) } - LogicalPlan::Window { + LogicalPlan::Window(Window { input, window_expr, .. - } => { + }) => { let input: protobuf::LogicalPlanNode = input.as_ref().try_into()?; Ok(protobuf::LogicalPlanNode { logical_plan_type: Some(LogicalPlanType::Window(Box::new( diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 5f77b2bfaa6bc..9e5ac1f196789 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -1177,6 +1177,7 @@ impl FunctionRegistry for ExecutionContextState { #[cfg(test)] mod tests { use super::*; + use crate::logical_plan::plan::Projection; use crate::logical_plan::TableScanPlan; use crate::logical_plan::{binary_expr, lit, Operator}; use crate::physical_plan::functions::{make_scalar_function, Volatility}; @@ -1415,7 +1416,7 @@ mod tests { let optimized_plan = ctx.optimize(&logical_plan)?; match &optimized_plan { - LogicalPlan::Projection { input, .. } => match &**input { + LogicalPlan::Projection(Projection { input, .. }) => match &**input { LogicalPlan::TableScan(TableScanPlan { source, projected_schema, @@ -1488,7 +1489,7 @@ mod tests { let ctx = ExecutionContext::new(); let optimized_plan = ctx.optimize(&plan)?; match &optimized_plan { - LogicalPlan::Projection { input, .. } => match &**input { + LogicalPlan::Projection(Projection { input, .. }) => match &**input { LogicalPlan::TableScan(TableScanPlan { source, projected_schema, diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index 94e53d63bff8b..2eb0091389f71 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -26,7 +26,8 @@ use crate::datasource::{ }; use crate::error::{DataFusionError, Result}; use crate::logical_plan::plan::{ - AnalyzePlan, EmptyRelation, ExplainPlan, TableScanPlan, ToStringifiedPlan, Union, + AnalyzePlan, EmptyRelation, ExplainPlan, Filter, Projection, TableScanPlan, + ToStringifiedPlan, Union, Window, }; use crate::prelude::*; use crate::scalar::ScalarValue; @@ -451,10 +452,10 @@ impl LogicalPlanBuilder { /// Apply a filter pub fn filter(&self, expr: impl Into) -> Result { let expr = normalize_col(expr.into(), &self.plan)?; - Ok(Self::from(LogicalPlan::Filter { + Ok(Self::from(LogicalPlan::Filter(Filter { predicate: expr, input: Arc::new(self.plan.clone()), - })) + }))) } /// Apply a limit @@ -658,11 +659,11 @@ impl LogicalPlanBuilder { let mut window_fields: Vec = exprlist_to_fields(all_expr, self.plan.schema())?; window_fields.extend_from_slice(self.plan.schema().fields()); - Ok(Self::from(LogicalPlan::Window { + Ok(Self::from(LogicalPlan::Window(Window { input: Arc::new(self.plan.clone()), window_expr, schema: Arc::new(DFSchema::new(window_fields)?), - })) + }))) } /// Apply an aggregate: grouping on the `group_expr` expressions @@ -898,12 +899,12 @@ pub fn project_with_alias( Some(ref alias) => input_schema.replace_qualifier(alias.as_str()), None => input_schema, }; - Ok(LogicalPlan::Projection { + Ok(LogicalPlan::Projection(Projection { expr: projected_expr, input: Arc::new(plan.clone()), schema: DFSchemaRef::new(schema), alias, - }) + })) } /// Resolves an `Expr::Wildcard` to a collection of `Expr::Column`'s. diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs index 31de255b3d73b..a7aac90391208 100644 --- a/datafusion/src/logical_plan/plan.rs +++ b/datafusion/src/logical_plan/plan.rs @@ -57,6 +57,47 @@ pub enum JoinConstraint { Using, } +/// Evaluates an arbitrary list of expressions (essentially a +/// SELECT with an expression list) on its input. +#[derive(Clone)] +pub struct Projection { + /// The list of expressions + pub expr: Vec, + /// The incoming logical plan + pub input: Arc, + /// The schema description of the output + pub schema: DFSchemaRef, + /// Projection output relation alias + pub alias: Option, +} + +/// Filters rows from its input that do not match an +/// expression (essentially a WHERE clause with a predicate +/// expression). +/// +/// Semantically, `` is evaluated for each row of the input; +/// If the value of `` is true, the input row is passed to +/// the output. If the value of `` is false, the row is +/// discarded. +#[derive(Clone)] +pub struct Filter { + /// The predicate expression, which must have Boolean type. + pub predicate: Expr, + /// The incoming logical plan + pub input: Arc, +} + +/// Window its input based on a set of window spec and window function (e.g. SUM or RANK) +#[derive(Clone)] +pub struct Window { + /// The incoming logical plan + pub input: Arc, + /// The window function expression + pub window_expr: Vec, + /// The schema description of the window output + pub schema: DFSchemaRef, +} + /// Produces rows from a table provider by reference or from the context #[derive(Clone)] pub struct TableScanPlan { @@ -214,16 +255,7 @@ pub struct Values { pub enum LogicalPlan { /// Evaluates an arbitrary list of expressions (essentially a /// SELECT with an expression list) on its input. - Projection { - /// The list of expressions - expr: Vec, - /// The incoming logical plan - input: Arc, - /// The schema description of the output - schema: DFSchemaRef, - /// Projection output relation alias - alias: Option, - }, + Projection(Projection), /// Filters rows from its input that do not match an /// expression (essentially a WHERE clause with a predicate /// expression). @@ -232,21 +264,9 @@ pub enum LogicalPlan { /// If the value of `` is true, the input row is passed to /// the output. If the value of `` is false, the row is /// discarded. - Filter { - /// The predicate expression, which must have Boolean type. - predicate: Expr, - /// The incoming logical plan - input: Arc, - }, + Filter(Filter), /// Window its input based on a set of window spec and window function (e.g. SUM or RANK) - Window { - /// The incoming logical plan - input: Arc, - /// The window function expression - window_expr: Vec, - /// The schema description of the window output - schema: DFSchemaRef, - }, + Window(Window), /// Aggregates its input based on a set of grouping and aggregate /// expressions (e.g. SUM). Aggregate { @@ -324,9 +344,9 @@ impl LogicalPlan { LogicalPlan::TableScan(TableScanPlan { projected_schema, .. }) => projected_schema, - LogicalPlan::Projection { schema, .. } => schema, - LogicalPlan::Filter { input, .. } => input.schema(), - LogicalPlan::Window { schema, .. } => schema, + LogicalPlan::Projection(Projection { schema, .. }) => schema, + LogicalPlan::Filter(Filter { input, .. }) => input.schema(), + LogicalPlan::Window(Window { schema, .. }) => schema, LogicalPlan::Aggregate { schema, .. } => schema, LogicalPlan::Sort { input, .. } => input.schema(), LogicalPlan::Join { schema, .. } => schema, @@ -354,9 +374,9 @@ impl LogicalPlan { projected_schema, .. }) => vec![projected_schema], LogicalPlan::Values(Values { schema, .. }) => vec![schema], - LogicalPlan::Window { input, schema, .. } + LogicalPlan::Window(Window { input, schema, .. }) | LogicalPlan::Aggregate { input, schema, .. } - | LogicalPlan::Projection { input, schema, .. } => { + | LogicalPlan::Projection(Projection { input, schema, .. }) => { let mut schemas = input.all_schemas(); schemas.insert(0, schema); schemas @@ -391,7 +411,7 @@ impl LogicalPlan { | LogicalPlan::Repartition(Repartition { input, .. }) | LogicalPlan::Sort { input, .. } | LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) - | LogicalPlan::Filter { input, .. } => input.all_schemas(), + | LogicalPlan::Filter(Filter { input, .. }) => input.all_schemas(), LogicalPlan::DropTable(_) => vec![], } } @@ -409,11 +429,11 @@ impl LogicalPlan { /// children pub fn expressions(self: &LogicalPlan) -> Vec { match self { - LogicalPlan::Projection { expr, .. } => expr.clone(), + LogicalPlan::Projection(Projection { expr, .. }) => expr.clone(), LogicalPlan::Values(Values { values, .. }) => { values.iter().flatten().cloned().collect() } - LogicalPlan::Filter { predicate, .. } => vec![predicate.clone()], + LogicalPlan::Filter(Filter { predicate, .. }) => vec![predicate.clone()], LogicalPlan::Repartition(Repartition { partitioning_scheme, .. @@ -421,7 +441,7 @@ impl LogicalPlan { Partitioning::Hash(expr, _) => expr.clone(), _ => vec![], }, - LogicalPlan::Window { window_expr, .. } => window_expr.clone(), + LogicalPlan::Window(Window { window_expr, .. }) => window_expr.clone(), LogicalPlan::Aggregate { group_expr, aggr_expr, @@ -453,10 +473,10 @@ impl LogicalPlan { /// include inputs to inputs. pub fn inputs(self: &LogicalPlan) -> Vec<&LogicalPlan> { match self { - LogicalPlan::Projection { input, .. } => vec![input], - LogicalPlan::Filter { input, .. } => vec![input], + LogicalPlan::Projection(Projection { input, .. }) => vec![input], + LogicalPlan::Filter(Filter { input, .. }) => vec![input], LogicalPlan::Repartition(Repartition { input, .. }) => vec![input], - LogicalPlan::Window { input, .. } => vec![input], + LogicalPlan::Window(Window { input, .. }) => vec![input], LogicalPlan::Aggregate { input, .. } => vec![input], LogicalPlan::Sort { input, .. } => vec![input], LogicalPlan::Join { left, right, .. } => vec![left, right], @@ -588,12 +608,12 @@ impl LogicalPlan { } let recurse = match self { - LogicalPlan::Projection { input, .. } => input.accept(visitor)?, - LogicalPlan::Filter { input, .. } => input.accept(visitor)?, + LogicalPlan::Projection(Projection { input, .. }) => input.accept(visitor)?, + LogicalPlan::Filter(Filter { input, .. }) => input.accept(visitor)?, LogicalPlan::Repartition(Repartition { input, .. }) => { input.accept(visitor)? } - LogicalPlan::Window { input, .. } => input.accept(visitor)?, + LogicalPlan::Window(Window { input, .. }) => input.accept(visitor)?, LogicalPlan::Aggregate { input, .. } => input.accept(visitor)?, LogicalPlan::Sort { input, .. } => input.accept(visitor)?, LogicalPlan::Join { left, right, .. } @@ -856,9 +876,9 @@ impl LogicalPlan { Ok(()) } - LogicalPlan::Projection { + LogicalPlan::Projection(Projection { ref expr, alias, .. - } => { + }) => { write!(f, "Projection: ")?; for (i, expr_item) in expr.iter().enumerate() { if i > 0 { @@ -871,13 +891,13 @@ impl LogicalPlan { } Ok(()) } - LogicalPlan::Filter { + LogicalPlan::Filter(Filter { predicate: ref expr, .. - } => write!(f, "Filter: {:?}", expr), - LogicalPlan::Window { + }) => write!(f, "Filter: {:?}", expr), + LogicalPlan::Window(Window { ref window_expr, .. - } => { + }) => { write!(f, "WindowAggr: windowExpr=[{:?}]", window_expr) } LogicalPlan::Aggregate { diff --git a/datafusion/src/optimizer/common_subexpr_eliminate.rs b/datafusion/src/optimizer/common_subexpr_eliminate.rs index c8f14040e8861..2572f10dc22d3 100644 --- a/datafusion/src/optimizer/common_subexpr_eliminate.rs +++ b/datafusion/src/optimizer/common_subexpr_eliminate.rs @@ -19,6 +19,7 @@ use crate::error::Result; use crate::execution::context::ExecutionProps; +use crate::logical_plan::plan::{Filter, Projection, Window}; use crate::logical_plan::{ col, DFField, DFSchema, Expr, ExprRewriter, ExpressionVisitor, LogicalPlan, Recursion, RewriteRecursion, @@ -77,12 +78,12 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result { + }) => { let arrays = to_arrays(expr, input, &mut expr_set)?; let (mut new_expr, new_input) = rewrite_expr( @@ -94,14 +95,14 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result { + LogicalPlan::Filter(Filter { predicate, input }) => { let schemas = plan.all_schemas(); let all_schema = schemas.into_iter().fold(DFSchema::empty(), |mut lhs, rhs| { @@ -122,16 +123,16 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result { + }) => { let arrays = to_arrays(window_expr, input, &mut expr_set)?; let (mut new_expr, new_input) = rewrite_expr( @@ -143,11 +144,11 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result LogicalPlan { and(acc, (*predicate).to_owned()) }); - LogicalPlan::Filter { + LogicalPlan::Filter(Filter { predicate, input: Arc::new(plan), - } + }) } // remove all filters from `filters` that are in `predicate_columns` @@ -287,7 +288,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { push_down(&state, plan) } LogicalPlan::Analyze { .. } => push_down(&state, plan), - LogicalPlan::Filter { input, predicate } => { + LogicalPlan::Filter(Filter { input, predicate }) => { let mut predicates = vec![]; split_members(predicate, &mut predicates); @@ -316,12 +317,12 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { optimize(input, state) } } - LogicalPlan::Projection { + LogicalPlan::Projection(Projection { input, expr, schema, alias: _, - } => { + }) => { // A projection is filter-commutable, but re-writes all predicate expressions // collect projection. let projection = schema diff --git a/datafusion/src/optimizer/limit_push_down.rs b/datafusion/src/optimizer/limit_push_down.rs index 320e1c7695710..e2c65de13f9f0 100644 --- a/datafusion/src/optimizer/limit_push_down.rs +++ b/datafusion/src/optimizer/limit_push_down.rs @@ -20,6 +20,7 @@ use super::utils; use crate::error::Result; use crate::execution::context::ExecutionProps; +use crate::logical_plan::plan::Projection; use crate::logical_plan::{Limit, TableScanPlan}; use crate::logical_plan::{LogicalPlan, Union}; use crate::optimizer::optimizer::OptimizerRule; @@ -77,16 +78,16 @@ fn limit_push_down( projected_schema: projected_schema.clone(), })), ( - LogicalPlan::Projection { + LogicalPlan::Projection(Projection { expr, input, schema, alias, - }, + }), upper_limit, ) => { // Push down limit directly (projection doesn't change number of rows) - Ok(LogicalPlan::Projection { + Ok(LogicalPlan::Projection(Projection { expr: expr.clone(), input: Arc::new(limit_push_down( optimizer, @@ -96,7 +97,7 @@ fn limit_push_down( )?), schema: schema.clone(), alias: alias.clone(), - }) + })) } ( LogicalPlan::Union(Union { diff --git a/datafusion/src/optimizer/projection_push_down.rs b/datafusion/src/optimizer/projection_push_down.rs index b6f0ff5e96fed..5b1f3ea1e5dc3 100644 --- a/datafusion/src/optimizer/projection_push_down.rs +++ b/datafusion/src/optimizer/projection_push_down.rs @@ -20,7 +20,7 @@ use crate::error::{DataFusionError, Result}; use crate::execution::context::ExecutionProps; -use crate::logical_plan::plan::{AnalyzePlan, TableScanPlan}; +use crate::logical_plan::plan::{AnalyzePlan, Projection, TableScanPlan, Window}; use crate::logical_plan::{ build_join_schema, Column, DFField, DFSchema, DFSchemaRef, LogicalPlan, LogicalPlanBuilder, ToDFSchema, Union, @@ -131,12 +131,12 @@ fn optimize_plan( ) -> Result { let mut new_required_columns = required_columns.clone(); match plan { - LogicalPlan::Projection { + LogicalPlan::Projection(Projection { input, expr, schema, alias, - } => { + }) => { // projection: // * remove any expression that is not required // * construct the new set of required columns @@ -182,12 +182,12 @@ fn optimize_plan( // no need for an expression at all Ok(new_input) } else { - Ok(LogicalPlan::Projection { + Ok(LogicalPlan::Projection(Projection { expr: new_expr, input: Arc::new(new_input), schema: DFSchemaRef::new(DFSchema::new(new_fields)?), alias: alias.clone(), - }) + })) } } LogicalPlan::Join { @@ -236,12 +236,12 @@ fn optimize_plan( null_equals_null: *null_equals_null, }) } - LogicalPlan::Window { + LogicalPlan::Window(Window { schema, window_expr, input, .. - } => { + }) => { // Gather all columns needed for expressions in this Window let mut new_window_expr = Vec::new(); { @@ -745,12 +745,12 @@ mod tests { let expr = vec![col("a"), col("b")]; let projected_fields = exprlist_to_fields(&expr, input_schema).unwrap(); let projected_schema = DFSchema::new(projected_fields).unwrap(); - let plan = LogicalPlan::Projection { + let plan = LogicalPlan::Projection(Projection { expr, input: Arc::new(table_scan), schema: Arc::new(projected_schema), alias: None, - }; + }); assert_fields_eq(&plan, vec!["a", "b"]); diff --git a/datafusion/src/optimizer/single_distinct_to_groupby.rs b/datafusion/src/optimizer/single_distinct_to_groupby.rs index f6178a2fe9b2f..358444d9c9a63 100644 --- a/datafusion/src/optimizer/single_distinct_to_groupby.rs +++ b/datafusion/src/optimizer/single_distinct_to_groupby.rs @@ -19,6 +19,7 @@ use crate::error::Result; use crate::execution::context::ExecutionProps; +use crate::logical_plan::plan::Projection; use crate::logical_plan::{columnize_expr, DFSchema, Expr, LogicalPlan}; use crate::optimizer::optimizer::OptimizerRule; use crate::optimizer::utils; @@ -124,12 +125,12 @@ fn optimize(plan: &LogicalPlan) -> Result { )); }); - Ok(LogicalPlan::Projection { + Ok(LogicalPlan::Projection(Projection { expr: alias_expr, input: Arc::new(final_agg), schema: schema.clone(), alias: Option::None, - }) + })) } else { optimize_children(plan) } diff --git a/datafusion/src/optimizer/utils.rs b/datafusion/src/optimizer/utils.rs index 05c362988570d..aa559cd84cef5 100644 --- a/datafusion/src/optimizer/utils.rs +++ b/datafusion/src/optimizer/utils.rs @@ -23,7 +23,7 @@ use arrow::record_batch::RecordBatch; use super::optimizer::OptimizerRule; use crate::execution::context::{ExecutionContextState, ExecutionProps}; -use crate::logical_plan::plan::{AnalyzePlan, ExtensionPlan}; +use crate::logical_plan::plan::{AnalyzePlan, ExtensionPlan, Filter, Projection, Window}; use crate::logical_plan::{ build_join_schema, Column, CreateMemoryTable, DFSchema, DFSchemaRef, Expr, ExprRewriter, Limit, LogicalPlan, LogicalPlanBuilder, Operator, Partitioning, @@ -145,12 +145,14 @@ pub fn from_plan( inputs: &[LogicalPlan], ) -> Result { match plan { - LogicalPlan::Projection { schema, alias, .. } => Ok(LogicalPlan::Projection { - expr: expr.to_vec(), - input: Arc::new(inputs[0].clone()), - schema: schema.clone(), - alias: alias.clone(), - }), + LogicalPlan::Projection(Projection { schema, alias, .. }) => { + Ok(LogicalPlan::Projection(Projection { + expr: expr.to_vec(), + input: Arc::new(inputs[0].clone()), + schema: schema.clone(), + alias: alias.clone(), + })) + } LogicalPlan::Values(Values { schema, .. }) => Ok(LogicalPlan::Values(Values { schema: schema.clone(), values: expr @@ -158,10 +160,10 @@ pub fn from_plan( .map(|s| s.to_vec()) .collect::>(), })), - LogicalPlan::Filter { .. } => Ok(LogicalPlan::Filter { + LogicalPlan::Filter { .. } => Ok(LogicalPlan::Filter(Filter { predicate: expr[0].clone(), input: Arc::new(inputs[0].clone()), - }), + })), LogicalPlan::Repartition(Repartition { partitioning_scheme, .. @@ -177,15 +179,15 @@ pub fn from_plan( input: Arc::new(inputs[0].clone()), })), }, - LogicalPlan::Window { + LogicalPlan::Window(Window { window_expr, schema, .. - } => Ok(LogicalPlan::Window { + }) => Ok(LogicalPlan::Window(Window { input: Arc::new(inputs[0].clone()), window_expr: expr[0..window_expr.len()].to_vec(), schema: schema.clone(), - }), + })), LogicalPlan::Aggregate { group_expr, schema, .. } => Ok(LogicalPlan::Aggregate { diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index 44bd4b16bb5c6..594a0dda9d031 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -23,7 +23,7 @@ use super::{ hash_join::PartitionMode, udaf, union::UnionExec, values::ValuesExec, windows, }; use crate::execution::context::ExecutionContextState; -use crate::logical_plan::plan::EmptyRelation; +use crate::logical_plan::plan::{EmptyRelation, Filter, Projection, Window}; use crate::logical_plan::{ unnormalize_cols, CrossJoin, DFSchema, Expr, LogicalPlan, Operator, Partitioning as LogicalPartitioning, PlanType, Repartition, ToStringifiedPlan, Union, @@ -372,9 +372,9 @@ impl DefaultPhysicalPlanner { )?; Ok(Arc::new(value_exec)) } - LogicalPlan::Window { + LogicalPlan::Window(Window { input, window_expr, .. - } => { + }) => { if window_expr.is_empty() { return Err(DataFusionError::Internal( "Impossibly got empty window expression".to_owned(), @@ -578,7 +578,7 @@ impl DefaultPhysicalPlanner { physical_input_schema.clone(), )?) ) } - LogicalPlan::Projection { input, expr, .. } => { + LogicalPlan::Projection(Projection { input, expr, .. }) => { let input_exec = self.create_initial_plan(input, ctx_state).await?; let input_schema = input.as_ref().schema(); @@ -630,9 +630,9 @@ impl DefaultPhysicalPlanner { input_exec, )?) ) } - LogicalPlan::Filter { + LogicalPlan::Filter(Filter { input, predicate, .. - } => { + }) => { let physical_input = self.create_initial_plan(input, ctx_state).await?; let input_schema = physical_input.as_ref().schema(); let input_dfschema = input.as_ref().schema(); diff --git a/datafusion/tests/custom_sources.rs b/datafusion/tests/custom_sources.rs index 25040a6ca6db7..a145ca3fdf880 100644 --- a/datafusion/tests/custom_sources.rs +++ b/datafusion/tests/custom_sources.rs @@ -45,6 +45,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use async_trait::async_trait; +use datafusion::logical_plan::plan::Projection; //// Custom source dataframe tests //// @@ -216,7 +217,7 @@ async fn custom_source_dataframe() -> Result<()> { let optimized_plan = ctx.optimize(&logical_plan)?; match &optimized_plan { - LogicalPlan::Projection { input, .. } => match &**input { + LogicalPlan::Projection(Projection { input, .. }) => match &**input { LogicalPlan::TableScan(TableScanPlan { source, projected_schema, diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs index c910e55d2384b..42763b07f3167 100644 --- a/datafusion/tests/sql.rs +++ b/datafusion/tests/sql.rs @@ -36,6 +36,7 @@ use datafusion::assert_batches_eq; use datafusion::assert_batches_sorted_eq; use datafusion::assert_contains; use datafusion::assert_not_contains; +use datafusion::logical_plan::plan::Projection; use datafusion::logical_plan::LogicalPlan; use datafusion::logical_plan::TableScanPlan; use datafusion::physical_plan::functions::Volatility; @@ -90,7 +91,7 @@ async fn nyc() -> Result<()> { let optimized_plan = ctx.optimize(&logical_plan)?; match &optimized_plan { - LogicalPlan::Projection { input, .. } => match input.as_ref() { + LogicalPlan::Projection(Projection { input, .. }) => match input.as_ref() { LogicalPlan::Aggregate { input, .. } => match input.as_ref() { LogicalPlan::TableScan(TableScanPlan { ref projected_schema,