diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs index 0e0b989a91fa6..897558ca0a149 100644 --- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs @@ -30,7 +30,9 @@ use datafusion::datasource::TableProvider; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::listing::ListingTable; -use datafusion::logical_plan::plan::{EmptyRelation, Filter, Projection, Window}; +use datafusion::logical_plan::plan::{ + Aggregate, EmptyRelation, Filter, Join, Projection, Sort, Window, +}; use datafusion::logical_plan::{ exprlist_to_fields, window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits}, @@ -823,12 +825,12 @@ impl TryInto for &LogicalPlan { ))), }) } - LogicalPlan::Aggregate { - input, + LogicalPlan::Aggregate(Aggregate { group_expr, aggr_expr, + input, .. - } => { + }) => { let input: protobuf::LogicalPlanNode = input.as_ref().try_into()?; Ok(protobuf::LogicalPlanNode { logical_plan_type: Some(LogicalPlanType::Aggregate(Box::new( @@ -846,7 +848,7 @@ impl TryInto for &LogicalPlan { ))), }) } - LogicalPlan::Join { + LogicalPlan::Join(Join { left, right, on, @@ -854,7 +856,7 @@ impl TryInto for &LogicalPlan { join_constraint, null_equals_null, .. - } => { + }) => { let left: protobuf::LogicalPlanNode = left.as_ref().try_into()?; let right: protobuf::LogicalPlanNode = right.as_ref().try_into()?; let (left_join_column, right_join_column) = @@ -887,7 +889,7 @@ impl TryInto for &LogicalPlan { ))), }) } - LogicalPlan::Sort { input, expr } => { + LogicalPlan::Sort(Sort { input, expr }) => { let input: protobuf::LogicalPlanNode = input.as_ref().try_into()?; let selection_expr: Vec = expr .iter() diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index 2eb0091389f71..e7f34412568da 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -26,8 +26,8 @@ use crate::datasource::{ }; use crate::error::{DataFusionError, Result}; use crate::logical_plan::plan::{ - AnalyzePlan, EmptyRelation, ExplainPlan, Filter, Projection, TableScanPlan, - ToStringifiedPlan, Union, Window, + Aggregate, AnalyzePlan, EmptyRelation, ExplainPlan, Filter, Join, Projection, Sort, + TableScanPlan, ToStringifiedPlan, Union, Window, }; use crate::prelude::*; use crate::scalar::ScalarValue; @@ -468,10 +468,10 @@ impl LogicalPlanBuilder { /// Apply a sort pub fn sort(&self, exprs: impl IntoIterator>) -> Result { - Ok(Self::from(LogicalPlan::Sort { + Ok(Self::from(LogicalPlan::Sort(Sort { expr: normalize_cols(exprs, &self.plan)?, input: Arc::new(self.plan.clone()), - })) + }))) } /// Apply a union @@ -587,7 +587,7 @@ impl LogicalPlanBuilder { let join_schema = build_join_schema(self.plan.schema(), right.schema(), &join_type)?; - Ok(Self::from(LogicalPlan::Join { + Ok(Self::from(LogicalPlan::Join(Join { left: Arc::new(self.plan.clone()), right: Arc::new(right.clone()), on, @@ -595,7 +595,7 @@ impl LogicalPlanBuilder { join_constraint: JoinConstraint::On, schema: DFSchemaRef::new(join_schema), null_equals_null, - })) + }))) } /// Apply a join with using constraint, which duplicates all join columns in output schema. @@ -619,7 +619,7 @@ impl LogicalPlanBuilder { let join_schema = build_join_schema(self.plan.schema(), right.schema(), &join_type)?; - Ok(Self::from(LogicalPlan::Join { + Ok(Self::from(LogicalPlan::Join(Join { left: Arc::new(self.plan.clone()), right: Arc::new(right.clone()), on, @@ -627,7 +627,7 @@ impl LogicalPlanBuilder { join_constraint: JoinConstraint::Using, schema: DFSchemaRef::new(join_schema), null_equals_null: false, - })) + }))) } /// Apply a cross join @@ -680,12 +680,12 @@ impl LogicalPlanBuilder { validate_unique_names("Aggregations", all_expr.clone(), self.plan.schema())?; let aggr_schema = DFSchema::new(exprlist_to_fields(all_expr, self.plan.schema())?)?; - Ok(Self::from(LogicalPlan::Aggregate { + Ok(Self::from(LogicalPlan::Aggregate(Aggregate { input: Arc::new(self.plan.clone()), group_expr, aggr_expr, schema: DFSchemaRef::new(aggr_schema), - })) + }))) } /// Create an expression to represent the explanation of the plan diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs index a7aac90391208..88bc7a28ae2df 100644 --- a/datafusion/src/logical_plan/plan.rs +++ b/datafusion/src/logical_plan/plan.rs @@ -243,6 +243,47 @@ pub struct Values { pub values: Vec>, } +/// Aggregates its input based on a set of grouping and aggregate +/// expressions (e.g. SUM). +#[derive(Clone)] +pub struct Aggregate { + /// The incoming logical plan + pub input: Arc, + /// Grouping expressions + pub group_expr: Vec, + /// Aggregate expressions + pub aggr_expr: Vec, + /// The schema description of the aggregate output + pub schema: DFSchemaRef, +} + +/// Sorts its input according to a list of sort expressions. +#[derive(Clone)] +pub struct Sort { + /// The sort expressions + pub expr: Vec, + /// The incoming logical plan + pub input: Arc, +} + +/// Join two logical plans on one or more join columns +#[derive(Clone)] +pub struct Join { + /// Left input + pub left: Arc, + /// Right input + pub right: Arc, + /// Equijoin clause expressed as pairs of (left, right) join columns + pub on: Vec<(Column, Column)>, + /// Join type + pub join_type: JoinType, + /// Join constraint + pub join_constraint: JoinConstraint, + /// The output schema, containing fields from the left and right inputs + pub schema: DFSchemaRef, + /// If null_equals_null is true, null == null else null != null + pub null_equals_null: bool, +} /// A LogicalPlan represents the different types of relational /// operators (such as Projection, Filter, etc) and can be created by /// the SQL query planner and the DataFrame API. @@ -269,40 +310,11 @@ pub enum LogicalPlan { Window(Window), /// Aggregates its input based on a set of grouping and aggregate /// expressions (e.g. SUM). - Aggregate { - /// The incoming logical plan - input: Arc, - /// Grouping expressions - group_expr: Vec, - /// Aggregate expressions - aggr_expr: Vec, - /// The schema description of the aggregate output - schema: DFSchemaRef, - }, + Aggregate(Aggregate), /// Sorts its input according to a list of sort expressions. - Sort { - /// The sort expressions - expr: Vec, - /// The incoming logical plan - input: Arc, - }, + Sort(Sort), /// Join two logical plans on one or more join columns - Join { - /// Left input - left: Arc, - /// Right input - right: Arc, - /// Equijoin clause expressed as pairs of (left, right) join columns - on: Vec<(Column, Column)>, - /// Join type - join_type: JoinType, - /// Join constraint - join_constraint: JoinConstraint, - /// The output schema, containing fields from the left and right inputs - schema: DFSchemaRef, - /// If null_equals_null is true, null == null else null != null - null_equals_null: bool, - }, + Join(Join), /// Apply Cross Join to two logical plans CrossJoin(CrossJoin), /// Repartition the plan based on a partitioning scheme. @@ -347,9 +359,9 @@ impl LogicalPlan { LogicalPlan::Projection(Projection { schema, .. }) => schema, LogicalPlan::Filter(Filter { input, .. }) => input.schema(), LogicalPlan::Window(Window { schema, .. }) => schema, - LogicalPlan::Aggregate { schema, .. } => schema, - LogicalPlan::Sort { input, .. } => input.schema(), - LogicalPlan::Join { schema, .. } => schema, + LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema, + LogicalPlan::Sort(Sort { input, .. }) => input.schema(), + LogicalPlan::Join(Join { schema, .. }) => schema, LogicalPlan::CrossJoin(CrossJoin { schema, .. }) => schema, LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(), LogicalPlan::Limit(Limit { input, .. }) => input.schema(), @@ -375,18 +387,18 @@ impl LogicalPlan { }) => vec![projected_schema], LogicalPlan::Values(Values { schema, .. }) => vec![schema], LogicalPlan::Window(Window { input, schema, .. }) - | LogicalPlan::Aggregate { input, schema, .. } - | LogicalPlan::Projection(Projection { input, schema, .. }) => { + | LogicalPlan::Projection(Projection { input, schema, .. }) + | LogicalPlan::Aggregate(Aggregate { input, schema, .. }) => { let mut schemas = input.all_schemas(); schemas.insert(0, schema); schemas } - LogicalPlan::Join { + LogicalPlan::Join(Join { left, right, schema, .. - } + }) | LogicalPlan::CrossJoin(CrossJoin { left, right, @@ -409,7 +421,7 @@ impl LogicalPlan { } LogicalPlan::Limit(Limit { input, .. }) | LogicalPlan::Repartition(Repartition { input, .. }) - | LogicalPlan::Sort { input, .. } + | LogicalPlan::Sort(Sort { input, .. }) | LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) | LogicalPlan::Filter(Filter { input, .. }) => input.all_schemas(), LogicalPlan::DropTable(_) => vec![], @@ -442,16 +454,16 @@ impl LogicalPlan { _ => vec![], }, LogicalPlan::Window(Window { window_expr, .. }) => window_expr.clone(), - LogicalPlan::Aggregate { + LogicalPlan::Aggregate(Aggregate { group_expr, aggr_expr, .. - } => group_expr.iter().chain(aggr_expr.iter()).cloned().collect(), - LogicalPlan::Join { on, .. } => on + }) => group_expr.iter().chain(aggr_expr.iter()).cloned().collect(), + LogicalPlan::Join(Join { on, .. }) => on .iter() .flat_map(|(l, r)| vec![Expr::Column(l.clone()), Expr::Column(r.clone())]) .collect(), - LogicalPlan::Sort { expr, .. } => expr.clone(), + LogicalPlan::Sort(Sort { expr, .. }) => expr.clone(), LogicalPlan::Extension(extension) => extension.node.expressions(), // plans without expressions LogicalPlan::TableScan { .. } @@ -477,9 +489,9 @@ impl LogicalPlan { LogicalPlan::Filter(Filter { input, .. }) => vec![input], LogicalPlan::Repartition(Repartition { input, .. }) => vec![input], LogicalPlan::Window(Window { input, .. }) => vec![input], - LogicalPlan::Aggregate { input, .. } => vec![input], - LogicalPlan::Sort { input, .. } => vec![input], - LogicalPlan::Join { left, right, .. } => vec![left, right], + LogicalPlan::Aggregate(Aggregate { input, .. }) => vec![input], + LogicalPlan::Sort(Sort { input, .. }) => vec![input], + LogicalPlan::Join(Join { left, right, .. }) => vec![left, right], LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => vec![left, right], LogicalPlan::Limit(Limit { input, .. }) => vec![input], LogicalPlan::Extension(extension) => extension.node.inputs(), @@ -508,11 +520,11 @@ impl LogicalPlan { type Error = DataFusionError; fn pre_visit(&mut self, plan: &LogicalPlan) -> Result { - if let LogicalPlan::Join { + if let LogicalPlan::Join(Join { join_constraint: JoinConstraint::Using, on, .. - } = plan + }) = plan { self.using_columns.push( on.iter() @@ -614,9 +626,9 @@ impl LogicalPlan { input.accept(visitor)? } LogicalPlan::Window(Window { input, .. }) => input.accept(visitor)?, - LogicalPlan::Aggregate { input, .. } => input.accept(visitor)?, - LogicalPlan::Sort { input, .. } => input.accept(visitor)?, - LogicalPlan::Join { left, right, .. } + LogicalPlan::Aggregate(Aggregate { input, .. }) => input.accept(visitor)?, + LogicalPlan::Sort(Sort { input, .. }) => input.accept(visitor)?, + LogicalPlan::Join(Join { left, right, .. }) | LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => { left.accept(visitor)? && right.accept(visitor)? } @@ -900,16 +912,16 @@ impl LogicalPlan { }) => { write!(f, "WindowAggr: windowExpr=[{:?}]", window_expr) } - LogicalPlan::Aggregate { + LogicalPlan::Aggregate(Aggregate { ref group_expr, ref aggr_expr, .. - } => write!( + }) => write!( f, "Aggregate: groupBy=[{:?}], aggr=[{:?}]", group_expr, aggr_expr ), - LogicalPlan::Sort { ref expr, .. } => { + LogicalPlan::Sort(Sort { expr, .. }) => { write!(f, "Sort: ")?; for (i, expr_item) in expr.iter().enumerate() { if i > 0 { @@ -919,11 +931,11 @@ impl LogicalPlan { } Ok(()) } - LogicalPlan::Join { + LogicalPlan::Join(Join { on: ref keys, join_constraint, .. - } => { + }) => { let join_expr: Vec = keys.iter().map(|(l, r)| format!("{} = {}", l, r)).collect(); match join_constraint { diff --git a/datafusion/src/optimizer/common_subexpr_eliminate.rs b/datafusion/src/optimizer/common_subexpr_eliminate.rs index ced2c0c23dfb3..233d112197feb 100644 --- a/datafusion/src/optimizer/common_subexpr_eliminate.rs +++ b/datafusion/src/optimizer/common_subexpr_eliminate.rs @@ -21,8 +21,10 @@ use crate::error::Result; use crate::execution::context::ExecutionProps; use crate::logical_plan::plan::{Filter, Projection, Window}; use crate::logical_plan::{ - col, DFField, DFSchema, Expr, ExprRewriter, ExpressionVisitor, LogicalPlan, - Recursion, RewriteRecursion, + col, + plan::{Aggregate, Sort}, + DFField, DFSchema, Expr, ExprRewriter, ExpressionVisitor, LogicalPlan, Recursion, + RewriteRecursion, }; use crate::optimizer::optimizer::OptimizerRule; use crate::optimizer::utils; @@ -150,12 +152,12 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result { + }) => { let group_arrays = to_arrays(group_expr, input, &mut expr_set)?; let aggr_arrays = to_arrays(aggr_expr, input, &mut expr_set)?; @@ -171,14 +173,14 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result { + LogicalPlan::Sort(Sort { expr, input }) => { let arrays = to_arrays(expr, input, &mut expr_set)?; let (mut new_expr, new_input) = rewrite_expr( @@ -190,10 +192,10 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result Result { utils::from_plan(plan, expr, &[new_input]) } - LogicalPlan::Aggregate { - input, aggr_expr, .. - } => { + LogicalPlan::Aggregate(Aggregate { + aggr_expr, input, .. + }) => { // An aggregate's aggreagate columns are _not_ filter-commutable => collect these: // * columns whose aggregation expression depends on // * the aggregation columns themselves @@ -394,9 +394,9 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => { optimize_join(state, plan, left, right) } - LogicalPlan::Join { + LogicalPlan::Join(Join { left, right, on, .. - } => { + }) => { // duplicate filters for joined columns so filters can be pushed down to both sides. // Take the following query as an example: // diff --git a/datafusion/src/optimizer/projection_push_down.rs b/datafusion/src/optimizer/projection_push_down.rs index 5b1f3ea1e5dc3..1b331ba65b296 100644 --- a/datafusion/src/optimizer/projection_push_down.rs +++ b/datafusion/src/optimizer/projection_push_down.rs @@ -20,7 +20,9 @@ use crate::error::{DataFusionError, Result}; use crate::execution::context::ExecutionProps; -use crate::logical_plan::plan::{AnalyzePlan, Projection, TableScanPlan, Window}; +use crate::logical_plan::plan::{ + Aggregate, AnalyzePlan, Join, Projection, TableScanPlan, Window, +}; use crate::logical_plan::{ build_join_schema, Column, DFField, DFSchema, DFSchemaRef, LogicalPlan, LogicalPlanBuilder, ToDFSchema, Union, @@ -190,7 +192,7 @@ fn optimize_plan( })) } } - LogicalPlan::Join { + LogicalPlan::Join(Join { left, right, on, @@ -198,7 +200,7 @@ fn optimize_plan( join_constraint, null_equals_null, .. - } => { + }) => { for (l, r) in on { new_required_columns.insert(l.clone()); new_required_columns.insert(r.clone()); @@ -226,7 +228,7 @@ fn optimize_plan( join_type, )?; - Ok(LogicalPlan::Join { + Ok(LogicalPlan::Join(Join { left: optimized_left, right: optimized_right, join_type: *join_type, @@ -234,7 +236,7 @@ fn optimize_plan( on: on.clone(), schema: DFSchemaRef::new(schema), null_equals_null: *null_equals_null, - }) + })) } LogicalPlan::Window(Window { schema, @@ -275,13 +277,12 @@ fn optimize_plan( .window(new_window_expr)? .build() } - LogicalPlan::Aggregate { - schema, - input, + LogicalPlan::Aggregate(Aggregate { group_expr, aggr_expr, - .. - } => { + schema, + input, + }) => { // aggregate: // * remove any aggregate expression that is not required // * construct the new set of required columns @@ -314,7 +315,7 @@ fn optimize_plan( .collect(), )?; - Ok(LogicalPlan::Aggregate { + Ok(LogicalPlan::Aggregate(Aggregate { group_expr: group_expr.clone(), aggr_expr: new_aggr_expr, input: Arc::new(optimize_plan( @@ -325,7 +326,7 @@ fn optimize_plan( execution_props, )?), schema: DFSchemaRef::new(new_schema), - }) + })) } // scans: // * remove un-used columns from the scan projection diff --git a/datafusion/src/optimizer/single_distinct_to_groupby.rs b/datafusion/src/optimizer/single_distinct_to_groupby.rs index 358444d9c9a63..3232fa03ce80f 100644 --- a/datafusion/src/optimizer/single_distinct_to_groupby.rs +++ b/datafusion/src/optimizer/single_distinct_to_groupby.rs @@ -19,7 +19,7 @@ use crate::error::Result; use crate::execution::context::ExecutionProps; -use crate::logical_plan::plan::Projection; +use crate::logical_plan::plan::{Aggregate, Projection}; use crate::logical_plan::{columnize_expr, DFSchema, Expr, LogicalPlan}; use crate::optimizer::optimizer::OptimizerRule; use crate::optimizer::utils; @@ -51,12 +51,12 @@ impl SingleDistinctToGroupBy { fn optimize(plan: &LogicalPlan) -> Result { match plan { - LogicalPlan::Aggregate { + LogicalPlan::Aggregate(Aggregate { input, aggr_expr, schema, group_expr, - } => { + }) => { if is_single_distinct_agg(plan) { let mut group_fields_set = HashSet::new(); let mut all_group_args = group_expr.clone(); @@ -87,12 +87,12 @@ fn optimize(plan: &LogicalPlan) -> Result { .collect::>(); let grouped_schema = DFSchema::new(all_field).unwrap(); - let grouped_agg = LogicalPlan::Aggregate { + let grouped_agg = LogicalPlan::Aggregate(Aggregate { input: input.clone(), group_expr: all_group_args, aggr_expr: Vec::new(), schema: Arc::new(grouped_schema.clone()), - }; + }); let grouped_agg = optimize_children(&grouped_agg); let final_agg_schema = Arc::new( DFSchema::new( @@ -105,12 +105,12 @@ fn optimize(plan: &LogicalPlan) -> Result { .unwrap(), ); - let final_agg = LogicalPlan::Aggregate { + let final_agg = LogicalPlan::Aggregate(Aggregate { input: Arc::new(grouped_agg.unwrap()), group_expr: group_expr.clone(), aggr_expr: new_aggr_expr, schema: final_agg_schema.clone(), - }; + }); //so the aggregates are displayed in the same way even after the rewrite let mut alias_expr: Vec = Vec::new(); @@ -151,9 +151,9 @@ fn optimize_children(plan: &LogicalPlan) -> Result { fn is_single_distinct_agg(plan: &LogicalPlan) -> bool { match plan { - LogicalPlan::Aggregate { + LogicalPlan::Aggregate(Aggregate { input, aggr_expr, .. - } => { + }) => { let mut fields_set = HashSet::new(); aggr_expr .iter() diff --git a/datafusion/src/optimizer/utils.rs b/datafusion/src/optimizer/utils.rs index aa559cd84cef5..60a81f084a424 100644 --- a/datafusion/src/optimizer/utils.rs +++ b/datafusion/src/optimizer/utils.rs @@ -23,7 +23,9 @@ use arrow::record_batch::RecordBatch; use super::optimizer::OptimizerRule; use crate::execution::context::{ExecutionContextState, ExecutionProps}; -use crate::logical_plan::plan::{AnalyzePlan, ExtensionPlan, Filter, Projection, Window}; +use crate::logical_plan::plan::{ + Aggregate, AnalyzePlan, ExtensionPlan, Filter, Join, Projection, Sort, Window, +}; use crate::logical_plan::{ build_join_schema, Column, CreateMemoryTable, DFSchema, DFSchemaRef, Expr, ExprRewriter, Limit, LogicalPlan, LogicalPlanBuilder, Operator, Partitioning, @@ -188,28 +190,28 @@ pub fn from_plan( window_expr: expr[0..window_expr.len()].to_vec(), schema: schema.clone(), })), - LogicalPlan::Aggregate { + LogicalPlan::Aggregate(Aggregate { group_expr, schema, .. - } => Ok(LogicalPlan::Aggregate { + }) => Ok(LogicalPlan::Aggregate(Aggregate { group_expr: expr[0..group_expr.len()].to_vec(), aggr_expr: expr[group_expr.len()..].to_vec(), input: Arc::new(inputs[0].clone()), schema: schema.clone(), - }), - LogicalPlan::Sort { .. } => Ok(LogicalPlan::Sort { + })), + LogicalPlan::Sort(Sort { .. }) => Ok(LogicalPlan::Sort(Sort { expr: expr.to_vec(), input: Arc::new(inputs[0].clone()), - }), - LogicalPlan::Join { + })), + LogicalPlan::Join(Join { join_type, join_constraint, on, null_equals_null, .. - } => { + }) => { let schema = build_join_schema(inputs[0].schema(), inputs[1].schema(), join_type)?; - Ok(LogicalPlan::Join { + Ok(LogicalPlan::Join(Join { left: Arc::new(inputs[0].clone()), right: Arc::new(inputs[1].clone()), join_type: *join_type, @@ -217,7 +219,7 @@ pub fn from_plan( on: on.clone(), schema: DFSchemaRef::new(schema), null_equals_null: *null_equals_null, - }) + })) } LogicalPlan::CrossJoin(_) => { let left = inputs[0].clone(); diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index 7fcacd2c42707..5b93ff8f013f6 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -23,13 +23,15 @@ use super::{ hash_join::PartitionMode, udaf, union::UnionExec, values::ValuesExec, windows, }; use crate::execution::context::ExecutionContextState; -use crate::logical_plan::plan::{EmptyRelation, Filter, Projection, Window}; +use crate::logical_plan::plan::{ + Aggregate, EmptyRelation, Filter, Join, Projection, Sort, TableScanPlan, Window, +}; use crate::logical_plan::{ unnormalize_cols, CrossJoin, DFSchema, Expr, LogicalPlan, Operator, Partitioning as LogicalPartitioning, PlanType, Repartition, ToStringifiedPlan, Union, UserDefinedLogicalNode, }; -use crate::logical_plan::{Limit, TableScanPlan, Values}; +use crate::logical_plan::{Limit, Values}; use crate::physical_optimizer::optimizer::PhysicalOptimizerRule; use crate::physical_plan::cross_join::CrossJoinExec; use crate::physical_plan::explain::ExplainExec; @@ -478,12 +480,12 @@ impl DefaultPhysicalPlanner { physical_input_schema, )?) ) } - LogicalPlan::Aggregate { + LogicalPlan::Aggregate(Aggregate { input, group_expr, aggr_expr, .. - } => { + }) => { // Initially need to perform the aggregate and then merge the partitions let input_exec = self.create_initial_plan(input, ctx_state).await?; let physical_input_schema = input_exec.schema(); @@ -675,7 +677,7 @@ impl DefaultPhysicalPlanner { physical_partitioning, )?) ) } - LogicalPlan::Sort { expr, input, .. } => { + LogicalPlan::Sort(Sort { expr, input, .. }) => { let physical_input = self.create_initial_plan(input, ctx_state).await?; let input_schema = physical_input.as_ref().schema(); let input_dfschema = input.as_ref().schema(); @@ -703,14 +705,14 @@ impl DefaultPhysicalPlanner { .collect::>>()?; Ok(Arc::new(SortExec::try_new(sort_expr, physical_input)?) ) } - LogicalPlan::Join { + LogicalPlan::Join(Join { left, right, on: keys, join_type, null_equals_null, .. - } => { + }) => { let left_df_schema = left.schema(); let physical_left = self.create_initial_plan(left, ctx_state).await?; let right_df_schema = right.schema(); diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs index 24b2a49a9ed2d..4423038ab2610 100644 --- a/datafusion/tests/sql.rs +++ b/datafusion/tests/sql.rs @@ -36,7 +36,7 @@ use datafusion::assert_batches_eq; use datafusion::assert_batches_sorted_eq; use datafusion::assert_contains; use datafusion::assert_not_contains; -use datafusion::logical_plan::plan::Projection; +use datafusion::logical_plan::plan::{Aggregate, Projection}; use datafusion::logical_plan::LogicalPlan; use datafusion::logical_plan::TableScanPlan; use datafusion::physical_plan::functions::Volatility; @@ -92,7 +92,7 @@ async fn nyc() -> Result<()> { match &optimized_plan { LogicalPlan::Projection(Projection { input, .. }) => match input.as_ref() { - LogicalPlan::Aggregate { input, .. } => match input.as_ref() { + LogicalPlan::Aggregate(Aggregate { input, .. }) => match input.as_ref() { LogicalPlan::TableScan(TableScanPlan { ref projected_schema, .. diff --git a/datafusion/tests/user_defined_plan.rs b/datafusion/tests/user_defined_plan.rs index a63ef2a3f412f..d9319854719b7 100644 --- a/datafusion/tests/user_defined_plan.rs +++ b/datafusion/tests/user_defined_plan.rs @@ -86,7 +86,7 @@ use std::{any::Any, collections::BTreeMap, fmt, sync::Arc}; use async_trait::async_trait; use datafusion::execution::context::ExecutionProps; -use datafusion::logical_plan::plan::ExtensionPlan; +use datafusion::logical_plan::plan::{ExtensionPlan, Sort}; use datafusion::logical_plan::{DFSchemaRef, Limit}; /// Execute the specified sql and return the resulting record batches @@ -289,10 +289,10 @@ impl OptimizerRule for TopKOptimizerRule { // Sort and replaces it by a TopK node. It does not handle many // edge cases (e.g multiple sort columns, sort ASC / DESC), etc. if let LogicalPlan::Limit(Limit { ref n, ref input }) = plan { - if let LogicalPlan::Sort { + if let LogicalPlan::Sort(Sort { ref expr, ref input, - } = **input + }) = **input { if expr.len() == 1 { // we found a sort with a single sort expr, replace with a a TopK