From 0ebd8f32c475e5b3cfac4a68de9647efb144fcab Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Tue, 16 Jun 2026 10:29:16 +0100 Subject: [PATCH 1/5] First step Signed-off-by: Adam Gutglick --- .../functions/src/core/input_file_name.rs | 95 +++++++++++++++++++ datafusion/functions/src/core/mod.rs | 10 +- .../source/user-guide/sql/scalar_functions.md | 21 ++++ 3 files changed, 125 insertions(+), 1 deletion(-) create mode 100644 datafusion/functions/src/core/input_file_name.rs diff --git a/datafusion/functions/src/core/input_file_name.rs b/datafusion/functions/src/core/input_file_name.rs new file mode 100644 index 0000000000000..a47e9daaf8d3c --- /dev/null +++ b/datafusion/functions/src/core/input_file_name.rs @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`InputFileNameFunc`]: Implementation of the `input_file_name` function. + +use arrow::datatypes::DataType; +use datafusion_common::{exec_err, utils::take_function_args}; +use datafusion_doc::Documentation; +use datafusion_expr::{ + ColumnarValue, ExpressionPlacement, ScalarFunctionArgs, ScalarUDFImpl, Signature, + Volatility, +}; +use datafusion_macros::user_doc; + +#[user_doc( + doc_section(label = "Other Functions"), + description = r#"Returns the path of the input file that produced the current row. + +Note: file paths/URIs may be sensitive metadata depending on your environment. + +This function is intended to be rewritten at file-scan time (when the file is +known). If the input file is not known (for example, if this function is +evaluated outside a file scan, or was not pushed down into one), direct evaluation returns an error. +"#, + syntax_example = "input_file_name()", + sql_example = r#"```sql +SELECT input_file_name() FROM t; +```"# +)] +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct InputFileNameFunc { + signature: Signature, +} + +impl Default for InputFileNameFunc { + fn default() -> Self { + Self::new() + } +} + +impl InputFileNameFunc { + pub fn new() -> Self { + Self { + signature: Signature::nullary(Volatility::Volatile), + } + } +} + +impl ScalarUDFImpl for InputFileNameFunc { + fn name(&self) -> &str { + "input_file_name" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> datafusion_common::Result { + let [] = take_function_args(self.name(), arg_types)?; + Ok(DataType::Utf8) + } + + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + let [] = take_function_args(self.name(), args.args)?; + + exec_err!( + "input_file_name() is source dependent and cannot be evaluated directly" + ) + } + + fn placement(&self, _args: &[ExpressionPlacement]) -> ExpressionPlacement { + ExpressionPlacement::MoveTowardsLeafNodes + } + + fn documentation(&self) -> Option<&Documentation> { + self.doc() + } +} diff --git a/datafusion/functions/src/core/mod.rs b/datafusion/functions/src/core/mod.rs index 4665eca99ebef..3f7a562a27ec7 100644 --- a/datafusion/functions/src/core/mod.rs +++ b/datafusion/functions/src/core/mod.rs @@ -32,6 +32,7 @@ pub mod file_row_index; pub mod getfield; pub mod greatest; mod greatest_least_utils; +pub mod input_file_name; pub mod least; pub mod named_struct; pub mod nullif; @@ -69,6 +70,7 @@ make_udf_function!(arrow_metadata::ArrowMetadataFunc, arrow_metadata); make_udf_function!(with_metadata::WithMetadataFunc, with_metadata); make_udf_function!(arrow_field::ArrowFieldFunc, arrow_field); make_udf_function!(file_row_index::FileRowIndexFunc, file_row_index); +make_udf_function!(input_file_name::InputFileNameFunc, input_file_name); pub mod expr_fn { use datafusion_expr::{Expr, Literal}; @@ -117,7 +119,12 @@ pub mod expr_fn { arrow_metadata, "Returns the metadata of the input expression", args, - ),( + ), + ( + input_file_name, + "Returns the path of the input file that produced the current row", + ), + ( with_metadata, "Attaches Arrow field metadata (key/value pairs) to the input expression", args, @@ -200,6 +207,7 @@ pub fn functions() -> Vec> { union_extract(), union_tag(), version(), + input_file_name(), r#struct(), file_row_index(), ] diff --git a/docs/source/user-guide/sql/scalar_functions.md b/docs/source/user-guide/sql/scalar_functions.md index 83df7b06fd224..34a5b46f93004 100644 --- a/docs/source/user-guide/sql/scalar_functions.md +++ b/docs/source/user-guide/sql/scalar_functions.md @@ -5704,6 +5704,7 @@ union_tag(union_expression) - [cast_to_type](#cast_to_type) - [file_row_index](#file_row_index) - [get_field](#get_field) +- [input_file_name](#input_file_name) - [try_cast_to_type](#try_cast_to_type) - [version](#version) - [with_metadata](#with_metadata) @@ -5959,6 +5960,26 @@ get_field(expression, field_name[, field_name2, ...]) +--------+ ``` +### `input_file_name` + +Returns the path of the input file that produced the current row. + +Note: file paths/URIs may be sensitive metadata depending on your environment. + +This function is intended to be rewritten at file-scan time (when the file is +known). If the input file is not known (for example, if this function is +evaluated outside a file scan, or was not pushed down into one), direct evaluation returns an error. + +```sql +input_file_name() +``` + +#### Example + +```sql +SELECT input_file_name() FROM t; +``` + ### `try_cast_to_type` Casts the first argument to the data type of the second argument, returning NULL if the cast fails. Only the type of the second argument is used; its value is ignored. From 458a1a7a0d624fa58b1ddda7788b5e10f4958bd5 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Tue, 16 Jun 2026 10:46:14 +0100 Subject: [PATCH 2/5] more work Signed-off-by: Adam Gutglick --- .../datasource-parquet/src/opener/mod.rs | 5 ++ datafusion/datasource/src/projection.rs | 5 ++ .../src/schema_rewriter.rs | 30 +++++++--- .../test_files/input_file_name.slt | 55 +++++++++++++++++++ 4 files changed, 87 insertions(+), 8 deletions(-) create mode 100644 datafusion/sqllogictest/test_files/input_file_name.slt diff --git a/datafusion/datasource-parquet/src/opener/mod.rs b/datafusion/datasource-parquet/src/opener/mod.rs index 5b517663f9c03..78ffa532cb165 100644 --- a/datafusion/datasource-parquet/src/opener/mod.rs +++ b/datafusion/datasource-parquet/src/opener/mod.rs @@ -38,6 +38,7 @@ use arrow::datatypes::DataType; use datafusion_datasource::morsel::{Morsel, MorselPlan, MorselPlanner, Morselizer}; use datafusion_physical_expr::projection::ProjectionExprs; use datafusion_physical_expr_adapter::replace_columns_with_literals; +use datafusion_physical_expr_adapter::schema_rewriter::rewrite_input_file_name_in_projection; use std::collections::{HashMap, VecDeque}; use std::fmt; use std::future::Future; @@ -770,6 +771,10 @@ impl ParquetMorselizer { .transpose()?; } + // Replace any `input_file_name()` UDFs in the projection with a literal for this file. + projection = + rewrite_input_file_name_in_projection(projection, file_name.clone())?; + let predicate_creation_errors = MetricBuilder::new(&self.metrics) .with_category(MetricCategory::Rows) .global_counter("num_predicate_creation_errors"); diff --git a/datafusion/datasource/src/projection.rs b/datafusion/datasource/src/projection.rs index ac33a96ca8321..3e44ca8e89c04 100644 --- a/datafusion/datasource/src/projection.rs +++ b/datafusion/datasource/src/projection.rs @@ -26,6 +26,7 @@ use datafusion_physical_expr::{ expressions::{Column, Literal}, projection::{ProjectionExpr, ProjectionExprs}, }; +use datafusion_physical_expr_adapter::schema_rewriter::rewrite_input_file_name_in_projection; use futures::{FutureExt, StreamExt}; use itertools::Itertools; @@ -69,6 +70,8 @@ impl ProjectionOpener { impl FileOpener for ProjectionOpener { fn open(&self, partitioned_file: PartitionedFile) -> Result { let partition_values = partitioned_file.partition_values.clone(); + let file_name = partitioned_file.object_meta.location.to_string(); + // Modify any references to partition columns in the projection expressions // and substitute them with literal values from PartitionedFile.partition_values let projection = if self.partition_columns.is_empty() { @@ -80,6 +83,8 @@ impl FileOpener for ProjectionOpener { partition_values, ) }; + // Replace `input_file_name()` with a per-file literal if present. + let projection = rewrite_input_file_name_in_projection(projection, file_name)?; let projector = projection.make_projector(&self.input_schema)?; let inner = self.inner.open(partitioned_file)?; diff --git a/datafusion/physical-expr-adapter/src/schema_rewriter.rs b/datafusion/physical-expr-adapter/src/schema_rewriter.rs index f287caf32ecda..e553a8b6cb527 100644 --- a/datafusion/physical-expr-adapter/src/schema_rewriter.rs +++ b/datafusion/physical-expr-adapter/src/schema_rewriter.rs @@ -33,10 +33,12 @@ use datafusion_common::{ tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRecursion}, }; use datafusion_expr::ScalarUDFImpl; +use datafusion_functions::core::input_file_name::InputFileNameFunc; use datafusion_functions::core::{ file_row_index::FileRowIndexFunc, getfield::GetFieldFunc, }; use datafusion_physical_expr::PhysicalExprSimplifier; +use datafusion_physical_expr::expressions::Literal; use datafusion_physical_expr::projection::{ProjectionExpr, ProjectionExprs, Projector}; use datafusion_physical_expr::{ ScalarFunctionExpr, @@ -192,6 +194,20 @@ pub fn rewrite_file_row_index_projection( ProjectionExprs::new(base_exprs).try_merge(&rewritten_projection) } +pub fn rewrite_input_file_name_in_projection( + projection: ProjectionExprs, + file_name: String, +) -> Result { + let file_name_lit = Arc::new(Literal::new(ScalarValue::Utf8(Some(file_name)))) + as Arc; + + projection.try_map_exprs(|expr| { + rewrite_scalar_udf::(expr, |_e| { + Ok(Arc::clone(&file_name_lit)) + }) + }) +} + /// Trait for adapting [`PhysicalExpr`] expressions to match a target schema. /// /// This is used in file scans to rewrite expressions so that they can be @@ -422,7 +438,7 @@ impl DefaultPhysicalExprAdapterRewriter { None => return Ok(None), }; - let lit = match field_name_expr.downcast_ref::() { + let lit = match field_name_expr.downcast_ref::() { Some(lit) => lit, None => return Ok(None), }; @@ -475,7 +491,7 @@ impl DefaultPhysicalExprAdapterRewriter { }; let null_value = ScalarValue::Null.cast_to(logical_struct_field.data_type())?; - Ok(Some(Arc::new(expressions::Literal::new_with_metadata( + Ok(Some(Arc::new(Literal::new_with_metadata( null_value, Some(FieldMetadata::from(logical_struct_field.as_ref())), )))) @@ -522,12 +538,10 @@ impl DefaultPhysicalExprAdapterRewriter { // If the column is missing from the physical schema fill it in with nulls. // For a different behavior, provide a custom `PhysicalExprAdapter` implementation. let null_value = ScalarValue::Null.cast_to(logical_field.data_type())?; - return Ok(Transformed::yes(Arc::new( - expressions::Literal::new_with_metadata( - null_value, - Some(FieldMetadata::from(logical_field)), - ), - ))); + return Ok(Transformed::yes(Arc::new(Literal::new_with_metadata( + null_value, + Some(FieldMetadata::from(logical_field)), + )))); }; let fields_match = logical_field == physical_field.as_ref(); diff --git a/datafusion/sqllogictest/test_files/input_file_name.slt b/datafusion/sqllogictest/test_files/input_file_name.slt new file mode 100644 index 0000000000000..e6dda981381e4 --- /dev/null +++ b/datafusion/sqllogictest/test_files/input_file_name.slt @@ -0,0 +1,55 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +########## +## input_file_name() tests +########## + +statement ok +CREATE EXTERNAL TABLE t(c1 INT, c2 INT, c3 BOOLEAN) +STORED AS CSV +LOCATION '../core/tests/data/partitioned_csv' +OPTIONS ('format.has_header' 'false'); + +query IIIC +SELECT + CASE + WHEN input_file_name() LIKE '%partition-0.csv' THEN 0 + WHEN input_file_name() LIKE '%partition-1.csv' THEN 1 + WHEN input_file_name() LIKE '%partition-2.csv' THEN 2 + WHEN input_file_name() LIKE '%partition-3.csv' THEN 3 + ELSE -1 + END AS file_id, + c1, + c2 +FROM t +ORDER BY c2, c1 +LIMIT 8; +---- +0 0 0 +1 1 0 +2 2 0 +3 3 0 +0 0 1 +1 1 1 +2 2 1 +3 3 1 + +query T +SELECT input_file_name() FROM (VALUES (1)) v(x); +---- +NULL \ No newline at end of file From a672b3b252988a6414d56ec5b2710b17e8202f7b Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Tue, 16 Jun 2026 11:06:06 +0100 Subject: [PATCH 3/5] SLT test Signed-off-by: Adam Gutglick --- .../test_files/input_file_name.slt | 57 ++++++++++--------- 1 file changed, 29 insertions(+), 28 deletions(-) diff --git a/datafusion/sqllogictest/test_files/input_file_name.slt b/datafusion/sqllogictest/test_files/input_file_name.slt index e6dda981381e4..c87fd37fd018c 100644 --- a/datafusion/sqllogictest/test_files/input_file_name.slt +++ b/datafusion/sqllogictest/test_files/input_file_name.slt @@ -20,36 +20,37 @@ ########## statement ok -CREATE EXTERNAL TABLE t(c1 INT, c2 INT, c3 BOOLEAN) +COPY (VALUES (10), (20), (30)) +TO 'test_files/scratch/input_file_name/csv/first.csv' +STORED AS CSV; + +statement ok +COPY (VALUES (40), (50), (60)) +TO 'test_files/scratch/input_file_name/csv/second.csv' +STORED AS CSV; + +statement ok +CREATE EXTERNAL TABLE csv_table(column1 int) STORED AS CSV -LOCATION '../core/tests/data/partitioned_csv' -OPTIONS ('format.has_header' 'false'); +LOCATION 'test_files/scratch/input_file_name/csv/'; -query IIIC +query TI SELECT - CASE - WHEN input_file_name() LIKE '%partition-0.csv' THEN 0 - WHEN input_file_name() LIKE '%partition-1.csv' THEN 1 - WHEN input_file_name() LIKE '%partition-2.csv' THEN 2 - WHEN input_file_name() LIKE '%partition-3.csv' THEN 3 - ELSE -1 - END AS file_id, - c1, - c2 -FROM t -ORDER BY c2, c1 -LIMIT 8; + input_file_name(), + column1 +FROM csv_table +ORDER BY column1 ---- -0 0 0 -1 1 0 -2 2 0 -3 3 0 -0 0 1 -1 1 1 -2 2 1 -3 3 1 - -query T +WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/input_file_name/csv/first.csv 10 +WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/input_file_name/csv/first.csv 20 +WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/input_file_name/csv/first.csv 30 +WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/input_file_name/csv/second.csv 40 +WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/input_file_name/csv/second.csv 50 +WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/input_file_name/csv/second.csv 60 + + +query error Execution error: input_file_name\(\) is source dependent and cannot be evaluated directly SELECT input_file_name() FROM (VALUES (1)) v(x); ----- -NULL \ No newline at end of file + +statement ok +DROP TABLE csv_table; \ No newline at end of file From 22623f52bbbdafedfcddbb04c644a4d344fc4d47 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Tue, 16 Jun 2026 11:58:18 +0100 Subject: [PATCH 4/5] Testing Signed-off-by: Adam Gutglick --- Cargo.lock | 1 + .../datasource-parquet/src/opener/mod.rs | 57 ++++++++++- datafusion/datasource/Cargo.toml | 1 + datafusion/datasource/src/projection.rs | 94 +++++++++++++++++-- .../src/schema_rewriter.rs | 89 ++++++++++++++++-- .../test_files/input_file_name.slt | 25 +++++ .../test_files/parquet_metadata_functions.slt | 84 +++++++++++++++++ 7 files changed, 335 insertions(+), 16 deletions(-) create mode 100644 datafusion/sqllogictest/test_files/parquet_metadata_functions.slt diff --git a/Cargo.lock b/Cargo.lock index 543750f8d8355..a7d5d550a6ef5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1913,6 +1913,7 @@ dependencies = [ "datafusion-common-runtime", "datafusion-execution", "datafusion-expr", + "datafusion-functions", "datafusion-physical-expr", "datafusion-physical-expr-adapter", "datafusion-physical-expr-common", diff --git a/datafusion/datasource-parquet/src/opener/mod.rs b/datafusion/datasource-parquet/src/opener/mod.rs index 78ffa532cb165..006de31ddec0d 100644 --- a/datafusion/datasource-parquet/src/opener/mod.rs +++ b/datafusion/datasource-parquet/src/opener/mod.rs @@ -772,8 +772,7 @@ impl ParquetMorselizer { } // Replace any `input_file_name()` UDFs in the projection with a literal for this file. - projection = - rewrite_input_file_name_in_projection(projection, file_name.clone())?; + projection = rewrite_input_file_name_in_projection(projection, &file_name)?; let predicate_creation_errors = MetricBuilder::new(&self.metrics) .with_category(MetricCategory::Rows) @@ -2975,8 +2974,12 @@ mod test { /// (e.g. `row_number`) plumbed through `TableSchema`/`ParquetOpener`. mod virtual_columns { use super::*; - use arrow::array::{Array, Int64Array}; + use arrow::array::{Array, Int64Array, StringArray}; use arrow::datatypes::FieldRef; + use datafusion_common::config::ConfigOptions; + use datafusion_expr::ScalarUDF; + use datafusion_functions::core::input_file_name::InputFileNameFunc; + use datafusion_physical_expr::{ScalarFunctionExpr, projection::ProjectionExpr}; use parquet::arrow::RowNumber; /// Build a parquet `row_number` virtual column field. Spark's @@ -2990,6 +2993,16 @@ mod test { ) } + fn input_file_name_expr() -> Arc { + Arc::new(ScalarFunctionExpr::new( + "input_file_name", + Arc::new(ScalarUDF::from(InputFileNameFunc::new())), + vec![], + Arc::new(Field::new("input_file_name", DataType::Utf8, true)), + Arc::new(ConfigOptions::default()), + )) + } + /// Collect every `Int64` value from the given column in every batch /// of a stream. Used to verify the `row_number` column end to end. async fn collect_int64_values( @@ -3109,6 +3122,44 @@ mod test { assert_eq!(row_numbers, vec![0, 1, 2, 3]); } + #[tokio::test] + async fn test_input_file_name_projection() { + let store = Arc::new(InMemory::new()) as Arc; + let path = "dir/input_file_name.parquet"; + let (file_schema, data_size) = write_grouped_file(&store, path, 1, 3).await; + + let projection = ProjectionExprs::new([ + ProjectionExpr::new(Arc::new(Column::new("value", 0)), "value"), + ProjectionExpr::new(input_file_name_expr(), "file_name"), + ]); + + let morselizer = ParquetMorselizerBuilder::new() + .with_store(Arc::clone(&store)) + .with_schema(file_schema) + .with_projection(projection) + .build(); + + let file = + PartitionedFile::new(path.to_string(), u64::try_from(data_size).unwrap()); + let mut stream = open_file(&morselizer, file).await.unwrap(); + let batch = stream.next().await.unwrap().unwrap(); + assert!(stream.next().await.is_none()); + + assert_eq!(batch.num_columns(), 2); + assert_eq!(batch.schema().field(0).name(), "value"); + assert_eq!(batch.schema().field(1).name(), "file_name"); + + let file_names = batch + .column(1) + .as_any() + .downcast_ref::() + .expect("file_name column should be Utf8"); + assert_eq!(file_names.len(), 3); + for i in 0..file_names.len() { + assert_eq!(file_names.value(i), path); + } + } + #[tokio::test] async fn test_row_index_multi_row_group() { let store = Arc::new(InMemory::new()) as Arc; diff --git a/datafusion/datasource/Cargo.toml b/datafusion/datasource/Cargo.toml index 40e2271f45205..2ac42ed900095 100644 --- a/datafusion/datasource/Cargo.toml +++ b/datafusion/datasource/Cargo.toml @@ -74,6 +74,7 @@ zstd = { workspace = true, optional = true } [dev-dependencies] criterion = { workspace = true } +datafusion-functions = { workspace = true } insta = { workspace = true } tempfile = { workspace = true } diff --git a/datafusion/datasource/src/projection.rs b/datafusion/datasource/src/projection.rs index 3e44ca8e89c04..f0a58771ed4ce 100644 --- a/datafusion/datasource/src/projection.rs +++ b/datafusion/datasource/src/projection.rs @@ -70,7 +70,6 @@ impl ProjectionOpener { impl FileOpener for ProjectionOpener { fn open(&self, partitioned_file: PartitionedFile) -> Result { let partition_values = partitioned_file.partition_values.clone(); - let file_name = partitioned_file.object_meta.location.to_string(); // Modify any references to partition columns in the projection expressions // and substitute them with literal values from PartitionedFile.partition_values @@ -84,7 +83,10 @@ impl FileOpener for ProjectionOpener { ) }; // Replace `input_file_name()` with a per-file literal if present. - let projection = rewrite_input_file_name_in_projection(projection, file_name)?; + let projection = rewrite_input_file_name_in_projection( + projection, + partitioned_file.object_meta.location.as_ref(), + )?; let projector = projection.make_projector(&self.input_schema)?; let inner = self.inner.open(partitioned_file)?; @@ -292,15 +294,31 @@ impl SplitProjection { mod test { use std::sync::Arc; - use arrow::array::AsArray; - use arrow::datatypes::{DataType, SchemaRef}; - use datafusion_common::{DFSchema, ScalarValue, record_batch}; - use datafusion_expr::{Expr, col, execution_props::ExecutionProps}; - use datafusion_physical_expr::{create_physical_exprs, projection::ProjectionExpr}; + use arrow::array::{AsArray, RecordBatch}; + use arrow::datatypes::{DataType, Field, SchemaRef}; + use datafusion_common::{DFSchema, ScalarValue, config::ConfigOptions, record_batch}; + use datafusion_expr::{Expr, ScalarUDF, col, execution_props::ExecutionProps}; + use datafusion_functions::core::input_file_name::InputFileNameFunc; + use datafusion_physical_expr::{ + ScalarFunctionExpr, create_physical_exprs, projection::ProjectionExpr, + }; + use datafusion_physical_expr_common::physical_expr::PhysicalExpr; + use futures::{FutureExt, StreamExt}; use itertools::Itertools; use super::*; + struct StaticBatchOpener { + batch: RecordBatch, + } + + impl FileOpener for StaticBatchOpener { + fn open(&self, _partitioned_file: PartitionedFile) -> Result { + let batch = self.batch.clone(); + Ok(async move { Ok(futures::stream::iter([Ok(batch)]).boxed()) }.boxed()) + } + } + fn create_projection_exprs<'a>( exprs: impl IntoIterator, schema: &SchemaRef, @@ -316,6 +334,68 @@ mod test { ProjectionExprs::from(projection_exprs) } + fn input_file_name_expr() -> Arc { + Arc::new(ScalarFunctionExpr::new( + "input_file_name", + Arc::new(ScalarUDF::from(InputFileNameFunc::new())), + vec![], + Arc::new(Field::new("input_file_name", DataType::Utf8, true)), + Arc::new(ConfigOptions::default()), + )) + } + + #[tokio::test] + async fn test_projection_opener_rewrites_input_file_name_with_partitions() { + let file_schema = Schema::new(vec![Field::new("value", DataType::Int32, false)]); + let projection = ProjectionExprs::new([ + ProjectionExpr::new(Arc::new(Column::new("value", 0)), "value"), + ProjectionExpr::new(Arc::new(Column::new("part", 1)), "part"), + ProjectionExpr::new(input_file_name_expr(), "file_name"), + ]); + let split = SplitProjection::new(&file_schema, &projection); + let input_batch = + record_batch!(("value", Int32, vec![10, 20])).expect("input batch"); + + let opener = ProjectionOpener::try_new( + split, + Arc::new(StaticBatchOpener { batch: input_batch }), + &file_schema, + ) + .expect("projection opener"); + + let mut file = PartitionedFile::new("part=west/data.csv", 100); + file.partition_values = vec![ScalarValue::from("west")]; + let mut stream = opener + .open(file) + .expect("open projection") + .await + .expect("inner stream"); + let batch = stream + .next() + .await + .expect("one projected batch") + .expect("projected batch"); + assert!(stream.next().await.is_none()); + + assert_eq!(batch.schema().field(0).name(), "value"); + assert_eq!(batch.schema().field(1).name(), "part"); + assert_eq!(batch.schema().field(2).name(), "file_name"); + + let values = batch + .column(0) + .as_primitive::(); + assert_eq!(values.value(0), 10); + assert_eq!(values.value(1), 20); + + let parts = batch.column(1).as_string::(); + assert_eq!(parts.value(0), "west"); + assert_eq!(parts.value(1), "west"); + + let file_names = batch.column(2).as_string::(); + assert_eq!(file_names.value(0), "part=west/data.csv"); + assert_eq!(file_names.value(1), "part=west/data.csv"); + } + #[test] fn test_split_projection_with_partition_columns() { use arrow::array::AsArray; diff --git a/datafusion/physical-expr-adapter/src/schema_rewriter.rs b/datafusion/physical-expr-adapter/src/schema_rewriter.rs index e553a8b6cb527..cfd333b4bc00e 100644 --- a/datafusion/physical-expr-adapter/src/schema_rewriter.rs +++ b/datafusion/physical-expr-adapter/src/schema_rewriter.rs @@ -194,17 +194,29 @@ pub fn rewrite_file_row_index_projection( ProjectionExprs::new(base_exprs).try_merge(&rewritten_projection) } +/// Rewrite `input_file_name()` in a pushed projection to a per-file `Utf8` +/// literal holding `file_name`. +/// +/// If the projection contains no `input_file_name()` UDF it is returned +/// unchanged, without allocating the literal or rebuilding the projection tree +/// (the common case for queries that don't use the function). pub fn rewrite_input_file_name_in_projection( projection: ProjectionExprs, - file_name: String, + file_name: &str, ) -> Result { - let file_name_lit = Arc::new(Literal::new(ScalarValue::Utf8(Some(file_name)))) - as Arc; + if !projection + .iter() + .any(|p| expr_references_scalar_udf::(&p.expr)) + { + return Ok(projection); + } + + let file_name_lit = + Arc::new(Literal::new(ScalarValue::Utf8(Some(file_name.to_string())))) + as Arc; projection.try_map_exprs(|expr| { - rewrite_scalar_udf::(expr, |_e| { - Ok(Arc::clone(&file_name_lit)) - }) + rewrite_scalar_udf::(expr, |_| Ok(Arc::clone(&file_name_lit))) }) } @@ -783,6 +795,16 @@ mod tests { )) } + fn input_file_name_expr() -> Arc { + Arc::new(ScalarFunctionExpr::new( + "input_file_name", + Arc::new(ScalarUDF::from(InputFileNameFunc::new())), + vec![], + Arc::new(Field::new("input_file_name", DataType::Utf8, true)), + Arc::new(ConfigOptions::default()), + )) + } + #[test] fn test_rewrite_scalar_udf_replaces_nested_typed_udf() -> Result<()> { let expr = Arc::new(expressions::BinaryExpr::new( @@ -814,6 +836,61 @@ mod tests { Ok(()) } + #[test] + fn test_rewrite_input_file_name_in_projection() -> Result<()> { + let file_name = "part=west/data.parquet"; + let projection = ProjectionExprs::new([ + ProjectionExpr::new(input_file_name_expr(), "file_name"), + ProjectionExpr::new( + Arc::new(expressions::BinaryExpr::new( + input_file_name_expr(), + Operator::Eq, + expressions::lit(ScalarValue::Utf8(Some(file_name.to_string()))), + )), + "matches_file", + ), + ]); + + let rewritten = rewrite_input_file_name_in_projection(projection, file_name)?; + let rewritten = rewritten.as_ref(); + assert_eq!(rewritten[0].alias, "file_name"); + assert_eq!(rewritten[1].alias, "matches_file"); + + let file_name_lit = rewritten[0] + .expr + .downcast_ref::() + .expect("input_file_name should rewrite to a literal"); + assert_eq!( + file_name_lit.value(), + &ScalarValue::Utf8(Some(file_name.to_string())) + ); + + let binary = rewritten[1] + .expr + .downcast_ref::() + .expect("nested expression should remain binary"); + assert_eq!(binary.op(), &Operator::Eq); + + let left = binary + .left() + .downcast_ref::() + .expect("nested input_file_name should rewrite to a literal"); + assert_eq!( + left.value(), + &ScalarValue::Utf8(Some(file_name.to_string())) + ); + + let right = binary + .right() + .downcast_ref::() + .expect("comparison literal should remain unchanged"); + assert_eq!( + right.value(), + &ScalarValue::Utf8(Some(file_name.to_string())) + ); + Ok(()) + } + #[test] fn test_rewrite_file_row_index_expr_to_source_column() -> Result<()> { let expr = rewrite_file_row_index_expr( diff --git a/datafusion/sqllogictest/test_files/input_file_name.slt b/datafusion/sqllogictest/test_files/input_file_name.slt index c87fd37fd018c..6198a02325bf9 100644 --- a/datafusion/sqllogictest/test_files/input_file_name.slt +++ b/datafusion/sqllogictest/test_files/input_file_name.slt @@ -48,6 +48,31 @@ WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/input_file_name/csv/se WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/input_file_name/csv/second.csv 50 WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/input_file_name/csv/second.csv 60 +query I +SELECT column1 +FROM csv_table +WHERE input_file_name() LIKE '%/first.csv' +ORDER BY column1; +---- +10 +20 +30 + +query TT +EXPLAIN SELECT column1 +FROM csv_table +WHERE input_file_name() LIKE '%/first.csv'; +---- +logical_plan +01)Projection: csv_table.column1 +02)--Filter: __datafusion_extracted_1 LIKE Utf8("%/first.csv") +03)----Projection: input_file_name() AS __datafusion_extracted_1, csv_table.column1 +04)------TableScan: csv_table projection=[column1] +physical_plan +01)FilterExec: __datafusion_extracted_1@0 LIKE %/first.csv, projection=[column1@1] +02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2 +03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/input_file_name/csv/first.csv], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/input_file_name/csv/second.csv]]}, projection=[input_file_name() as __datafusion_extracted_1, column1], file_type=csv, has_header=true + query error Execution error: input_file_name\(\) is source dependent and cannot be evaluated directly SELECT input_file_name() FROM (VALUES (1)) v(x); diff --git a/datafusion/sqllogictest/test_files/parquet_metadata_functions.slt b/datafusion/sqllogictest/test_files/parquet_metadata_functions.slt new file mode 100644 index 0000000000000..c83cb84c34fe6 --- /dev/null +++ b/datafusion/sqllogictest/test_files/parquet_metadata_functions.slt @@ -0,0 +1,84 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Test for Parquet scans with metadata function + +statement ok +COPY (VALUES (10), (20), (30)) +TO 'test_files/scratch/parquet_metadata_functions/first.parquet' +STORED AS PARQUET; + +statement ok +COPY (VALUES (40), (50), (60)) +TO 'test_files/scratch/parquet_metadata_functions/second.parquet' +STORED AS PARQUET; + +statement ok +CREATE EXTERNAL TABLE test_table(column1 int) +STORED AS PARQUET +LOCATION 'test_files/scratch/parquet_metadata_functions/'; + +query TII rowsort +SELECT input_file_name(), file_row_index(), column1 +FROM test_table +---- +WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_metadata_functions/first.parquet 0 10 +WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_metadata_functions/first.parquet 1 20 +WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_metadata_functions/first.parquet 2 30 +WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_metadata_functions/second.parquet 0 40 +WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_metadata_functions/second.parquet 1 50 +WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_metadata_functions/second.parquet 2 60 + +query TT +EXPLAIN SELECT input_file_name(), file_row_index(), column1 +FROM test_table +---- +logical_plan +01)Projection: input_file_name(), file_row_index(), test_table.column1 +02)--TableScan: test_table projection=[column1] +physical_plan DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_metadata_functions/first.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_metadata_functions/second.parquet]]}, projection=[input_file_name() as input_file_name(), CAST(__datafusion_file_row_index@1 AS Int64) as file_row_index(), column1], file_type=parquet + +# input_file_name() in a WHERE predicate: only rows from the matching file are returned +query I +SELECT column1 FROM test_table +WHERE input_file_name() LIKE '%first.parquet' +ORDER BY column1 +---- +10 +20 +30 + +# input_file_name() as a GROUP BY key: per-file aggregation +query TII +SELECT input_file_name(), count(*), sum(column1) +FROM test_table +GROUP BY input_file_name() +ORDER BY input_file_name() +---- +WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_metadata_functions/first.parquet 3 60 +WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_metadata_functions/second.parquet 3 150 + +# input_file_name() inside a projection expression +query B rowsort +SELECT DISTINCT input_file_name() LIKE '%second.parquet' +FROM test_table +---- +false +true + +statement ok +DROP TABLE test_table; From 4277096cab6469adcb3d33a5c71c57c0ddd00cf1 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Tue, 16 Jun 2026 12:27:00 +0100 Subject: [PATCH 5/5] fmt Signed-off-by: Adam Gutglick --- datafusion/physical-expr-adapter/src/schema_rewriter.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-expr-adapter/src/schema_rewriter.rs b/datafusion/physical-expr-adapter/src/schema_rewriter.rs index cfd333b4bc00e..36cf1e2a67157 100644 --- a/datafusion/physical-expr-adapter/src/schema_rewriter.rs +++ b/datafusion/physical-expr-adapter/src/schema_rewriter.rs @@ -216,7 +216,9 @@ pub fn rewrite_input_file_name_in_projection( as Arc; projection.try_map_exprs(|expr| { - rewrite_scalar_udf::(expr, |_| Ok(Arc::clone(&file_name_lit))) + rewrite_scalar_udf::(expr, |_| { + Ok(Arc::clone(&file_name_lit)) + }) }) }