Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 9 additions & 65 deletions datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,7 @@ fn optimize(
alias,
}) => {
let input_schema = Arc::clone(input.schema());
let all_schemas: Vec<DFSchemaRef> =
plan.all_schemas().into_iter().cloned().collect();
let arrays = to_arrays(expr, input_schema, all_schemas, &mut expr_set)?;
let arrays = to_arrays(expr, input_schema, &mut expr_set)?;

let (mut new_expr, new_input) = rewrite_expr(
&[expr],
Expand All @@ -116,17 +114,8 @@ fn optimize(
}
LogicalPlan::Filter(Filter { predicate, input }) => {
let input_schema = Arc::clone(input.schema());
let all_schemas: Vec<DFSchemaRef> =
plan.all_schemas().into_iter().cloned().collect();

let mut id_array = vec![];
expr_to_identifier(
predicate,
&mut expr_set,
&mut id_array,
input_schema,
all_schemas,
)?;
expr_to_identifier(predicate, &mut expr_set, &mut id_array, input_schema)?;

let (mut new_expr, new_input) = rewrite_expr(
&[&[predicate.clone()]],
Expand All @@ -153,10 +142,7 @@ fn optimize(
schema,
}) => {
let input_schema = Arc::clone(input.schema());
let all_schemas: Vec<DFSchemaRef> =
plan.all_schemas().into_iter().cloned().collect();
let arrays =
to_arrays(window_expr, input_schema, all_schemas, &mut expr_set)?;
let arrays = to_arrays(window_expr, input_schema, &mut expr_set)?;

let (mut new_expr, new_input) = rewrite_expr(
&[window_expr],
Expand All @@ -179,16 +165,9 @@ fn optimize(
schema,
}) => {
let input_schema = Arc::clone(input.schema());
let all_schemas: Vec<DFSchemaRef> =
plan.all_schemas().into_iter().cloned().collect();
let group_arrays = to_arrays(
group_expr,
Arc::clone(&input_schema),
all_schemas.clone(),
&mut expr_set,
)?;
let aggr_arrays =
to_arrays(aggr_expr, input_schema, all_schemas, &mut expr_set)?;
let group_arrays =
to_arrays(group_expr, Arc::clone(&input_schema), &mut expr_set)?;
let aggr_arrays = to_arrays(aggr_expr, input_schema, &mut expr_set)?;

let (mut new_expr, new_input) = rewrite_expr(
&[group_expr, aggr_expr],
Expand All @@ -210,9 +189,7 @@ fn optimize(
}
LogicalPlan::Sort(Sort { expr, input, fetch }) => {
let input_schema = Arc::clone(input.schema());
let all_schemas: Vec<DFSchemaRef> =
plan.all_schemas().into_iter().cloned().collect();
let arrays = to_arrays(expr, input_schema, all_schemas, &mut expr_set)?;
let arrays = to_arrays(expr, input_schema, &mut expr_set)?;

let (mut new_expr, new_input) = rewrite_expr(
&[expr],
Expand Down Expand Up @@ -271,19 +248,12 @@ fn pop_expr(new_expr: &mut Vec<Vec<Expr>>) -> Result<Vec<Expr>> {
fn to_arrays(
expr: &[Expr],
input_schema: DFSchemaRef,
all_schemas: Vec<DFSchemaRef>,
expr_set: &mut ExprSet,
) -> Result<Vec<Vec<(usize, String)>>> {
expr.iter()
.map(|e| {
let mut id_array = vec![];
expr_to_identifier(
e,
expr_set,
&mut id_array,
Arc::clone(&input_schema),
all_schemas.clone(),
)?;
expr_to_identifier(e, expr_set, &mut id_array, Arc::clone(&input_schema))?;

Ok(id_array)
})
Expand Down Expand Up @@ -394,13 +364,6 @@ struct ExprIdentifierVisitor<'a> {
/// input schema for the node that we're optimizing, so we can determine the correct datatype
/// for each subexpression
input_schema: DFSchemaRef,
/// all schemas in the logical plan, as a fall back if we cannot resolve an expression type
/// from the input schema alone
// This fallback should never be necessary as the expression datatype should always be
// resolvable from the input schema of the node that's being optimized.
// todo: This can likely be removed if we are sure it's safe to do so.
all_schemas: Vec<DFSchemaRef>,

// inner states
visit_stack: Vec<VisitRecord>,
/// increased in pre_visit, start from 0.
Expand Down Expand Up @@ -478,23 +441,7 @@ impl ExpressionVisitor for ExprIdentifierVisitor<'_> {
self.id_array[idx] = (self.series_number, desc.clone());
self.visit_stack.push(VisitRecord::ExprItem(desc.clone()));

let data_type = if let Ok(data_type) = expr.get_type(&self.input_schema) {
data_type
} else {
// Expression type could not be resolved in schema, fall back to all schemas.
//
// This fallback should never be necessary as the expression datatype should always be
// resolvable from the input schema of the node that's being optimized.
// todo: This else-branch can likely be removed if we are sure it's safe to do so.
let merged_schema =
self.all_schemas
.iter()
.fold(DFSchema::empty(), |mut lhs, rhs| {
lhs.merge(rhs);
lhs
});
expr.get_type(&merged_schema)?
};
let data_type = expr.get_type(&self.input_schema)?;

self.expr_set
.entry(desc)
Expand All @@ -510,13 +457,11 @@ fn expr_to_identifier(
expr_set: &mut ExprSet,
id_array: &mut Vec<(usize, Identifier)>,
input_schema: DFSchemaRef,
all_schemas: Vec<DFSchemaRef>,
) -> Result<()> {
expr.accept(ExprIdentifierVisitor {
expr_set,
id_array,
input_schema,
all_schemas,
visit_stack: vec![],
node_count: 0,
series_number: 0,
Expand Down Expand Up @@ -669,7 +614,6 @@ mod test {
&mut HashMap::new(),
&mut id_array,
Arc::clone(&schema),
vec![schema],
)?;

let expected = vec![
Expand Down