Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion-examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ cargo run --example dataframe
- [`dataframe_in_memory.rs`](examples/dataframe_in_memory.rs): Run a query using a DataFrame against data in memory
- [`dataframe_output.rs`](examples/dataframe_output.rs): Examples of methods which write data out from a DataFrame
- [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results into rust structs using serde
- [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify and analyze `Expr`s
- [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify, analyze and coerce `Expr`s
- [`file_stream_provider.rs`](examples/file_stream_provider.rs): Run a query on `FileStreamProvider` which implements `StreamProvider` for reading and writing to arbitrary stream sources / sinks.
- [`flight_sql_server.rs`](examples/flight/flight_sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from JDBC clients
- [`function_factory.rs`](examples/function_factory.rs): Register `CREATE FUNCTION` handler to implement SQL macros
Expand Down
108 changes: 107 additions & 1 deletion datafusion-examples/examples/expr_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use std::collections::HashMap;
use std::sync::Arc;

use arrow::array::{BooleanArray, Int32Array};
use arrow::array::{BooleanArray, Int32Array, Int8Array};
use arrow::record_batch::RecordBatch;

use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
Expand All @@ -28,12 +28,14 @@ use datafusion::functions_aggregate::first_last::first_value_udaf;
use datafusion::optimizer::simplify_expressions::ExprSimplifier;
use datafusion::physical_expr::{analyze, AnalysisContext, ExprBoundaries};
use datafusion::prelude::*;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{ScalarValue, ToDFSchema};
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::expr::BinaryExpr;
use datafusion_expr::interval_arithmetic::Interval;
use datafusion_expr::simplify::SimplifyContext;
use datafusion_expr::{ColumnarValue, ExprFunctionExt, ExprSchemable, Operator};
use datafusion_optimizer::analyzer::type_coercion::TypeCoercionRewriter;

/// This example demonstrates the DataFusion [`Expr`] API.
///
Expand All @@ -51,6 +53,7 @@ use datafusion_expr::{ColumnarValue, ExprFunctionExt, ExprSchemable, Operator};
/// 4. Simplify expressions: [`simplify_demo`]
/// 5. Analyze predicates for boundary ranges: [`range_analysis_demo`]
/// 6. Get the types of the expressions: [`expression_type_demo`]
/// 7. Apply type coercion to expressions: [`type_coercion_demo`]
#[tokio::main]
async fn main() -> Result<()> {
// The easiest way to create expressions is to use the
Expand Down Expand Up @@ -80,6 +83,9 @@ async fn main() -> Result<()> {
// See how to determine the data types of expressions
expression_type_demo()?;

// See how to type coerce expressions.
type_coercion_demo()?;

Ok(())
}

Expand Down Expand Up @@ -316,3 +322,103 @@ fn expression_type_demo() -> Result<()> {

Ok(())
}

/// This function demonstrates how to apply type coercion to expressions, such as binary expressions.
///
/// In most cases, manual type coercion is not required since DataFusion handles it implicitly.
/// However, certain projects may construct `ExecutionPlan`s directly from DataFusion logical expressions,
/// bypassing the construction of DataFusion logical plans.
/// Since constructing `ExecutionPlan`s from logical expressions does not automatically apply type coercion,
/// you may need to handle type coercion manually in these cases.
///
/// The codes in this function shows various ways to perform type coercion on expressions:
/// 1. Using `SessionContext::create_physical_expr`
/// 2. Using `ExprSimplifier::coerce`
/// 3. Using `TreeNodeRewriter::rewrite` based on `TypeCoercionRewriter`
/// 4. Using `TreeNode::transform`
///
/// Note, this list may not be complete and there may be other methods to apply type coercion to expressions.
fn type_coercion_demo() -> Result<()> {
// Creates a record batch for demo.
let df_schema = DFSchema::from_unqualified_fields(
vec![Field::new("a", DataType::Int8, false)].into(),
HashMap::new(),
)?;
let i8_array = Int8Array::from_iter_values(vec![0, 1, 2]);
let batch = RecordBatch::try_new(
Arc::new(df_schema.as_arrow().to_owned()),
vec![Arc::new(i8_array) as _],
)?;

// Constructs a binary expression for demo.
// By default, the literal `1` is translated into the Int32 type and cannot be directly compared with the Int8 type.
let expr = col("a").gt(lit(1));

// Evaluation with an expression that has not been type coerced cannot succeed.
let props = ExecutionProps::default();
let physical_expr =
datafusion_physical_expr::create_physical_expr(&expr, &df_schema, &props)?;
let e = physical_expr.evaluate(&batch).unwrap_err();
assert!(e
.find_root()
.to_string()
.contains("Invalid comparison operation: Int8 > Int32"));

// 1. Type coercion with `SessionContext::create_physical_expr` which implicitly applies type coercion before constructing the physical expr.
let physical_expr =
SessionContext::new().create_physical_expr(expr.clone(), &df_schema)?;
assert!(physical_expr.evaluate(&batch).is_ok());

// 2. Type coercion with `ExprSimplifier::coerce`.
let context = SimplifyContext::new(&props).with_schema(Arc::new(df_schema.clone()));
let simplifier = ExprSimplifier::new(context);
let coerced_expr = simplifier.coerce(expr.clone(), &df_schema)?;
let physical_expr = datafusion_physical_expr::create_physical_expr(
&coerced_expr,
&df_schema,
&props,
)?;
assert!(physical_expr.evaluate(&batch).is_ok());

// 3. Type coercion with `TypeCoercionRewriter`.
let coerced_expr = expr
.clone()
.rewrite(&mut TypeCoercionRewriter::new(&df_schema))?
.data;
let physical_expr = datafusion_physical_expr::create_physical_expr(
&coerced_expr,
&df_schema,
&props,
)?;
assert!(physical_expr.evaluate(&batch).is_ok());

// 4. Apply explict type coercion by manually rewriting the expression
let coerced_expr = expr
.transform(|e| {
// Only type coerces binary expressions.
let Expr::BinaryExpr(e) = e else {
return Ok(Transformed::no(e));
};
if let Expr::Column(ref col_expr) = *e.left {
let field = df_schema.field_with_name(None, col_expr.name())?;
let cast_to_type = field.data_type();
let coerced_right = e.right.cast_to(cast_to_type, &df_schema)?;
Ok(Transformed::yes(Expr::BinaryExpr(BinaryExpr::new(
e.left,
e.op,
Box::new(coerced_right),
))))
} else {
Ok(Transformed::no(Expr::BinaryExpr(e)))
}
})?
.data;
let physical_expr = datafusion_physical_expr::create_physical_expr(
&coerced_expr,
&df_schema,
&props,
)?;
assert!(physical_expr.evaluate(&batch).is_ok());

Ok(())
}
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/analyzer/type_coercion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ pub struct TypeCoercionRewriter<'a> {
impl<'a> TypeCoercionRewriter<'a> {
/// Create a new [`TypeCoercionRewriter`] with a provided schema
/// representing both the inputs and output of the [`LogicalPlan`] node.
fn new(schema: &'a DFSchema) -> Self {
pub fn new(schema: &'a DFSchema) -> Self {
Self { schema }
}

Expand Down
4 changes: 2 additions & 2 deletions datafusion/sql/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ fn main() {
let statement = &ast[0];

// create a logical query plan
let schema_provider = MySchemaProvider::new();
let sql_to_rel = SqlToRel::new(&schema_provider);
let context_provider = MyContextProvider::new();
let sql_to_rel = SqlToRel::new(&context_provider);
let plan = sql_to_rel.sql_statement_to_plan(statement.clone()).unwrap();

// show the plan
Expand Down