From 0309248ce0763f461bf0e6ccde0c1259c53882e8 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Tue, 23 Jun 2026 14:08:13 +0100 Subject: [PATCH 1/5] Move source-specific UDF rewrite functionality to datafusion-datasource Signed-off-by: Adam Gutglick --- .../datasource-parquet/src/opener/mod.rs | 2 +- datafusion/datasource-parquet/src/source.rs | 10 +- datafusion/datasource/Cargo.toml | 2 +- datafusion/datasource/src/mod.rs | 1 + datafusion/datasource/src/projection.rs | 2 +- datafusion/datasource/src/rewrite.rs | 333 ++++++++++++++++++ datafusion/physical-expr-adapter/src/lib.rs | 3 +- .../src/schema_rewriter.rs | 299 +--------------- 8 files changed, 349 insertions(+), 303 deletions(-) create mode 100644 datafusion/datasource/src/rewrite.rs diff --git a/datafusion/datasource-parquet/src/opener/mod.rs b/datafusion/datasource-parquet/src/opener/mod.rs index 8ded4ea5b13e3..25bc41616068f 100644 --- a/datafusion/datasource-parquet/src/opener/mod.rs +++ b/datafusion/datasource-parquet/src/opener/mod.rs @@ -38,9 +38,9 @@ use crate::{ use arrow::array::RecordBatch; use arrow::datatypes::DataType; use datafusion_datasource::morsel::{Morsel, MorselPlan, MorselPlanner, Morselizer}; +use datafusion_datasource::rewrite::rewrite_input_file_name_in_projection; use datafusion_physical_expr::projection::ProjectionExprs; use datafusion_physical_expr_adapter::replace_columns_with_literals; -use datafusion_physical_expr_adapter::schema_rewriter::rewrite_input_file_name_in_projection; use std::collections::{HashMap, VecDeque}; use std::fmt; use std::future::Future; diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index a2503c1071748..b45470c29523e 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -43,14 +43,14 @@ use datafusion_common::config::TableParquetOptions; use datafusion_datasource::TableSchema; use datafusion_datasource::file::FileSource; use 
datafusion_datasource::file_scan_config::FileScanConfig; +use datafusion_datasource::rewrite::{ + expr_references_scalar_udf, rewrite_file_row_index_projection, +}; use datafusion_functions::core::file_row_index::FileRowIndexFunc; use datafusion_physical_expr::expressions::{Column, DynamicFilterTracking}; use datafusion_physical_expr::projection::ProjectionExprs; use datafusion_physical_expr::{EquivalenceProperties, conjunction}; -use datafusion_physical_expr_adapter::expr_references_scalar_udf; -use datafusion_physical_expr_adapter::{ - DefaultPhysicalExprAdapterFactory, rewrite_file_row_index_projection, -}; +use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_plan::DisplayFormatType; @@ -1826,10 +1826,10 @@ mod tests { use arrow::datatypes::{DataType, Field, FieldRef, Schema}; use datafusion_common::config::ConfigOptions; use datafusion_datasource::TableSchema; + use datafusion_datasource::rewrite::rewrite_file_row_index_expr; use datafusion_expr::{col, lit as logical_lit}; use datafusion_functions::core::expr_fn::file_row_index; use datafusion_physical_expr::planner::logical2physical; - use datafusion_physical_expr_adapter::rewrite_file_row_index_expr; use datafusion_physical_plan::filter_pushdown::PushedDown; use parquet::arrow::RowNumber; diff --git a/datafusion/datasource/Cargo.toml b/datafusion/datasource/Cargo.toml index 2ac42ed900095..69daa79d3311b 100644 --- a/datafusion/datasource/Cargo.toml +++ b/datafusion/datasource/Cargo.toml @@ -52,6 +52,7 @@ datafusion-common = { workspace = true, features = ["object_store"] } datafusion-common-runtime = { workspace = true } datafusion-execution = { workspace = true } datafusion-expr = { workspace = true } +datafusion-functions = { workspace = true } datafusion-physical-expr = { workspace = true } datafusion-physical-expr-adapter = { workspace = 
true } datafusion-physical-expr-common = { workspace = true } @@ -74,7 +75,6 @@ zstd = { workspace = true, optional = true } [dev-dependencies] criterion = { workspace = true } -datafusion-functions = { workspace = true } insta = { workspace = true } tempfile = { workspace = true } diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs index d4dfa1180ecf2..925965b104477 100644 --- a/datafusion/datasource/src/mod.rs +++ b/datafusion/datasource/src/mod.rs @@ -41,6 +41,7 @@ pub mod file_stream; pub mod memory; pub mod morsel; pub mod projection; +pub mod rewrite; pub mod schema_adapter; pub mod sink; pub mod source; diff --git a/datafusion/datasource/src/projection.rs b/datafusion/datasource/src/projection.rs index f0a58771ed4ce..5ed700dde30a6 100644 --- a/datafusion/datasource/src/projection.rs +++ b/datafusion/datasource/src/projection.rs @@ -17,6 +17,7 @@ use std::sync::Arc; +use crate::rewrite::rewrite_input_file_name_in_projection; use arrow::datatypes::{Schema, SchemaRef}; use datafusion_common::{ Result, ScalarValue, @@ -26,7 +27,6 @@ use datafusion_physical_expr::{ expressions::{Column, Literal}, projection::{ProjectionExpr, ProjectionExprs}, }; -use datafusion_physical_expr_adapter::schema_rewriter::rewrite_input_file_name_in_projection; use futures::{FutureExt, StreamExt}; use itertools::Itertools; diff --git a/datafusion/datasource/src/rewrite.rs b/datafusion/datasource/src/rewrite.rs new file mode 100644 index 0000000000000..0d48b0bef5980 --- /dev/null +++ b/datafusion/datasource/src/rewrite.rs @@ -0,0 +1,333 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Rewrites that lower scan-metadata scalar UDFs into concrete physical +//! expressions for the specific file being scanned. +//! +//! Functions like `file_row_index()` and `input_file_name()` are placeholders +//! whose value is only known during a file scan. The helpers here replace those +//! UDFs with ordinary physical expressions bound to the current file: a column +//! reference into a source-provided row-index column, or a per-file literal. + +use std::sync::Arc; + +use arrow::datatypes::{DataType, Field}; +use datafusion_common::{ + Result, ScalarValue, + tree_node::{Transformed, TreeNode, TreeNodeRecursion}, +}; +use datafusion_expr::ScalarUDFImpl; +use datafusion_functions::core::file_row_index::FileRowIndexFunc; +use datafusion_functions::core::input_file_name::InputFileNameFunc; +use datafusion_physical_expr::ScalarFunctionExpr; +use datafusion_physical_expr::expressions::{CastExpr, Column, Literal}; +use datafusion_physical_expr::projection::{ProjectionExpr, ProjectionExprs}; +use datafusion_physical_expr_common::physical_expr::PhysicalExpr; + +/// Return true if `expr` references scalar UDF `T`. +/// +/// This matches the concrete [`ScalarUDFImpl`] type rather than the function +/// name, so unrelated UDFs with the same name are not treated as matches. 
+pub fn expr_references_scalar_udf( + expr: &Arc, +) -> bool { + let mut found = false; + + expr.apply(|node| { + if ScalarFunctionExpr::try_downcast_func::(node.as_ref()).is_some() { + found = true; + return Ok(TreeNodeRecursion::Stop); + } + Ok(TreeNodeRecursion::Continue) + }) + .expect("Infallible traversal of PhysicalExpr tree failed"); + + found +} + +/// Rewrite occurrences of scalar UDF `T` in `expr` using `replacement`. +/// +/// The rewrite matches the concrete [`ScalarUDFImpl`] type rather than the +/// function name. `replacement` is called with each matching +/// [`ScalarFunctionExpr`] after its children have been rewritten. +fn rewrite_scalar_udf( + expr: Arc, + mut replacement: F, +) -> Result> +where + T: ScalarUDFImpl, + F: FnMut(&ScalarFunctionExpr) -> Result>, +{ + expr.transform_up(|node| { + if let Some(scalar_fn) = ScalarFunctionExpr::try_downcast_func::(node.as_ref()) + { + Ok(Transformed::yes(replacement(scalar_fn)?)) + } else { + Ok(Transformed::no(node)) + } + }) + .map(|transformed| transformed.data) +} + +/// Rewrite `file_row_index()` in `expr` to read from a source-provided +/// row-index column. +/// +/// `row_index_idx` is the index of `row_index_name` in the schema that the +/// rewritten expression will be evaluated against. The rewrite uses ordinary +/// physical expressions: a [`Column`] that reads the source row-index values +/// wrapped in a [`CastExpr`] that exposes the public `file_row_index: Int64` +/// return field without source-specific extension metadata. 
+pub fn rewrite_file_row_index_expr( + expr: Arc, + row_index_name: &str, + row_index_idx: usize, +) -> Result> { + rewrite_scalar_udf::(expr, |_| { + let source = Arc::new(Column::new(row_index_name, row_index_idx)); + let target_field = Arc::new(Field::new("file_row_index", DataType::Int64, true)); + Ok(Arc::new(CastExpr::new_with_target_field( + source, + target_field, + None, + ))) + }) +} + +/// Rewrite `file_row_index()` in a pushed projection to read from a +/// source-provided row-index column. +/// +/// +/// For example if `row_index_column` is `__datafusion_row_idx` this function rewrites all +/// instances of `file_row_index()` to `__datafusion_row_index` column references. +/// +/// `base_projection` is the current projection already pushed into a source. +/// The row-index source column is appended to that base projection if it is not +/// already present. `projection` is rewritten to read from the projected +/// row-index column and then merged on top of the extended base projection. +pub fn rewrite_file_row_index_projection( + base_projection: &ProjectionExprs, + projection: &ProjectionExprs, + row_index_col: &Column, +) -> Result { + let mut base_exprs = base_projection.as_ref().to_vec(); + let row_index_projection_idx = + base_projection.projected_column_position(row_index_col); + + // If the column doesn't exist in the projection yet + if row_index_projection_idx.is_none() { + base_exprs.push(ProjectionExpr { + expr: Arc::new(row_index_col.clone()), + alias: row_index_col.name().to_owned(), + }); + } + + let rewritten_projection = projection.clone().try_map_exprs(|expr| { + rewrite_file_row_index_expr( + expr, + row_index_col.name(), + row_index_projection_idx.unwrap_or(base_exprs.len() - 1), + ) + })?; + + ProjectionExprs::new(base_exprs).try_merge(&rewritten_projection) +} + +/// Rewrite `input_file_name()` in a pushed projection to a per-file `Utf8` +/// literal holding `file_name`. 
+/// +/// If the projection contains no `input_file_name()` UDF it is returned +/// unchanged, without allocating the literal or rebuilding the projection tree +/// (the common case for queries that don't use the function). +pub fn rewrite_input_file_name_in_projection( + projection: ProjectionExprs, + file_name: &str, +) -> Result { + if !projection + .iter() + .any(|p| expr_references_scalar_udf::(&p.expr)) + { + return Ok(projection); + } + + let file_name_lit = + Arc::new(Literal::new(ScalarValue::Utf8(Some(file_name.to_string())))) + as Arc; + + projection.try_map_exprs(|expr| { + rewrite_scalar_udf::(expr, |_| { + Ok(Arc::clone(&file_name_lit)) + }) + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + use arrow::datatypes::Schema; + use datafusion_common::config::ConfigOptions; + use datafusion_expr::{Operator, ScalarUDF}; + use datafusion_physical_expr::expressions; + use std::collections::HashMap; + + fn file_row_index_expr() -> Arc { + Arc::new(ScalarFunctionExpr::new( + "file_row_index", + Arc::new(ScalarUDF::from(FileRowIndexFunc::new())), + vec![], + Arc::new(Field::new("file_row_index", DataType::Int64, true)), + Arc::new(ConfigOptions::default()), + )) + } + + fn input_file_name_expr() -> Arc { + Arc::new(ScalarFunctionExpr::new( + "input_file_name", + Arc::new(ScalarUDF::from(InputFileNameFunc::new())), + vec![], + Arc::new(Field::new("input_file_name", DataType::Utf8, true)), + Arc::new(ConfigOptions::default()), + )) + } + + #[test] + fn test_rewrite_scalar_udf_replaces_nested_typed_udf() -> Result<()> { + let expr = Arc::new(expressions::BinaryExpr::new( + file_row_index_expr(), + Operator::Plus, + expressions::lit(ScalarValue::Int64(Some(1))), + )) as Arc; + + let rewritten = rewrite_scalar_udf::(expr, |_| { + Ok(expressions::lit(ScalarValue::Int64(Some(7)))) + })?; + + let binary = rewritten + .downcast_ref::() + .expect("rewritten expression should remain binary"); + assert_eq!(binary.op(), &Operator::Plus); + + let left = binary + .left() 
+ .downcast_ref::() + .expect("left side should be rewritten to a literal"); + assert_eq!(left.value(), &ScalarValue::Int64(Some(7))); + + let right = binary + .right() + .downcast_ref::() + .expect("right side should remain the original literal"); + assert_eq!(right.value(), &ScalarValue::Int64(Some(1))); + Ok(()) + } + + #[test] + fn test_rewrite_input_file_name_in_projection() -> Result<()> { + let file_name = "part=west/data.parquet"; + let projection = ProjectionExprs::new([ + ProjectionExpr::new(input_file_name_expr(), "file_name"), + ProjectionExpr::new( + Arc::new(expressions::BinaryExpr::new( + input_file_name_expr(), + Operator::Eq, + expressions::lit(ScalarValue::Utf8(Some(file_name.to_string()))), + )), + "matches_file", + ), + ]); + + let rewritten = rewrite_input_file_name_in_projection(projection, file_name)?; + let rewritten = rewritten.as_ref(); + assert_eq!(rewritten[0].alias, "file_name"); + assert_eq!(rewritten[1].alias, "matches_file"); + + let file_name_lit = rewritten[0] + .expr + .downcast_ref::() + .expect("input_file_name should rewrite to a literal"); + assert_eq!( + file_name_lit.value(), + &ScalarValue::Utf8(Some(file_name.to_string())) + ); + + let binary = rewritten[1] + .expr + .downcast_ref::() + .expect("nested expression should remain binary"); + assert_eq!(binary.op(), &Operator::Eq); + + let left = binary + .left() + .downcast_ref::() + .expect("nested input_file_name should rewrite to a literal"); + assert_eq!( + left.value(), + &ScalarValue::Utf8(Some(file_name.to_string())) + ); + + let right = binary + .right() + .downcast_ref::() + .expect("comparison literal should remain unchanged"); + assert_eq!( + right.value(), + &ScalarValue::Utf8(Some(file_name.to_string())) + ); + Ok(()) + } + + #[test] + fn test_rewrite_file_row_index_expr_to_source_column() -> Result<()> { + let expr = rewrite_file_row_index_expr( + file_row_index_expr(), + "__datafusion_file_row_index", + 2, + )?; + + let cast_expr = expr + .downcast_ref::() + 
.expect("file row index expression should be a cast"); + assert_eq!(cast_expr.cast_type(), &DataType::Int64); + let target_field = cast_expr.target_field(); + assert_eq!(target_field.name(), "file_row_index"); + assert_eq!(target_field.data_type(), &DataType::Int64); + assert!(target_field.is_nullable()); + assert!(target_field.metadata().is_empty()); + + let source = cast_expr + .expr() + .downcast_ref::() + .expect("source column"); + assert_eq!(source.name(), "__datafusion_file_row_index"); + assert_eq!(source.index(), 2); + + let input_schema = Schema::new(vec![ + Field::new("value", DataType::Int64, true), + Field::new("__datafusion_file_row_index", DataType::Int64, false) + .with_metadata(HashMap::from([( + "source".to_string(), + "virtual".to_string(), + )])), + ]); + let return_field = expr.return_field(&input_schema)?; + assert_eq!(return_field.name(), "file_row_index"); + assert_eq!(return_field.data_type(), &DataType::Int64); + assert!(return_field.is_nullable()); + assert!(return_field.metadata().is_empty()); + Ok(()) + } +} diff --git a/datafusion/physical-expr-adapter/src/lib.rs b/datafusion/physical-expr-adapter/src/lib.rs index fa14bc8b4d150..ea4db19ee110e 100644 --- a/datafusion/physical-expr-adapter/src/lib.rs +++ b/datafusion/physical-expr-adapter/src/lib.rs @@ -29,6 +29,5 @@ pub mod schema_rewriter; pub use schema_rewriter::{ BatchAdapter, BatchAdapterFactory, DefaultPhysicalExprAdapter, DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory, - expr_references_scalar_udf, replace_columns_with_literals, - rewrite_file_row_index_expr, rewrite_file_row_index_projection, + replace_columns_with_literals, }; diff --git a/datafusion/physical-expr-adapter/src/schema_rewriter.rs b/datafusion/physical-expr-adapter/src/schema_rewriter.rs index 36cf1e2a67157..d9eed669ba98f 100644 --- a/datafusion/physical-expr-adapter/src/schema_rewriter.rs +++ b/datafusion/physical-expr-adapter/src/schema_rewriter.rs @@ -25,21 +25,17 @@ use 
std::hash::Hash; use std::sync::Arc; use arrow::array::RecordBatch; -use arrow::datatypes::{DataType, Field, FieldRef, SchemaRef}; +use arrow::datatypes::{DataType, FieldRef, SchemaRef}; use datafusion_common::{ DataFusionError, Result, ScalarValue, exec_err, metadata::FieldMetadata, nested_struct::validate_data_type_compatibility, - tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRecursion}, -}; -use datafusion_expr::ScalarUDFImpl; -use datafusion_functions::core::input_file_name::InputFileNameFunc; -use datafusion_functions::core::{ - file_row_index::FileRowIndexFunc, getfield::GetFieldFunc, + tree_node::{Transformed, TransformedResult, TreeNode}, }; +use datafusion_functions::core::getfield::GetFieldFunc; use datafusion_physical_expr::PhysicalExprSimplifier; use datafusion_physical_expr::expressions::Literal; -use datafusion_physical_expr::projection::{ProjectionExpr, ProjectionExprs, Projector}; +use datafusion_physical_expr::projection::{ProjectionExprs, Projector}; use datafusion_physical_expr::{ ScalarFunctionExpr, expressions::{self, CastExpr, Column}, @@ -86,142 +82,6 @@ where .data() } -/// Return true if `expr` references scalar UDF `T`. -/// -/// This matches the concrete [`ScalarUDFImpl`] type rather than the function -/// name, so unrelated UDFs with the same name are not treated as matches. -pub fn expr_references_scalar_udf( - expr: &Arc, -) -> bool { - let mut found = false; - - expr.apply(|node| { - if ScalarFunctionExpr::try_downcast_func::(node.as_ref()).is_some() { - found = true; - return Ok(TreeNodeRecursion::Stop); - } - Ok(TreeNodeRecursion::Continue) - }) - .expect("Infallible traversal of PhysicalExpr tree failed"); - - found -} - -/// Rewrite occurrences of scalar UDF `T` in `expr` using `replacement`. -/// -/// The rewrite matches the concrete [`ScalarUDFImpl`] type rather than the -/// function name. `replacement` is called with each matching -/// [`ScalarFunctionExpr`] after its children have been rewritten. 
-fn rewrite_scalar_udf( - expr: Arc, - mut replacement: F, -) -> Result> -where - T: ScalarUDFImpl, - F: FnMut(&ScalarFunctionExpr) -> Result>, -{ - expr.transform_up(|node| { - if let Some(scalar_fn) = ScalarFunctionExpr::try_downcast_func::(node.as_ref()) - { - Ok(Transformed::yes(replacement(scalar_fn)?)) - } else { - Ok(Transformed::no(node)) - } - }) - .map(|transformed| transformed.data) -} - -/// Rewrite `file_row_index()` in `expr` to read from a source-provided -/// row-index column. -/// -/// `row_index_idx` is the index of `row_index_name` in the schema that the -/// rewritten expression will be evaluated against. The rewrite uses ordinary -/// physical expressions: a [`Column`] that reads the source row-index values -/// wrapped in a [`CastExpr`] that exposes the public `file_row_index: Int64` -/// return field without source-specific extension metadata. -pub fn rewrite_file_row_index_expr( - expr: Arc, - row_index_name: &str, - row_index_idx: usize, -) -> Result> { - rewrite_scalar_udf::(expr, |_| { - let source = Arc::new(Column::new(row_index_name, row_index_idx)); - let target_field = Arc::new(Field::new("file_row_index", DataType::Int64, true)); - Ok(Arc::new(CastExpr::new_with_target_field( - source, - target_field, - None, - ))) - }) -} - -/// Rewrite `file_row_index()` in a pushed projection to read from a -/// source-provided row-index column. -/// -/// -/// For example if `row_index_column` is `__datafusion_row_idx` this function rewrites all -/// instances of `file_row_index()` to `__datafusion_row_index` column references. -/// -/// `base_projection` is the current projection already pushed into a source. -/// The row-index source column is appended to that base projection if it is not -/// already present. `projection` is rewritten to read from the projected -/// row-index column and then merged on top of the extended base projection. 
-pub fn rewrite_file_row_index_projection( - base_projection: &ProjectionExprs, - projection: &ProjectionExprs, - row_index_col: &Column, -) -> Result { - let mut base_exprs = base_projection.as_ref().to_vec(); - let row_index_projection_idx = - base_projection.projected_column_position(row_index_col); - - // If the column doesn't exist in the projection yet - if row_index_projection_idx.is_none() { - base_exprs.push(ProjectionExpr { - expr: Arc::new(row_index_col.clone()), - alias: row_index_col.name().to_owned(), - }); - } - - let rewritten_projection = projection.clone().try_map_exprs(|expr| { - rewrite_file_row_index_expr( - expr, - row_index_col.name(), - row_index_projection_idx.unwrap_or(base_exprs.len() - 1), - ) - })?; - - ProjectionExprs::new(base_exprs).try_merge(&rewritten_projection) -} - -/// Rewrite `input_file_name()` in a pushed projection to a per-file `Utf8` -/// literal holding `file_name`. -/// -/// If the projection contains no `input_file_name()` UDF it is returned -/// unchanged, without allocating the literal or rebuilding the projection tree -/// (the common case for queries that don't use the function). -pub fn rewrite_input_file_name_in_projection( - projection: ProjectionExprs, - file_name: &str, -) -> Result { - if !projection - .iter() - .any(|p| expr_references_scalar_udf::(&p.expr)) - { - return Ok(projection); - } - - let file_name_lit = - Arc::new(Literal::new(ScalarValue::Utf8(Some(file_name.to_string())))) - as Arc; - - projection.try_map_exprs(|expr| { - rewrite_scalar_udf::(expr, |_| { - Ok(Arc::clone(&file_name_lit)) - }) - }) -} - /// Trait for adapting [`PhysicalExpr`] expressions to match a target schema. 
/// /// This is used in file scans to rewrite expressions so that they can be @@ -770,8 +630,8 @@ mod tests { RecordBatchOptions, StringArray, StringViewArray, StructArray, }; use arrow::datatypes::{Field, Fields, Schema}; - use datafusion_common::{assert_contains, config::ConfigOptions, record_batch}; - use datafusion_expr::{Operator, ScalarUDF}; + use datafusion_common::{assert_contains, record_batch}; + use datafusion_expr::Operator; use datafusion_physical_expr::expressions::{Column, Literal, col}; fn assert_cast_expr(expr: &Arc) -> &CastExpr { @@ -787,153 +647,6 @@ mod tests { assert_eq!(inner_col.index(), index); } - fn file_row_index_expr() -> Arc { - Arc::new(ScalarFunctionExpr::new( - "file_row_index", - Arc::new(ScalarUDF::from(FileRowIndexFunc::new())), - vec![], - Arc::new(Field::new("file_row_index", DataType::Int64, true)), - Arc::new(ConfigOptions::default()), - )) - } - - fn input_file_name_expr() -> Arc { - Arc::new(ScalarFunctionExpr::new( - "input_file_name", - Arc::new(ScalarUDF::from(InputFileNameFunc::new())), - vec![], - Arc::new(Field::new("input_file_name", DataType::Utf8, true)), - Arc::new(ConfigOptions::default()), - )) - } - - #[test] - fn test_rewrite_scalar_udf_replaces_nested_typed_udf() -> Result<()> { - let expr = Arc::new(expressions::BinaryExpr::new( - file_row_index_expr(), - Operator::Plus, - expressions::lit(ScalarValue::Int64(Some(1))), - )) as Arc; - - let rewritten = rewrite_scalar_udf::(expr, |_| { - Ok(expressions::lit(ScalarValue::Int64(Some(7)))) - })?; - - let binary = rewritten - .downcast_ref::() - .expect("rewritten expression should remain binary"); - assert_eq!(binary.op(), &Operator::Plus); - - let left = binary - .left() - .downcast_ref::() - .expect("left side should be rewritten to a literal"); - assert_eq!(left.value(), &ScalarValue::Int64(Some(7))); - - let right = binary - .right() - .downcast_ref::() - .expect("right side should remain the original literal"); - assert_eq!(right.value(), 
&ScalarValue::Int64(Some(1))); - Ok(()) - } - - #[test] - fn test_rewrite_input_file_name_in_projection() -> Result<()> { - let file_name = "part=west/data.parquet"; - let projection = ProjectionExprs::new([ - ProjectionExpr::new(input_file_name_expr(), "file_name"), - ProjectionExpr::new( - Arc::new(expressions::BinaryExpr::new( - input_file_name_expr(), - Operator::Eq, - expressions::lit(ScalarValue::Utf8(Some(file_name.to_string()))), - )), - "matches_file", - ), - ]); - - let rewritten = rewrite_input_file_name_in_projection(projection, file_name)?; - let rewritten = rewritten.as_ref(); - assert_eq!(rewritten[0].alias, "file_name"); - assert_eq!(rewritten[1].alias, "matches_file"); - - let file_name_lit = rewritten[0] - .expr - .downcast_ref::() - .expect("input_file_name should rewrite to a literal"); - assert_eq!( - file_name_lit.value(), - &ScalarValue::Utf8(Some(file_name.to_string())) - ); - - let binary = rewritten[1] - .expr - .downcast_ref::() - .expect("nested expression should remain binary"); - assert_eq!(binary.op(), &Operator::Eq); - - let left = binary - .left() - .downcast_ref::() - .expect("nested input_file_name should rewrite to a literal"); - assert_eq!( - left.value(), - &ScalarValue::Utf8(Some(file_name.to_string())) - ); - - let right = binary - .right() - .downcast_ref::() - .expect("comparison literal should remain unchanged"); - assert_eq!( - right.value(), - &ScalarValue::Utf8(Some(file_name.to_string())) - ); - Ok(()) - } - - #[test] - fn test_rewrite_file_row_index_expr_to_source_column() -> Result<()> { - let expr = rewrite_file_row_index_expr( - file_row_index_expr(), - "__datafusion_file_row_index", - 2, - )?; - - let cast_expr = expr - .downcast_ref::() - .expect("file row index expression should be a cast"); - assert_eq!(cast_expr.cast_type(), &DataType::Int64); - let target_field = cast_expr.target_field(); - assert_eq!(target_field.name(), "file_row_index"); - assert_eq!(target_field.data_type(), &DataType::Int64); - 
assert!(target_field.is_nullable()); - assert!(target_field.metadata().is_empty()); - - let source = cast_expr - .expr() - .downcast_ref::() - .expect("source column"); - assert_eq!(source.name(), "__datafusion_file_row_index"); - assert_eq!(source.index(), 2); - - let input_schema = Schema::new(vec![ - Field::new("value", DataType::Int64, true), - Field::new("__datafusion_file_row_index", DataType::Int64, false) - .with_metadata(HashMap::from([( - "source".to_string(), - "virtual".to_string(), - )])), - ]); - let return_field = expr.return_field(&input_schema)?; - assert_eq!(return_field.name(), "file_row_index"); - assert_eq!(return_field.data_type(), &DataType::Int64); - assert!(return_field.is_nullable()); - assert!(return_field.metadata().is_empty()); - Ok(()) - } - fn stale_index_cast_schemas() -> (SchemaRef, SchemaRef) { let physical_schema = Arc::new(Schema::new(vec![ Field::new("b", DataType::Binary, true), From e2eaab52a8658fff8395edec9cb1515b0beba691 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Tue, 23 Jun 2026 14:41:19 +0100 Subject: [PATCH 2/5] doc CR --- datafusion/datasource/src/rewrite.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/datafusion/datasource/src/rewrite.rs b/datafusion/datasource/src/rewrite.rs index 0d48b0bef5980..03f13802c1d83 100644 --- a/datafusion/datasource/src/rewrite.rs +++ b/datafusion/datasource/src/rewrite.rs @@ -15,13 +15,12 @@ // specific language governing permissions and limitations // under the License. -//! Rewrites that lower scan-metadata scalar UDFs into concrete physical -//! expressions for the specific file being scanned. +//! Rewrite expressions in preparation for files being scanned, such as scan-metadata scalar UDFs. //! -//! Functions like `file_row_index()` and `input_file_name()` are placeholders +//! Functions like [`file_row_index`] and [`input_file_name()`] are placeholders //! whose value is only known during a file scan. The helpers here replace those //! 
UDFs with ordinary physical expressions bound to the current file: a column -//! reference into a source-provided row-index column, or a per-file literal. +//! reference into a source-provided row-index column, or a per-file literal, etc. use std::sync::Arc; From a015ecdedced3799abeb94ff4fb29d9ac0640a94 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Tue, 23 Jun 2026 14:50:33 +0100 Subject: [PATCH 3/5] move to physical-expr-adapter --- datafusion/datasource-parquet/src/opener/mod.rs | 2 +- datafusion/datasource-parquet/src/source.rs | 8 ++++---- datafusion/datasource/Cargo.toml | 2 +- datafusion/datasource/src/mod.rs | 1 - datafusion/datasource/src/projection.rs | 2 +- datafusion/physical-expr-adapter/src/lib.rs | 1 + .../{datasource => physical-expr-adapter}/src/rewrite.rs | 5 ++++- 7 files changed, 12 insertions(+), 9 deletions(-) rename datafusion/{datasource => physical-expr-adapter}/src/rewrite.rs (97%) diff --git a/datafusion/datasource-parquet/src/opener/mod.rs b/datafusion/datasource-parquet/src/opener/mod.rs index 25bc41616068f..af50b8990130d 100644 --- a/datafusion/datasource-parquet/src/opener/mod.rs +++ b/datafusion/datasource-parquet/src/opener/mod.rs @@ -38,9 +38,9 @@ use crate::{ use arrow::array::RecordBatch; use arrow::datatypes::DataType; use datafusion_datasource::morsel::{Morsel, MorselPlan, MorselPlanner, Morselizer}; -use datafusion_datasource::rewrite::rewrite_input_file_name_in_projection; use datafusion_physical_expr::projection::ProjectionExprs; use datafusion_physical_expr_adapter::replace_columns_with_literals; +use datafusion_physical_expr_adapter::rewrite::rewrite_input_file_name_in_projection; use std::collections::{HashMap, VecDeque}; use std::fmt; use std::future::Future; diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index b45470c29523e..3443b08475e0d 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -43,14 +43,14 
@@ use datafusion_common::config::TableParquetOptions; use datafusion_datasource::TableSchema; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::FileScanConfig; -use datafusion_datasource::rewrite::{ - expr_references_scalar_udf, rewrite_file_row_index_projection, -}; use datafusion_functions::core::file_row_index::FileRowIndexFunc; use datafusion_physical_expr::expressions::{Column, DynamicFilterTracking}; use datafusion_physical_expr::projection::ProjectionExprs; use datafusion_physical_expr::{EquivalenceProperties, conjunction}; use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory; +use datafusion_physical_expr_adapter::rewrite::{ + expr_references_scalar_udf, rewrite_file_row_index_projection, +}; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_plan::DisplayFormatType; @@ -1826,10 +1826,10 @@ mod tests { use arrow::datatypes::{DataType, Field, FieldRef, Schema}; use datafusion_common::config::ConfigOptions; use datafusion_datasource::TableSchema; - use datafusion_datasource::rewrite::rewrite_file_row_index_expr; use datafusion_expr::{col, lit as logical_lit}; use datafusion_functions::core::expr_fn::file_row_index; use datafusion_physical_expr::planner::logical2physical; + use datafusion_physical_expr_adapter::rewrite::rewrite_file_row_index_expr; use datafusion_physical_plan::filter_pushdown::PushedDown; use parquet::arrow::RowNumber; diff --git a/datafusion/datasource/Cargo.toml b/datafusion/datasource/Cargo.toml index 69daa79d3311b..2ac42ed900095 100644 --- a/datafusion/datasource/Cargo.toml +++ b/datafusion/datasource/Cargo.toml @@ -52,7 +52,6 @@ datafusion-common = { workspace = true, features = ["object_store"] } datafusion-common-runtime = { workspace = true } datafusion-execution = { workspace = true } datafusion-expr = { workspace = true } -datafusion-functions = { workspace = true } 
datafusion-physical-expr = { workspace = true } datafusion-physical-expr-adapter = { workspace = true } datafusion-physical-expr-common = { workspace = true } @@ -75,6 +74,7 @@ zstd = { workspace = true, optional = true } [dev-dependencies] criterion = { workspace = true } +datafusion-functions = { workspace = true } insta = { workspace = true } tempfile = { workspace = true } diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs index 925965b104477..d4dfa1180ecf2 100644 --- a/datafusion/datasource/src/mod.rs +++ b/datafusion/datasource/src/mod.rs @@ -41,7 +41,6 @@ pub mod file_stream; pub mod memory; pub mod morsel; pub mod projection; -pub mod rewrite; pub mod schema_adapter; pub mod sink; pub mod source; diff --git a/datafusion/datasource/src/projection.rs b/datafusion/datasource/src/projection.rs index 5ed700dde30a6..16207c086f7bc 100644 --- a/datafusion/datasource/src/projection.rs +++ b/datafusion/datasource/src/projection.rs @@ -17,7 +17,6 @@ use std::sync::Arc; -use crate::rewrite::rewrite_input_file_name_in_projection; use arrow::datatypes::{Schema, SchemaRef}; use datafusion_common::{ Result, ScalarValue, @@ -27,6 +26,7 @@ use datafusion_physical_expr::{ expressions::{Column, Literal}, projection::{ProjectionExpr, ProjectionExprs}, }; +use datafusion_physical_expr_adapter::rewrite::rewrite_input_file_name_in_projection; use futures::{FutureExt, StreamExt}; use itertools::Itertools; diff --git a/datafusion/physical-expr-adapter/src/lib.rs b/datafusion/physical-expr-adapter/src/lib.rs index ea4db19ee110e..b224d8f4b8fe9 100644 --- a/datafusion/physical-expr-adapter/src/lib.rs +++ b/datafusion/physical-expr-adapter/src/lib.rs @@ -24,6 +24,7 @@ //! 
Physical expression schema adaptation utilities for DataFusion +pub mod rewrite; pub mod schema_rewriter; pub use schema_rewriter::{ diff --git a/datafusion/datasource/src/rewrite.rs b/datafusion/physical-expr-adapter/src/rewrite.rs similarity index 97% rename from datafusion/datasource/src/rewrite.rs rename to datafusion/physical-expr-adapter/src/rewrite.rs index 03f13802c1d83..d229ce641ff4c 100644 --- a/datafusion/datasource/src/rewrite.rs +++ b/datafusion/physical-expr-adapter/src/rewrite.rs @@ -17,10 +17,13 @@ //! Rewrite expressions in preparation for files being scanned, such as scan-metadata scalar UDFs. //! -//! Functions like [`file_row_index`] and [`input_file_name()`] are placeholders +//! Functions like [`file_row_index()`] and [`input_file_name()`] are placeholders //! whose value is only known during a file scan. The helpers here replace those //! UDFs with ordinary physical expressions bound to the current file: a column //! reference into a source-provided row-index column, or a per-file literal, etc. +//! +//! [`file_row_index()`]: datafusion_functions::core::file_row_index::FileRowIndexFunc; +//! [`input_file_name()`]: datafusion_functions::core::input_file_name::InputFileNameFunc; use std::sync::Arc; From d2078c1cefa69bc4b7886847ce45766e1c87890d Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Tue, 23 Jun 2026 15:07:37 +0100 Subject: [PATCH 4/5] fmt --- datafusion/physical-expr-adapter/src/rewrite.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-expr-adapter/src/rewrite.rs b/datafusion/physical-expr-adapter/src/rewrite.rs index d229ce641ff4c..d4eebd81a2e76 100644 --- a/datafusion/physical-expr-adapter/src/rewrite.rs +++ b/datafusion/physical-expr-adapter/src/rewrite.rs @@ -21,7 +21,7 @@ //! whose value is only known during a file scan. The helpers here replace those //! UDFs with ordinary physical expressions bound to the current file: a column //! 
reference into a source-provided row-index column, or a per-file literal, etc. -//! +//! //! [`file_row_index()`]: datafusion_functions::core::file_row_index::FileRowIndexFunc; //! [`input_file_name()`]: datafusion_functions::core::input_file_name::InputFileNameFunc; From a0eec5a48eb0c9ebb81bd64a679d6b81cc500f7d Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Tue, 23 Jun 2026 16:28:41 +0100 Subject: [PATCH 5/5] Update docs and links --- .../physical-expr-adapter/src/rewrite.rs | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/datafusion/physical-expr-adapter/src/rewrite.rs b/datafusion/physical-expr-adapter/src/rewrite.rs index d4eebd81a2e76..7345a587ee6a4 100644 --- a/datafusion/physical-expr-adapter/src/rewrite.rs +++ b/datafusion/physical-expr-adapter/src/rewrite.rs @@ -22,8 +22,8 @@ //! UDFs with ordinary physical expressions bound to the current file: a column //! reference into a source-provided row-index column, or a per-file literal, etc. //! -//! [`file_row_index()`]: datafusion_functions::core::file_row_index::FileRowIndexFunc; -//! [`input_file_name()`]: datafusion_functions::core::input_file_name::InputFileNameFunc; +//! [`file_row_index()`]: datafusion_functions::core::file_row_index::FileRowIndexFunc +//! [`input_file_name()`]: datafusion_functions::core::input_file_name::InputFileNameFunc use std::sync::Arc; @@ -40,7 +40,7 @@ use datafusion_physical_expr::expressions::{CastExpr, Column, Literal}; use datafusion_physical_expr::projection::{ProjectionExpr, ProjectionExprs}; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; -/// Return true if `expr` references scalar UDF `T`. +/// Return true if a [`PhysicalExpr`] references scalar UDF `T`. /// /// This matches the concrete [`ScalarUDFImpl`] type rather than the function /// name, so unrelated UDFs with the same name are not treated as matches. 
@@ -61,7 +61,8 @@ pub fn expr_references_scalar_udf( found } -/// Rewrite occurrences of scalar UDF `T` in `expr` using `replacement`. +/// Rewrite occurrences of scalar UDF `T` in a [`PhysicalExpr`] using +/// `replacement`. /// /// The rewrite matches the concrete [`ScalarUDFImpl`] type rather than the /// function name. `replacement` is called with each matching @@ -85,8 +86,8 @@ where .map(|transformed| transformed.data) } -/// Rewrite `file_row_index()` in `expr` to read from a source-provided -/// row-index column. +/// Rewrite [`file_row_index()`][FileRowIndexFunc] in a [`PhysicalExpr`] to +/// read from a source-provided row-index column. /// /// `row_index_idx` is the index of `row_index_name` in the schema that the /// rewritten expression will be evaluated against. The rewrite uses ordinary @@ -109,12 +110,13 @@ pub fn rewrite_file_row_index_expr( }) } -/// Rewrite `file_row_index()` in a pushed projection to read from a -/// source-provided row-index column. +/// Rewrite [`file_row_index()`][FileRowIndexFunc] in pushed [`ProjectionExprs`] +/// to read from a source-provided row-index column. /// /// /// For example if `row_index_column` is `__datafusion_row_idx` this function rewrites all -/// instances of `file_row_index()` to `__datafusion_row_index` column references. +/// instances of [`file_row_index()`][FileRowIndexFunc] to +/// `__datafusion_row_idx` [`Column`] references. /// /// `base_projection` is the current projection already pushed into a source. /// The row-index source column is appended to that base projection if it is not @@ -148,12 +150,12 @@ pub fn rewrite_file_row_index_projection( ProjectionExprs::new(base_exprs).try_merge(&rewritten_projection) } -/// Rewrite `input_file_name()` in a pushed projection to a per-file `Utf8` -/// literal holding `file_name`. +/// Rewrite [`input_file_name()`][InputFileNameFunc] in pushed +/// [`ProjectionExprs`] to a per-file [`Literal`] holding `file_name`. 
/// -/// If the projection contains no `input_file_name()` UDF it is returned -/// unchanged, without allocating the literal or rebuilding the projection tree -/// (the common case for queries that don't use the function). +/// If the projection contains no [`input_file_name()`][InputFileNameFunc] UDF it +/// is returned unchanged, without allocating the literal or rebuilding the +/// projection tree (the common case for queries that don't use the function). pub fn rewrite_input_file_name_in_projection( projection: ProjectionExprs, file_name: &str,