From 8ed1b3c1478692ae23dde151b3f36d81ca957294 Mon Sep 17 00:00:00 2001 From: Shicong Date: Wed, 13 Nov 2024 18:01:28 +0800 Subject: [PATCH 1/5] chore: make TypeCoercionRewriter::new public --- datafusion/optimizer/src/analyzer/type_coercion.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index 9793c4c5490f3..b56c2dc604a9b 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -158,7 +158,7 @@ pub struct TypeCoercionRewriter<'a> { impl<'a> TypeCoercionRewriter<'a> { /// Create a new [`TypeCoercionRewriter`] with a provided schema /// representing both the inputs and output of the [`LogicalPlan`] node. - fn new(schema: &'a DFSchema) -> Self { + pub fn new(schema: &'a DFSchema) -> Self { Self { schema } } From cbd0d4f4c4872f0825b3db412697d80282317d25 Mon Sep 17 00:00:00 2001 From: Shicong Date: Wed, 13 Nov 2024 18:01:54 +0800 Subject: [PATCH 2/5] docs: add docs for type coerce expressions --- datafusion-examples/README.md | 2 +- datafusion-examples/examples/expr_api.rs | 119 ++++++++++++++++++++++- 2 files changed, 119 insertions(+), 2 deletions(-) diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md index 5f032c3e9cfff..75fcaddf8def3 100644 --- a/datafusion-examples/README.md +++ b/datafusion-examples/README.md @@ -61,7 +61,7 @@ cargo run --example dataframe - [`dataframe_in_memory.rs`](examples/dataframe_in_memory.rs): Run a query using a DataFrame against data in memory - [`dataframe_output.rs`](examples/dataframe_output.rs): Examples of methods which write data out from a DataFrame - [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results into rust structs using serde -- [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify and analyze `Expr`s +- [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify, analyze and coerce `Expr`s - [`file_stream_provider.rs`](examples/file_stream_provider.rs): Run a query on `FileStreamProvider` which implements `StreamProvider` for reading and writing to arbitrary stream sources / sinks. - [`flight_sql_server.rs`](examples/flight/flight_sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from JDBC clients - [`function_factory.rs`](examples/function_factory.rs): Register `CREATE FUNCTION` handler to implement SQL macros diff --git a/datafusion-examples/examples/expr_api.rs b/datafusion-examples/examples/expr_api.rs index 0eb823302acf6..675821f830ee4 100644 --- a/datafusion-examples/examples/expr_api.rs +++ b/datafusion-examples/examples/expr_api.rs @@ -18,7 +18,7 @@ use std::collections::HashMap; use std::sync::Arc; -use arrow::array::{BooleanArray, Int32Array}; +use arrow::array::{BooleanArray, Int32Array, Int8Array}; use arrow::record_batch::RecordBatch; use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit}; @@ -28,12 +28,14 @@ use datafusion::functions_aggregate::first_last::first_value_udaf; use datafusion::optimizer::simplify_expressions::ExprSimplifier; use datafusion::physical_expr::{analyze, AnalysisContext, ExprBoundaries}; use datafusion::prelude::*; +use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_common::{ScalarValue, ToDFSchema}; use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::expr::BinaryExpr; use datafusion_expr::interval_arithmetic::Interval; use datafusion_expr::simplify::SimplifyContext; use datafusion_expr::{ColumnarValue, ExprFunctionExt, ExprSchemable, Operator}; +use datafusion_optimizer::analyzer::type_coercion::TypeCoercionRewriter; /// This example demonstrates the DataFusion [`Expr`] API. /// @@ -51,6 +53,7 @@ use datafusion_expr::{ColumnarValue, ExprFunctionExt, ExprSchemable, Operator}; /// 4. Simplify expressions: [`simplify_demo`] /// 5. Analyze predicates for boundary ranges: [`range_analysis_demo`] /// 6. Get the types of the expressions: [`expression_type_demo`] +/// 7. Apply type cocercion to expressions: [`type_coercion_demo`] #[tokio::main] async fn main() -> Result<()> { // The easiest way to do create expressions is to use the @@ -80,6 +83,9 @@ async fn main() -> Result<()> { // See how to determine the data types of expressions expression_type_demo()?; + // See how to type coerce expressions. + type_coercion_demo()?; + Ok(()) } @@ -316,3 +322,114 @@ fn expression_type_demo() -> Result<()> { Ok(()) } + +/// This function demonstrates how to apply type coercion to expressions, such as binary expressions. +/// +/// In most cases, manual type coercion is not required since DataFusion handles it implicitly. +/// However, certain projects may construct `ExecutionPlan`s directly from DataFusion logical expressions, +/// bypassing the construction of DataFusion logical plans. +/// Since constructing `ExecutionPlan`s from logical expressions does not automatically apply type coercion, +/// you may need to handle type coercion manually in these cases. +/// +/// The codes in this function shows various ways to perform type coercion on expressions: +/// 1. Using `SessionContext::create_physical_expr` +/// 2. Using `ExprSimplifier::coerce` +/// 3. Using `TreeNodeRewriter::rewrite` based on `TypeCoercionRewriter` +/// 4. Using `TreeNode::transform` +/// +/// Note, this list cannot be complete and there may have other methods to apply type coercion to expressions. +fn type_coercion_demo() -> Result<()> { + // Creates a record batch for demo. + let df_schema = DFSchema::from_unqualified_fields( + vec![Field::new("a", DataType::Int8, false)].into(), + HashMap::new(), + )?; + let i8_array = Int8Array::from_iter_values(vec![0, 1, 2]); + let batch = RecordBatch::try_new( + Arc::new(df_schema.as_arrow().to_owned()), + vec![Arc::new(i8_array) as _], + )?; + + // Constructs a binary expression for demo. + // By default, the literal `1` is translated into the Int32 type and cannot be directly compared with the Int8 type. + let expr = col("a").gt(lit(1)); + + // Evaluation with an expression that has not been type coerced cannot succeed. + let props = ExecutionProps::default(); + let physical_expr = + datafusion_physical_expr::create_physical_expr(&expr, &df_schema, &props)?; + let Err(e) = physical_expr.evaluate(&batch) else { + unreachable!() + }; + assert!(e + .find_root() + .to_string() + .contains("Invalid comparison operation: Int8 > Int32")); + + // 1. Type coercion with `SessionContext::create_physical_expr` which implicitly applies type coercion before constructing the physical expr. + let physical_expr = + SessionContext::new().create_physical_expr(expr.clone(), &df_schema)?; + assert!(physical_expr.evaluate(&batch).is_ok()); + + // 2. Type coercion with `ExprSimplifier::coerce`. + let context = SimplifyContext::new(&props).with_schema(Arc::new(df_schema.clone())); + let simplifier = ExprSimplifier::new(context); + let coerced_expr = simplifier.coerce(expr.clone(), &df_schema)?; + let physical_expr = datafusion_physical_expr::create_physical_expr( + &coerced_expr, + &df_schema, + &props, + )?; + assert!(physical_expr.evaluate(&batch).is_ok()); + + // 3. Type coercion with `TypeCoercionRewriter`. + let coerced_expr = expr + .clone() + .rewrite(&mut TypeCoercionRewriter::new(&df_schema))? + .data; + let physical_expr = datafusion_physical_expr::create_physical_expr( + &coerced_expr, + &df_schema, + &props, + )?; + assert!(physical_expr.evaluate(&batch).is_ok()); + + // 4. Type coercion with manual transformation. + let coerced_expr = expr + .transform(|e| { + // Only type coerces binary expressions. + let Expr::BinaryExpr(e) = e else { + return Ok(Transformed::no(e)); + }; + if let Expr::Column(ref col_expr) = *e.left { + let field = df_schema.field_with_name(None, col_expr.name())?; + let cast_to_type = field.data_type(); + let coerced_right = e.right.cast_to(cast_to_type, &df_schema)?; + Ok(Transformed::yes(Expr::BinaryExpr(BinaryExpr::new( + e.left, + e.op, + Box::new(coerced_right), + )))) + } else if let Expr::Column(ref col_expr) = *e.right { + let field = df_schema.field_with_name(None, col_expr.name())?; + let cast_to_type = field.data_type(); + let coerced_left = e.left.cast_to(cast_to_type, &df_schema)?; + Ok(Transformed::yes(Expr::BinaryExpr(BinaryExpr::new( + Box::new(coerced_left), + e.op, + e.right, + )))) + } else { + Ok(Transformed::no(Expr::BinaryExpr(e))) + } + })? + .data; + let physical_expr = datafusion_physical_expr::create_physical_expr( + &coerced_expr, + &df_schema, + &props, + )?; + assert!(physical_expr.evaluate(&batch).is_ok()); + + Ok(()) +} From 59013c2776238ac50893718545bfc0aecfb97575 Mon Sep 17 00:00:00 2001 From: Shicong Date: Wed, 13 Nov 2024 18:02:08 +0800 Subject: [PATCH 3/5] docs: update datafusion-sql readme --- datafusion/sql/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/sql/README.md b/datafusion/sql/README.md index 02ed1ae1f58be..98f3c4faa2ec0 100644 --- a/datafusion/sql/README.md +++ b/datafusion/sql/README.md @@ -52,8 +52,8 @@ fn main() { let statement = &ast[0]; // create a logical query plan - let schema_provider = MySchemaProvider::new(); - let sql_to_rel = SqlToRel::new(&schema_provider); + let context_provider = MyContextProvider::new(); + let sql_to_rel = SqlToRel::new(&context_provider); let plan = sql_to_rel.sql_statement_to_plan(statement.clone()).unwrap(); // show the plan From 56fa861aa06e85669002bc0cf9a7ace49e3a0944 Mon Sep 17 00:00:00 2001 From: Shicong Date: Wed, 13 Nov 2024 18:09:43 +0800 Subject: [PATCH 4/5] docs: update type coercion demo --- datafusion-examples/examples/expr_api.rs | 9 --------- 1 file changed, 9 deletions(-) diff --git a/datafusion-examples/examples/expr_api.rs b/datafusion-examples/examples/expr_api.rs index 675821f830ee4..87d7d58dfdded 100644 --- a/datafusion-examples/examples/expr_api.rs +++ b/datafusion-examples/examples/expr_api.rs @@ -410,15 +410,6 @@ fn type_coercion_demo() -> Result<()> { e.op, Box::new(coerced_right), )))) - } else if let Expr::Column(ref col_expr) = *e.right { - let field = df_schema.field_with_name(None, col_expr.name())?; - let cast_to_type = field.data_type(); - let coerced_left = e.left.cast_to(cast_to_type, &df_schema)?; - Ok(Transformed::yes(Expr::BinaryExpr(BinaryExpr::new( - Box::new(coerced_left), - e.op, - e.right, - )))) } else { Ok(Transformed::no(Expr::BinaryExpr(e))) } From d6f25fb988cbf7ba95c3544dbd02014b7bd4c0aa Mon Sep 17 00:00:00 2001 From: niebayes Date: Wed, 13 Nov 2024 23:39:34 +0800 Subject: [PATCH 5/5] Apply suggestions from code review Co-authored-by: Andrew Lamb --- datafusion-examples/examples/expr_api.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/datafusion-examples/examples/expr_api.rs b/datafusion-examples/examples/expr_api.rs index 87d7d58dfdded..cb0796bdcf735 100644 --- a/datafusion-examples/examples/expr_api.rs +++ b/datafusion-examples/examples/expr_api.rs @@ -337,7 +337,7 @@ fn expression_type_demo() -> Result<()> { /// 3. Using `TreeNodeRewriter::rewrite` based on `TypeCoercionRewriter` /// 4. Using `TreeNode::transform` /// -/// Note, this list cannot be complete and there may have other methods to apply type coercion to expressions. +/// Note, this list may not be complete and there may be other methods to apply type coercion to expressions. fn type_coercion_demo() -> Result<()> { // Creates a record batch for demo. let df_schema = DFSchema::from_unqualified_fields( @@ -358,9 +358,7 @@ fn type_coercion_demo() -> Result<()> { let props = ExecutionProps::default(); let physical_expr = datafusion_physical_expr::create_physical_expr(&expr, &df_schema, &props)?; - let Err(e) = physical_expr.evaluate(&batch) else { - unreachable!() - }; + let e = physical_expr.evaluate(&batch).unwrap_err(); assert!(e .find_root() .to_string() @@ -394,7 +392,7 @@ fn type_coercion_demo() -> Result<()> { )?; assert!(physical_expr.evaluate(&batch).is_ok()); - // 4. Type coercion with manual transformation. + // 4. Apply explict type coercion by manually rewriting the expression let coerced_expr = expr .transform(|e| { // Only type coerces binary expressions.