diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md index 5f032c3e9cfff..75fcaddf8def3 100644 --- a/datafusion-examples/README.md +++ b/datafusion-examples/README.md @@ -61,7 +61,7 @@ cargo run --example dataframe - [`dataframe_in_memory.rs`](examples/dataframe_in_memory.rs): Run a query using a DataFrame against data in memory - [`dataframe_output.rs`](examples/dataframe_output.rs): Examples of methods which write data out from a DataFrame - [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results into rust structs using serde -- [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify and analyze `Expr`s +- [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify, analyze and coerce `Expr`s - [`file_stream_provider.rs`](examples/file_stream_provider.rs): Run a query on `FileStreamProvider` which implements `StreamProvider` for reading and writing to arbitrary stream sources / sinks. - [`flight_sql_server.rs`](examples/flight/flight_sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from JDBC clients - [`function_factory.rs`](examples/function_factory.rs): Register `CREATE FUNCTION` handler to implement SQL macros diff --git a/datafusion-examples/examples/expr_api.rs b/datafusion-examples/examples/expr_api.rs index 0eb823302acf6..cb0796bdcf735 100644 --- a/datafusion-examples/examples/expr_api.rs +++ b/datafusion-examples/examples/expr_api.rs @@ -18,7 +18,7 @@ use std::collections::HashMap; use std::sync::Arc; -use arrow::array::{BooleanArray, Int32Array}; +use arrow::array::{BooleanArray, Int32Array, Int8Array}; use arrow::record_batch::RecordBatch; use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit}; @@ -28,12 +28,14 @@ use datafusion::functions_aggregate::first_last::first_value_udaf; use datafusion::optimizer::simplify_expressions::ExprSimplifier; use datafusion::physical_expr::{analyze, AnalysisContext, ExprBoundaries}; use datafusion::prelude::*; +use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_common::{ScalarValue, ToDFSchema}; use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::expr::BinaryExpr; use datafusion_expr::interval_arithmetic::Interval; use datafusion_expr::simplify::SimplifyContext; use datafusion_expr::{ColumnarValue, ExprFunctionExt, ExprSchemable, Operator}; +use datafusion_optimizer::analyzer::type_coercion::TypeCoercionRewriter; /// This example demonstrates the DataFusion [`Expr`] API. /// @@ -51,6 +53,7 @@ use datafusion_expr::{ColumnarValue, ExprFunctionExt, ExprSchemable, Operator}; /// 4. Simplify expressions: [`simplify_demo`] /// 5. Analyze predicates for boundary ranges: [`range_analysis_demo`] /// 6. Get the types of the expressions: [`expression_type_demo`] +/// 7. Apply type cocercion to expressions: [`type_coercion_demo`] #[tokio::main] async fn main() -> Result<()> { // The easiest way to do create expressions is to use the @@ -80,6 +83,9 @@ async fn main() -> Result<()> { // See how to determine the data types of expressions expression_type_demo()?; + // See how to type coerce expressions. + type_coercion_demo()?; + Ok(()) } @@ -316,3 +322,103 @@ fn expression_type_demo() -> Result<()> { Ok(()) } + +/// This function demonstrates how to apply type coercion to expressions, such as binary expressions. +/// +/// In most cases, manual type coercion is not required since DataFusion handles it implicitly. +/// However, certain projects may construct `ExecutionPlan`s directly from DataFusion logical expressions, +/// bypassing the construction of DataFusion logical plans. +/// Since constructing `ExecutionPlan`s from logical expressions does not automatically apply type coercion, +/// you may need to handle type coercion manually in these cases. +/// +/// The codes in this function shows various ways to perform type coercion on expressions: +/// 1. Using `SessionContext::create_physical_expr` +/// 2. Using `ExprSimplifier::coerce` +/// 3. Using `TreeNodeRewriter::rewrite` based on `TypeCoercionRewriter` +/// 4. Using `TreeNode::transform` +/// +/// Note, this list may not be complete and there may be other methods to apply type coercion to expressions. +fn type_coercion_demo() -> Result<()> { + // Creates a record batch for demo. + let df_schema = DFSchema::from_unqualified_fields( + vec![Field::new("a", DataType::Int8, false)].into(), + HashMap::new(), + )?; + let i8_array = Int8Array::from_iter_values(vec![0, 1, 2]); + let batch = RecordBatch::try_new( + Arc::new(df_schema.as_arrow().to_owned()), + vec![Arc::new(i8_array) as _], + )?; + + // Constructs a binary expression for demo. + // By default, the literal `1` is translated into the Int32 type and cannot be directly compared with the Int8 type. + let expr = col("a").gt(lit(1)); + + // Evaluation with an expression that has not been type coerced cannot succeed. + let props = ExecutionProps::default(); + let physical_expr = + datafusion_physical_expr::create_physical_expr(&expr, &df_schema, &props)?; + let e = physical_expr.evaluate(&batch).unwrap_err(); + assert!(e + .find_root() + .to_string() + .contains("Invalid comparison operation: Int8 > Int32")); + + // 1. Type coercion with `SessionContext::create_physical_expr` which implicitly applies type coercion before constructing the physical expr. + let physical_expr = + SessionContext::new().create_physical_expr(expr.clone(), &df_schema)?; + assert!(physical_expr.evaluate(&batch).is_ok()); + + // 2. Type coercion with `ExprSimplifier::coerce`. + let context = SimplifyContext::new(&props).with_schema(Arc::new(df_schema.clone())); + let simplifier = ExprSimplifier::new(context); + let coerced_expr = simplifier.coerce(expr.clone(), &df_schema)?; + let physical_expr = datafusion_physical_expr::create_physical_expr( + &coerced_expr, + &df_schema, + &props, + )?; + assert!(physical_expr.evaluate(&batch).is_ok()); + + // 3. Type coercion with `TypeCoercionRewriter`. + let coerced_expr = expr + .clone() + .rewrite(&mut TypeCoercionRewriter::new(&df_schema))? + .data; + let physical_expr = datafusion_physical_expr::create_physical_expr( + &coerced_expr, + &df_schema, + &props, + )?; + assert!(physical_expr.evaluate(&batch).is_ok()); + + // 4. Apply explict type coercion by manually rewriting the expression + let coerced_expr = expr + .transform(|e| { + // Only type coerces binary expressions. + let Expr::BinaryExpr(e) = e else { + return Ok(Transformed::no(e)); + }; + if let Expr::Column(ref col_expr) = *e.left { + let field = df_schema.field_with_name(None, col_expr.name())?; + let cast_to_type = field.data_type(); + let coerced_right = e.right.cast_to(cast_to_type, &df_schema)?; + Ok(Transformed::yes(Expr::BinaryExpr(BinaryExpr::new( + e.left, + e.op, + Box::new(coerced_right), + )))) + } else { + Ok(Transformed::no(Expr::BinaryExpr(e))) + } + })? + .data; + let physical_expr = datafusion_physical_expr::create_physical_expr( + &coerced_expr, + &df_schema, + &props, + )?; + assert!(physical_expr.evaluate(&batch).is_ok()); + + Ok(()) +} diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index 9793c4c5490f3..b56c2dc604a9b 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -158,7 +158,7 @@ pub struct TypeCoercionRewriter<'a> { impl<'a> TypeCoercionRewriter<'a> { /// Create a new [`TypeCoercionRewriter`] with a provided schema /// representing both the inputs and output of the [`LogicalPlan`] node. - fn new(schema: &'a DFSchema) -> Self { + pub fn new(schema: &'a DFSchema) -> Self { Self { schema } } diff --git a/datafusion/sql/README.md b/datafusion/sql/README.md index 02ed1ae1f58be..98f3c4faa2ec0 100644 --- a/datafusion/sql/README.md +++ b/datafusion/sql/README.md @@ -52,8 +52,8 @@ fn main() { let statement = &ast[0]; // create a logical query plan - let schema_provider = MySchemaProvider::new(); - let sql_to_rel = SqlToRel::new(&schema_provider); + let context_provider = MyContextProvider::new(); + let sql_to_rel = SqlToRel::new(&context_provider); let plan = sql_to_rel.sql_statement_to_plan(statement.clone()).unwrap(); // show the plan