Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion datafusion-examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ cargo run --example csv_sql
- [`function_factory.rs`](examples/function_factory.rs): Register `CREATE FUNCTION` handler to implement SQL macros
- [`make_date.rs`](examples/make_date.rs): Examples of using the make_date function
- [`memtable.rs`](examples/memtable.rs): Create an query data in memory using SQL and `RecordBatch`es
- ['parquet_index.rs'](examples/parquet_index.rs): Create an secondary index over several parquet files and use it to speed up queries
- [`optimizer_rule.rs`](examples/optimizer_rule.rs): Use a custom OptimizerRule to replace certain predicates
- - ['parquet_index.rs'](examples/parquet_index.rs): Create an secondary index over several parquet files and use it to speed up queries
- [`parquet_sql.rs`](examples/parquet_sql.rs): Build and run a query plan from a SQL statement against a local Parquet file
- [`parquet_sql_multiple_files.rs`](examples/parquet_sql_multiple_files.rs): Build and run a query plan from a SQL statement against multiple local Parquet files
- ['parquet_exec_visitor.rs'](examples/parquet_exec_visitor.rs): Extract statistics by visiting an ExecutionPlan after execution
Expand Down
210 changes: 210 additions & 0 deletions datafusion-examples/examples/optimizer_rule.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use arrow_schema::DataType;
use datafusion::prelude::SessionContext;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::{
BinaryExpr, ColumnarValue, Expr, LogicalPlan, Operator, ScalarUDF, ScalarUDFImpl,
Signature, Volatility,
};
use datafusion_optimizer::optimizer::ApplyOrder;
use datafusion_optimizer::{OptimizerConfig, OptimizerRule};
use std::any::Any;
use std::sync::Arc;

/// This example demonstrates how to add your own [`OptimizerRule`]
/// to DataFusion.
///
/// [`OptimizerRule`]s transform [`LogicalPlan`]s into an equivalent (but
/// hopefully faster) form.
///
/// See [analyzer_rule.rs] for an example of AnalyzerRules, which are for
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is analyzer_rule.rs a file in the repo?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be once I can get someone to approve #11089

/// changing plan semantics.
#[tokio::main]
pub async fn main() -> Result<()> {
// DataFusion includes many built in OptimizerRules for tasks such as outer
// to inner join conversion and constant folding.
//
// Note you can change the order of optimizer rules using the lower level
// `SessionState` API
let ctx = SessionContext::new();
ctx.add_optimizer_rule(Arc::new(MyOptimizerRule {}));

// Now, let's plan and run queries with the new rule
ctx.register_batch("person", person_batch())?;
let sql = "SELECT * FROM person WHERE age = 22";
let plan = ctx.sql(sql).await?.into_optimized_plan()?;

// We can see the effect of our rewrite on the output plan that the filter
// has been rewritten to `my_eq`
//
// Filter: my_eq(person.age, Int32(22))
// TableScan: person projection=[name, age]
println!("Logical Plan:\n\n{}\n", plan.display_indent());

// And the output verifies the predicates have been changed (as the my_eq
// function always returns true)
//
// +--------+-----+
// | name | age |
// +--------+-----+
// | Andy | 11 |
// | Andrew | 22 |
// | Oleks | 33 |
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

// +--------+-----+
ctx.sql(sql).await?.show().await?;

// however we can see the rule doesn't trigger for queries with predicates
// other than `=`
//
// +-------+-----+
// | name | age |
// +-------+-----+
// | Andy | 11 |
// | Oleks | 33 |
// +-------+-----+
ctx.sql("SELECT * FROM person WHERE age <> 22")
.await?
.show()
.await?;

Ok(())
}

/// An example OptimizerRule that replaces all `col = <const>` predicates with a
/// user defined function
struct MyOptimizerRule {}

impl OptimizerRule for MyOptimizerRule {
fn name(&self) -> &str {
"my_optimizer_rule"
}

// New OptimizerRules should use the "rewrite" api as it is more efficient
fn supports_rewrite(&self) -> bool {
true
}

/// Ask the optimizer to handle the plan recursion. `rewrite` will be called
/// on each plan node.
fn apply_order(&self) -> Option<ApplyOrder> {
Some(ApplyOrder::BottomUp)
}

fn rewrite(
&self,
plan: LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
plan.map_expressions(|expr| {
// This closure is called for all expressions in the current plan
//
// For example, given a plan like `SELECT a + b, 5 + 10`
//
// The closure would be called twice:
// 1. once for `a + b`
// 2. once for `5 + 10`
self.rewrite_expr(expr)
})
}
}

impl MyOptimizerRule {
/// Rewrites an Expr replacing all `<col> = <const>` expressions with
/// a call to my_eq udf
fn rewrite_expr(&self, expr: Expr) -> Result<Transformed<Expr>> {
// do a bottom up rewrite of the expression tree
expr.transform_up(|expr| {
// Closure called for each sub tree
match expr {
Expr::BinaryExpr(binary_expr) if is_binary_eq(&binary_expr) => {
// destruture the expression
let BinaryExpr { left, op: _, right } = binary_expr;
// rewrite to `my_eq(left, right)`
let udf = ScalarUDF::new_from_impl(MyEq::new());
let call = udf.call(vec![*left, *right]);
Ok(Transformed::yes(call))
}
_ => Ok(Transformed::no(expr)),
}
})
// Note that the TreeNode API handles propagating the transformed flag
// and errors up the call chain
}
}

/// return true of the expression is an equality expression for a literal or
/// column reference
fn is_binary_eq(binary_expr: &BinaryExpr) -> bool {
binary_expr.op == Operator::Eq
&& is_lit_or_col(binary_expr.left.as_ref())
&& is_lit_or_col(binary_expr.right.as_ref())
}

/// Return true if the expression is a literal or column reference
fn is_lit_or_col(expr: &Expr) -> bool {
matches!(expr, Expr::Column(_) | Expr::Literal(_))
}

/// A simple user defined filter function
#[derive(Debug, Clone)]
struct MyEq {
signature: Signature,
}

impl MyEq {
fn new() -> Self {
Self {
signature: Signature::any(2, Volatility::Stable),
}
}
}

impl ScalarUDFImpl for MyEq {
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
"my_eq"
}

fn signature(&self) -> &Signature {
&self.signature
}

fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
Ok(DataType::Boolean)
}

fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue> {
// this example simply returns "true" which is not what a real
// implementation would do.
Ok(ColumnarValue::Scalar(ScalarValue::from(true)))
}
}

/// Return a RecordBatch with made up data
fn person_batch() -> RecordBatch {
let name: ArrayRef =
Arc::new(StringArray::from_iter_values(["Andy", "Andrew", "Oleks"]));
let age: ArrayRef = Arc::new(Int32Array::from(vec![11, 22, 33]));
RecordBatch::try_from_iter(vec![("name", name), ("age", age)]).unwrap()
}
22 changes: 15 additions & 7 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ use url::Url;
pub use datafusion_execution::config::SessionConfig;
pub use datafusion_execution::TaskContext;
pub use datafusion_expr::execution_props::ExecutionProps;
use datafusion_optimizer::AnalyzerRule;
use datafusion_optimizer::{AnalyzerRule, OptimizerRule};

mod avro;
mod csv;
Expand Down Expand Up @@ -332,13 +332,21 @@ impl SessionContext {
self
}

/// Adds an analyzer rule to the `SessionState` in the current `SessionContext`.
pub fn add_analyzer_rule(
self,
analyzer_rule: Arc<dyn AnalyzerRule + Send + Sync>,
) -> Self {
/// Adds an optimizer rule to the end of the existing rules.
///
/// See [`SessionState`] for more control of when the rule is applied.
pub fn add_optimizer_rule(
&self,
optimizer_rule: Arc<dyn OptimizerRule + Send + Sync>,
) {
self.state.write().append_optimizer_rule(optimizer_rule);
}

/// Adds an analyzer rule to the end of the existing rules.
///
/// See [`SessionState`] for more control of when the rule is applied.
pub fn add_analyzer_rule(&self, analyzer_rule: Arc<dyn AnalyzerRule + Send + Sync>) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This API was added in #10849 by @pingsutw and not yet released, thus it is not an API change

self.state.write().add_analyzer_rule(analyzer_rule);
self
}

/// Registers an [`ObjectStore`] to be used with a specific URL prefix.
Expand Down
10 changes: 10 additions & 0 deletions datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,16 @@ impl SessionState {
self
}

// the add_optimizer_rule takes an owned reference
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed the naming in SessionState is pretty inconsistent

// it should probably be renamed to `with_optimizer_rule` to follow builder style
// and `add_optimizer_rule` that takes &mut self added instead of this
pub(crate) fn append_optimizer_rule(
&mut self,
optimizer_rule: Arc<dyn OptimizerRule + Send + Sync>,
) {
self.optimizer.rules.push(optimizer_rule);
}

/// Add `physical_optimizer_rule` to the end of the list of
/// [`PhysicalOptimizerRule`]s used to rewrite queries.
pub fn add_physical_optimizer_rule(
Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/tests/user_defined/user_defined_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,8 @@ async fn normal_query() -> Result<()> {
#[tokio::test]
// Run the query using default planners, optimizer and custom analyzer rule
async fn normal_query_with_analyzer() -> Result<()> {
let ctx = SessionContext::new().add_analyzer_rule(Arc::new(MyAnalyzerRule {}));
let ctx = SessionContext::new();
ctx.add_analyzer_rule(Arc::new(MyAnalyzerRule {}));
run_and_compare_query_with_analyzer_rule(ctx, "MyAnalyzerRule").await
}

Expand Down