Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use datafusion::datasource::TableProvider;

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingTable;
use datafusion::logical_plan::plan::EmptyRelation;
use datafusion::logical_plan::plan::{EmptyRelation, Filter, Projection, Window};
use datafusion::logical_plan::{
exprlist_to_fields,
window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits},
Expand Down Expand Up @@ -779,9 +779,9 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
)))
}
}
LogicalPlan::Projection {
LogicalPlan::Projection(Projection {
expr, input, alias, ..
} => Ok(protobuf::LogicalPlanNode {
}) => Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Projection(Box::new(
protobuf::ProjectionNode {
input: Some(Box::new(input.as_ref().try_into()?)),
Expand All @@ -796,7 +796,7 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
},
))),
}),
LogicalPlan::Filter { predicate, input } => {
LogicalPlan::Filter(Filter { predicate, input }) => {
let input: protobuf::LogicalPlanNode = input.as_ref().try_into()?;
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Selection(Box::new(
Expand All @@ -807,9 +807,9 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
))),
})
}
LogicalPlan::Window {
LogicalPlan::Window(Window {
input, window_expr, ..
} => {
}) => {
let input: protobuf::LogicalPlanNode = input.as_ref().try_into()?;
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Window(Box::new(
Expand Down
5 changes: 3 additions & 2 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1177,6 +1177,7 @@ impl FunctionRegistry for ExecutionContextState {
#[cfg(test)]
mod tests {
use super::*;
use crate::logical_plan::plan::Projection;
use crate::logical_plan::TableScanPlan;
use crate::logical_plan::{binary_expr, lit, Operator};
use crate::physical_plan::functions::{make_scalar_function, Volatility};
Expand Down Expand Up @@ -1415,7 +1416,7 @@ mod tests {

let optimized_plan = ctx.optimize(&logical_plan)?;
match &optimized_plan {
LogicalPlan::Projection { input, .. } => match &**input {
LogicalPlan::Projection(Projection { input, .. }) => match &**input {
LogicalPlan::TableScan(TableScanPlan {
source,
projected_schema,
Expand Down Expand Up @@ -1488,7 +1489,7 @@ mod tests {
let ctx = ExecutionContext::new();
let optimized_plan = ctx.optimize(&plan)?;
match &optimized_plan {
LogicalPlan::Projection { input, .. } => match &**input {
LogicalPlan::Projection(Projection { input, .. }) => match &**input {
LogicalPlan::TableScan(TableScanPlan {
source,
projected_schema,
Expand Down
15 changes: 8 additions & 7 deletions datafusion/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ use crate::datasource::{
};
use crate::error::{DataFusionError, Result};
use crate::logical_plan::plan::{
AnalyzePlan, EmptyRelation, ExplainPlan, TableScanPlan, ToStringifiedPlan, Union,
AnalyzePlan, EmptyRelation, ExplainPlan, Filter, Projection, TableScanPlan,
ToStringifiedPlan, Union, Window,
};
use crate::prelude::*;
use crate::scalar::ScalarValue;
Expand Down Expand Up @@ -451,10 +452,10 @@ impl LogicalPlanBuilder {
/// Apply a filter
pub fn filter(&self, expr: impl Into<Expr>) -> Result<Self> {
let expr = normalize_col(expr.into(), &self.plan)?;
Ok(Self::from(LogicalPlan::Filter {
Ok(Self::from(LogicalPlan::Filter(Filter {
predicate: expr,
input: Arc::new(self.plan.clone()),
}))
})))
}

/// Apply a limit
Expand Down Expand Up @@ -658,11 +659,11 @@ impl LogicalPlanBuilder {
let mut window_fields: Vec<DFField> =
exprlist_to_fields(all_expr, self.plan.schema())?;
window_fields.extend_from_slice(self.plan.schema().fields());
Ok(Self::from(LogicalPlan::Window {
Ok(Self::from(LogicalPlan::Window(Window {
input: Arc::new(self.plan.clone()),
window_expr,
schema: Arc::new(DFSchema::new(window_fields)?),
}))
})))
}

/// Apply an aggregate: grouping on the `group_expr` expressions
Expand Down Expand Up @@ -898,12 +899,12 @@ pub fn project_with_alias(
Some(ref alias) => input_schema.replace_qualifier(alias.as_str()),
None => input_schema,
};
Ok(LogicalPlan::Projection {
Ok(LogicalPlan::Projection(Projection {
expr: projected_expr,
input: Arc::new(plan.clone()),
schema: DFSchemaRef::new(schema),
alias,
})
}))
}

/// Resolves an `Expr::Wildcard` to a collection of `Expr::Column`'s.
Expand Down
110 changes: 65 additions & 45 deletions datafusion/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,47 @@ pub enum JoinConstraint {
Using,
}

/// Evaluates an arbitrary list of expressions (essentially a
/// SELECT with an expression list) on its input.
#[derive(Clone)]
pub struct Projection {
/// The list of expressions
pub expr: Vec<Expr>,
/// The incoming logical plan
pub input: Arc<LogicalPlan>,
/// The schema description of the output
pub schema: DFSchemaRef,
/// Projection output relation alias
pub alias: Option<String>,
}

/// Filters rows from its input that do not match an
/// expression (essentially a WHERE clause with a predicate
/// expression).
///
/// Semantically, `<predicate>` is evaluated for each row of the input;
/// If the value of `<predicate>` is true, the input row is passed to
/// the output. If the value of `<predicate>` is false, the row is
/// discarded.
#[derive(Clone)]
pub struct Filter {
/// The predicate expression, which must have Boolean type.
pub predicate: Expr,
/// The incoming logical plan
pub input: Arc<LogicalPlan>,
}

/// Window its input based on a set of window spec and window function (e.g. SUM or RANK)
#[derive(Clone)]
pub struct Window {
/// The incoming logical plan
pub input: Arc<LogicalPlan>,
/// The window function expression
pub window_expr: Vec<Expr>,
/// The schema description of the window output
pub schema: DFSchemaRef,
}

/// Produces rows from a table provider by reference or from the context
#[derive(Clone)]
pub struct TableScanPlan {
Expand Down Expand Up @@ -214,16 +255,7 @@ pub struct Values {
pub enum LogicalPlan {
/// Evaluates an arbitrary list of expressions (essentially a
/// SELECT with an expression list) on its input.
Projection {
/// The list of expressions
expr: Vec<Expr>,
/// The incoming logical plan
input: Arc<LogicalPlan>,
/// The schema description of the output
schema: DFSchemaRef,
/// Projection output relation alias
alias: Option<String>,
},
Projection(Projection),
/// Filters rows from its input that do not match an
/// expression (essentially a WHERE clause with a predicate
/// expression).
Expand All @@ -232,21 +264,9 @@ pub enum LogicalPlan {
/// If the value of `<predicate>` is true, the input row is passed to
/// the output. If the value of `<predicate>` is false, the row is
/// discarded.
Filter {
/// The predicate expression, which must have Boolean type.
predicate: Expr,
/// The incoming logical plan
input: Arc<LogicalPlan>,
},
Filter(Filter),
/// Window its input based on a set of window spec and window function (e.g. SUM or RANK)
Window {
/// The incoming logical plan
input: Arc<LogicalPlan>,
/// The window function expression
window_expr: Vec<Expr>,
/// The schema description of the window output
schema: DFSchemaRef,
},
Window(Window),
/// Aggregates its input based on a set of grouping and aggregate
/// expressions (e.g. SUM).
Aggregate {
Expand Down Expand Up @@ -324,9 +344,9 @@ impl LogicalPlan {
LogicalPlan::TableScan(TableScanPlan {
projected_schema, ..
}) => projected_schema,
LogicalPlan::Projection { schema, .. } => schema,
LogicalPlan::Filter { input, .. } => input.schema(),
LogicalPlan::Window { schema, .. } => schema,
LogicalPlan::Projection(Projection { schema, .. }) => schema,
LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
LogicalPlan::Window(Window { schema, .. }) => schema,
LogicalPlan::Aggregate { schema, .. } => schema,
LogicalPlan::Sort { input, .. } => input.schema(),
LogicalPlan::Join { schema, .. } => schema,
Expand Down Expand Up @@ -354,9 +374,9 @@ impl LogicalPlan {
projected_schema, ..
}) => vec![projected_schema],
LogicalPlan::Values(Values { schema, .. }) => vec![schema],
LogicalPlan::Window { input, schema, .. }
LogicalPlan::Window(Window { input, schema, .. })
| LogicalPlan::Aggregate { input, schema, .. }
| LogicalPlan::Projection { input, schema, .. } => {
| LogicalPlan::Projection(Projection { input, schema, .. }) => {
let mut schemas = input.all_schemas();
schemas.insert(0, schema);
schemas
Expand Down Expand Up @@ -391,7 +411,7 @@ impl LogicalPlan {
| LogicalPlan::Repartition(Repartition { input, .. })
| LogicalPlan::Sort { input, .. }
| LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
| LogicalPlan::Filter { input, .. } => input.all_schemas(),
| LogicalPlan::Filter(Filter { input, .. }) => input.all_schemas(),
LogicalPlan::DropTable(_) => vec![],
}
}
Expand All @@ -409,19 +429,19 @@ impl LogicalPlan {
/// children
pub fn expressions(self: &LogicalPlan) -> Vec<Expr> {
match self {
LogicalPlan::Projection { expr, .. } => expr.clone(),
LogicalPlan::Projection(Projection { expr, .. }) => expr.clone(),
LogicalPlan::Values(Values { values, .. }) => {
values.iter().flatten().cloned().collect()
}
LogicalPlan::Filter { predicate, .. } => vec![predicate.clone()],
LogicalPlan::Filter(Filter { predicate, .. }) => vec![predicate.clone()],
LogicalPlan::Repartition(Repartition {
partitioning_scheme,
..
}) => match partitioning_scheme {
Partitioning::Hash(expr, _) => expr.clone(),
_ => vec![],
},
LogicalPlan::Window { window_expr, .. } => window_expr.clone(),
LogicalPlan::Window(Window { window_expr, .. }) => window_expr.clone(),
LogicalPlan::Aggregate {
group_expr,
aggr_expr,
Expand Down Expand Up @@ -453,10 +473,10 @@ impl LogicalPlan {
/// include inputs to inputs.
pub fn inputs(self: &LogicalPlan) -> Vec<&LogicalPlan> {
match self {
LogicalPlan::Projection { input, .. } => vec![input],
LogicalPlan::Filter { input, .. } => vec![input],
LogicalPlan::Projection(Projection { input, .. }) => vec![input],
LogicalPlan::Filter(Filter { input, .. }) => vec![input],
LogicalPlan::Repartition(Repartition { input, .. }) => vec![input],
LogicalPlan::Window { input, .. } => vec![input],
LogicalPlan::Window(Window { input, .. }) => vec![input],
LogicalPlan::Aggregate { input, .. } => vec![input],
LogicalPlan::Sort { input, .. } => vec![input],
LogicalPlan::Join { left, right, .. } => vec![left, right],
Expand Down Expand Up @@ -588,12 +608,12 @@ impl LogicalPlan {
}

let recurse = match self {
LogicalPlan::Projection { input, .. } => input.accept(visitor)?,
LogicalPlan::Filter { input, .. } => input.accept(visitor)?,
LogicalPlan::Projection(Projection { input, .. }) => input.accept(visitor)?,
LogicalPlan::Filter(Filter { input, .. }) => input.accept(visitor)?,
LogicalPlan::Repartition(Repartition { input, .. }) => {
input.accept(visitor)?
}
LogicalPlan::Window { input, .. } => input.accept(visitor)?,
LogicalPlan::Window(Window { input, .. }) => input.accept(visitor)?,
LogicalPlan::Aggregate { input, .. } => input.accept(visitor)?,
LogicalPlan::Sort { input, .. } => input.accept(visitor)?,
LogicalPlan::Join { left, right, .. }
Expand Down Expand Up @@ -856,9 +876,9 @@ impl LogicalPlan {

Ok(())
}
LogicalPlan::Projection {
LogicalPlan::Projection(Projection {
ref expr, alias, ..
} => {
}) => {
write!(f, "Projection: ")?;
for (i, expr_item) in expr.iter().enumerate() {
if i > 0 {
Expand All @@ -871,13 +891,13 @@ impl LogicalPlan {
}
Ok(())
}
LogicalPlan::Filter {
LogicalPlan::Filter(Filter {
predicate: ref expr,
..
} => write!(f, "Filter: {:?}", expr),
LogicalPlan::Window {
}) => write!(f, "Filter: {:?}", expr),
LogicalPlan::Window(Window {
ref window_expr, ..
} => {
}) => {
write!(f, "WindowAggr: windowExpr=[{:?}]", window_expr)
}
LogicalPlan::Aggregate {
Expand Down
Loading