From 50f9345206c6d63c716e3f66d19d0e583ea9d3b3 Mon Sep 17 00:00:00 2001 From: Sergey Zhukov Date: Wed, 26 Nov 2025 16:37:25 +0300 Subject: [PATCH 01/11] Run the examples in the new format --- ci/scripts/rust_example.sh | 30 ++++++-- .../examples/builtin_functions/main.rs | 41 ++++++++-- .../examples/custom_data_source/main.rs | 66 ++++++++++++---- datafusion-examples/examples/data_io/main.rs | 75 ++++++++++++++----- .../examples/dataframe/main.rs | 36 +++++++-- .../examples/execution_monitoring/main.rs | 46 +++++++++--- .../examples/external_dependency/main.rs | 33 ++++++-- datafusion-examples/examples/flight/main.rs | 43 +++++++++-- datafusion-examples/examples/proto/main.rs | 33 ++++++-- .../examples/query_planning/main.rs | 55 +++++++++++--- datafusion-examples/examples/sql_ops/main.rs | 44 +++++++++-- datafusion-examples/examples/udf/main.rs | 55 +++++++++++--- 12 files changed, 450 insertions(+), 107 deletions(-) diff --git a/ci/scripts/rust_example.sh b/ci/scripts/rust_example.sh index c3efcf2cf2e92..23f8df0195807 100755 --- a/ci/scripts/rust_example.sh +++ b/ci/scripts/rust_example.sh @@ -25,12 +25,26 @@ export CARGO_PROFILE_CI_STRIP=true cd datafusion-examples/examples/ cargo build --profile ci --examples -files=$(ls .) -for filename in $files -do - example_name=`basename $filename ".rs"` - # Skip tests that rely on external storage and flight - if [ ! 
-d $filename ]; then - cargo run --profile ci --example $example_name - fi +SKIP_LIST=("external_dependency" "flight" "ffi") + +skip_example() { + local name="$1" + for skip in "${SKIP_LIST[@]}"; do + if [ "$name" = "$skip" ]; then + return 0 + fi + done + return 1 +} + +for dir in */; do + example_name=$(basename "$dir") + + if skip_example "$example_name"; then + echo "Skipping $example_name" + continue + fi + + echo "Running example group: $example_name" + cargo run --profile ci --example "$example_name" all done diff --git a/datafusion-examples/examples/builtin_functions/main.rs b/datafusion-examples/examples/builtin_functions/main.rs index c307bc9532bff..c3780f62a5d14 100644 --- a/datafusion-examples/examples/builtin_functions/main.rs +++ b/datafusion-examples/examples/builtin_functions/main.rs @@ -21,10 +21,11 @@ //! //! ## Usage //! ```bash -//! cargo run --example builtin_functions -- [date_time|function_factory|regexp] +//! cargo run --example builtin_functions -- [all|date_time|function_factory|regexp] //! ``` //! //! Each subcommand runs a corresponding example: +//! - `all` — run all examples included in this module //! - `date_time` — examples of date-time related functions and queries //! - `function_factory` — register `CREATE FUNCTION` handler to implement SQL macros //! 
- `regexp` — examples of using regular expression functions @@ -38,6 +39,7 @@ use std::str::FromStr; use datafusion::error::{DataFusionError, Result}; enum ExampleKind { + All, DateTime, FunctionFactory, Regexp, @@ -46,6 +48,7 @@ enum ExampleKind { impl AsRef for ExampleKind { fn as_ref(&self) -> &str { match self { + Self::All => "all", Self::DateTime => "date_time", Self::FunctionFactory => "function_factory", Self::Regexp => "regexp", @@ -58,6 +61,7 @@ impl FromStr for ExampleKind { fn from_str(s: &str) -> Result { match s { + "all" => Ok(Self::All), "date_time" => Ok(Self::DateTime), "function_factory" => Ok(Self::FunctionFactory), "regexp" => Ok(Self::Regexp), @@ -67,12 +71,33 @@ impl FromStr for ExampleKind { } impl ExampleKind { - const ALL: [Self; 3] = [Self::DateTime, Self::FunctionFactory, Self::Regexp]; + const ALL_VARIANTS: [Self; 4] = [ + Self::All, + Self::DateTime, + Self::FunctionFactory, + Self::Regexp, + ]; + + const RUNNABLE_VARIANTS: [Self; 3] = + [Self::DateTime, Self::FunctionFactory, Self::Regexp]; const EXAMPLE_NAME: &str = "builtin_functions"; fn variants() -> Vec<&'static str> { - Self::ALL.iter().map(|x| x.as_ref()).collect() + Self::ALL_VARIANTS + .iter() + .map(|example| example.as_ref()) + .collect() + } + + async fn run(&self) -> Result<()> { + match self { + ExampleKind::DateTime => date_time::date_time().await?, + ExampleKind::FunctionFactory => function_factory::function_factory().await?, + ExampleKind::Regexp => regexp::regexp().await?, + ExampleKind::All => unreachable!("`All` should be handled in main"), + } + Ok(()) } } @@ -90,9 +115,13 @@ async fn main() -> Result<()> { })?; match arg.parse::()? 
{ - ExampleKind::DateTime => date_time::date_time().await?, - ExampleKind::FunctionFactory => function_factory::function_factory().await?, - ExampleKind::Regexp => regexp::regexp().await?, + ExampleKind::All => { + for example in ExampleKind::RUNNABLE_VARIANTS { + println!("Running example: {}", example.as_ref()); + example.run().await?; + } + } + example => example.run().await?, } Ok(()) diff --git a/datafusion-examples/examples/custom_data_source/main.rs b/datafusion-examples/examples/custom_data_source/main.rs index 66d8f083422e2..b23e3658b485d 100644 --- a/datafusion-examples/examples/custom_data_source/main.rs +++ b/datafusion-examples/examples/custom_data_source/main.rs @@ -21,10 +21,11 @@ //! //! ## Usage //! ```bash -//! cargo run --example custom_data_source -- [csv_json_opener|csv_sql_streaming|custom_datasource|custom_file_casts|custom_file_format|default_column_values|file_stream_provider] +//! cargo run --example custom_data_source -- [all|csv_json_opener|csv_sql_streaming|custom_datasource|custom_file_casts|custom_file_format|default_column_values|file_stream_provider] //! ``` //! //! Each subcommand runs a corresponding example: +//! - `all` — run all examples included in this module //! - `csv_json_opener` — use low level FileOpener APIs to read CSV/JSON into Arrow RecordBatches //! - `csv_sql_streaming` — build and run a streaming query plan from a SQL statement against a local CSV file //! 
- `custom_datasource` — run queries against a custom datasource (TableProvider) @@ -46,6 +47,7 @@ use std::str::FromStr; use datafusion::error::{DataFusionError, Result}; enum ExampleKind { + All, CsvJsonOpener, CsvSqlStreaming, CustomDatasource, @@ -58,6 +60,7 @@ enum ExampleKind { impl AsRef for ExampleKind { fn as_ref(&self) -> &str { match self { + Self::All => "all", Self::CsvJsonOpener => "csv_json_opener", Self::CsvSqlStreaming => "csv_sql_streaming", Self::CustomDatasource => "custom_datasource", @@ -74,6 +77,7 @@ impl FromStr for ExampleKind { fn from_str(s: &str) -> Result { match s { + "all" => Ok(Self::All), "csv_json_opener" => Ok(Self::CsvJsonOpener), "csv_sql_streaming" => Ok(Self::CsvSqlStreaming), "custom_datasource" => Ok(Self::CustomDatasource), @@ -87,7 +91,18 @@ impl FromStr for ExampleKind { } impl ExampleKind { - const ALL: [Self; 7] = [ + const ALL_VARIANTS: [Self; 8] = [ + Self::All, + Self::CsvJsonOpener, + Self::CsvSqlStreaming, + Self::CustomDatasource, + Self::CustomFileCasts, + Self::CustomFileFormat, + Self::DefaultColumnValues, + Self::FileFtreamProvider, + ]; + + const RUNNABLE_VARIANTS: [Self; 7] = [ Self::CsvJsonOpener, Self::CsvSqlStreaming, Self::CustomDatasource, @@ -100,7 +115,36 @@ impl ExampleKind { const EXAMPLE_NAME: &str = "custom_data_source"; fn variants() -> Vec<&'static str> { - Self::ALL.iter().map(|x| x.as_ref()).collect() + Self::ALL_VARIANTS + .iter() + .map(|example| example.as_ref()) + .collect() + } + + async fn run(&self) -> Result<()> { + match self { + ExampleKind::CsvJsonOpener => csv_json_opener::csv_json_opener().await?, + ExampleKind::CsvSqlStreaming => { + csv_sql_streaming::csv_sql_streaming().await? + } + ExampleKind::CustomDatasource => { + custom_datasource::custom_datasource().await? + } + ExampleKind::CustomFileCasts => { + custom_file_casts::custom_file_casts().await? + } + ExampleKind::CustomFileFormat => { + custom_file_format::custom_file_format().await? 
+ } + ExampleKind::DefaultColumnValues => { + default_column_values::default_column_values().await? + } + ExampleKind::FileFtreamProvider => { + file_stream_provider::file_stream_provider().await? + } + ExampleKind::All => unreachable!("`All` should be handled in main"), + } + Ok(()) } } @@ -118,17 +162,13 @@ async fn main() -> Result<()> { })?; match arg.parse::()? { - ExampleKind::CsvJsonOpener => csv_json_opener::csv_json_opener().await?, - ExampleKind::CsvSqlStreaming => csv_sql_streaming::csv_sql_streaming().await?, - ExampleKind::CustomDatasource => custom_datasource::custom_datasource().await?, - ExampleKind::CustomFileCasts => custom_file_casts::custom_file_casts().await?, - ExampleKind::CustomFileFormat => custom_file_format::custom_file_format().await?, - ExampleKind::DefaultColumnValues => { - default_column_values::default_column_values().await? - } - ExampleKind::FileFtreamProvider => { - file_stream_provider::file_stream_provider().await? + ExampleKind::All => { + for example in ExampleKind::RUNNABLE_VARIANTS { + println!("Running example: {}", example.as_ref()); + example.run().await?; + } } + example => example.run().await?, } Ok(()) diff --git a/datafusion-examples/examples/data_io/main.rs b/datafusion-examples/examples/data_io/main.rs index c9ad793410b8e..670e3055b4341 100644 --- a/datafusion-examples/examples/data_io/main.rs +++ b/datafusion-examples/examples/data_io/main.rs @@ -21,10 +21,11 @@ //! //! ## Usage //! ```bash -//! cargo run --example data_io -- [catalog|json_shredding|parquet_adv_idx|parquet_emb_idx|parquet_enc_with_kms|parquet_enc|parquet_exec_visitor|parquet_idx|query_http_csv|remote_catalog] +//! cargo run --example data_io -- [all|catalog|json_shredding|parquet_adv_idx|parquet_emb_idx|parquet_enc_with_kms|parquet_enc|parquet_exec_visitor|parquet_idx|query_http_csv|remote_catalog] //! ``` //! //! Each subcommand runs a corresponding example: +//! - `all` — run all examples included in this module //! 
- `catalog` — register the table into a custom catalog //! - `json_shredding` — shows how to implement custom filter rewriting for JSON shredding //! - `parquet_adv_idx` — create a detailed secondary index that covers the contents of several parquet files @@ -52,6 +53,7 @@ use std::str::FromStr; use datafusion::error::{DataFusionError, Result}; enum ExampleKind { + All, Catalog, JsonShredding, ParquetAdvIdx, @@ -67,6 +69,7 @@ enum ExampleKind { impl AsRef for ExampleKind { fn as_ref(&self) -> &str { match self { + Self::All => "all", Self::Catalog => "catalog", Self::JsonShredding => "json_shredding", Self::ParquetAdvIdx => "parquet_adv_idx", @@ -86,6 +89,7 @@ impl FromStr for ExampleKind { fn from_str(s: &str) -> Result { match s { + "all" => Ok(Self::All), "catalog" => Ok(Self::Catalog), "json_shredding" => Ok(Self::JsonShredding), "parquet_adv_idx" => Ok(Self::ParquetAdvIdx), @@ -102,7 +106,21 @@ impl FromStr for ExampleKind { } impl ExampleKind { - const ALL: [Self; 10] = [ + const ALL_VARIANTS: [Self; 11] = [ + Self::All, + Self::Catalog, + Self::JsonShredding, + Self::ParquetAdvIdx, + Self::ParquetEmbIdx, + Self::ParquetEnc, + Self::ParquetEncWithKms, + Self::ParquetExecVisitor, + Self::ParquetIdx, + Self::QueryHttpCsv, + Self::RemoteCatalog, + ]; + + const RUNNABLE_VARIANTS: [Self; 10] = [ Self::Catalog, Self::JsonShredding, Self::ParquetAdvIdx, @@ -118,7 +136,35 @@ impl ExampleKind { const EXAMPLE_NAME: &str = "data_io"; fn variants() -> Vec<&'static str> { - Self::ALL.iter().map(|x| x.as_ref()).collect() + Self::ALL_VARIANTS + .iter() + .map(|example| example.as_ref()) + .collect() + } + + async fn run(&self) -> Result<()> { + match self { + ExampleKind::Catalog => catalog::catalog().await?, + ExampleKind::JsonShredding => json_shredding::json_shredding().await?, + ExampleKind::ParquetAdvIdx => { + parquet_advanced_index::parquet_advanced_index().await? + } + ExampleKind::ParquetEmbIdx => { + parquet_embedded_index::parquet_embedded_index().await? 
+ } + ExampleKind::ParquetEncWithKms => { + parquet_encrypted_with_kms::parquet_encrypted_with_kms().await? + } + ExampleKind::ParquetEnc => parquet_encrypted::parquet_encrypted().await?, + ExampleKind::ParquetExecVisitor => { + parquet_exec_visitor::parquet_exec_visitor().await? + } + ExampleKind::ParquetIdx => parquet_index::parquet_index().await?, + ExampleKind::QueryHttpCsv => query_http_csv::query_http_csv().await?, + ExampleKind::RemoteCatalog => remote_catalog::remote_catalog().await?, + ExampleKind::All => unreachable!("`All` should be handled in main"), + } + Ok(()) } } @@ -136,24 +182,13 @@ async fn main() -> Result<()> { })?; match arg.parse::()? { - ExampleKind::Catalog => catalog::catalog().await?, - ExampleKind::JsonShredding => json_shredding::json_shredding().await?, - ExampleKind::ParquetAdvIdx => { - parquet_advanced_index::parquet_advanced_index().await? - } - ExampleKind::ParquetEmbIdx => { - parquet_embedded_index::parquet_embedded_index().await? - } - ExampleKind::ParquetEncWithKms => { - parquet_encrypted_with_kms::parquet_encrypted_with_kms().await? - } - ExampleKind::ParquetEnc => parquet_encrypted::parquet_encrypted().await?, - ExampleKind::ParquetExecVisitor => { - parquet_exec_visitor::parquet_exec_visitor().await? + ExampleKind::All => { + for example in ExampleKind::RUNNABLE_VARIANTS { + println!("Running example: {}", example.as_ref()); + example.run().await?; + } } - ExampleKind::ParquetIdx => parquet_index::parquet_index().await?, - ExampleKind::QueryHttpCsv => query_http_csv::query_http_csv().await?, - ExampleKind::RemoteCatalog => remote_catalog::remote_catalog().await?, + example => example.run().await?, } Ok(()) diff --git a/datafusion-examples/examples/dataframe/main.rs b/datafusion-examples/examples/dataframe/main.rs index 0846bce2e811d..b8288cf4d709a 100644 --- a/datafusion-examples/examples/dataframe/main.rs +++ b/datafusion-examples/examples/dataframe/main.rs @@ -21,10 +21,11 @@ //! //! ## Usage //! ```bash -//! 
cargo run --example dataframe -- [dataframe|deserialize_to_struct] +//! cargo run --example dataframe -- [all|dataframe|deserialize_to_struct] //! ``` //! //! Each subcommand runs a corresponding example: +//! - `all` — run all examples included in this module //! - `dataframe` — run a query using a DataFrame API against parquet files, csv files, and in-memory data, including multiple subqueries //! - `deserialize_to_struct` — convert query results (Arrow ArrayRefs) into Rust structs @@ -36,6 +37,7 @@ use std::str::FromStr; use datafusion::error::{DataFusionError, Result}; enum ExampleKind { + All, Dataframe, DeserializeToStruct, } @@ -43,6 +45,7 @@ enum ExampleKind { impl AsRef for ExampleKind { fn as_ref(&self) -> &str { match self { + Self::All => "all", Self::Dataframe => "dataframe", Self::DeserializeToStruct => "deserialize_to_struct", } @@ -54,6 +57,7 @@ impl FromStr for ExampleKind { fn from_str(s: &str) -> Result { match s { + "all" => Ok(Self::All), "dataframe" => Ok(Self::Dataframe), "deserialize_to_struct" => Ok(Self::DeserializeToStruct), _ => Err(DataFusionError::Execution(format!("Unknown example: {s}"))), @@ -62,12 +66,29 @@ impl FromStr for ExampleKind { } impl ExampleKind { - const ALL: [Self; 2] = [Self::Dataframe, Self::DeserializeToStruct]; + const ALL_VARIANTS: [Self; 3] = + [Self::All, Self::Dataframe, Self::DeserializeToStruct]; + + const RUNNABLE_VARIANTS: [Self; 2] = [Self::Dataframe, Self::DeserializeToStruct]; const EXAMPLE_NAME: &str = "dataframe"; fn variants() -> Vec<&'static str> { - Self::ALL.iter().map(|x| x.as_ref()).collect() + Self::ALL_VARIANTS + .iter() + .map(|example| example.as_ref()) + .collect() + } + + async fn run(&self) -> Result<()> { + match self { + ExampleKind::Dataframe => dataframe::dataframe_example().await?, + ExampleKind::DeserializeToStruct => { + deserialize_to_struct::deserialize_to_struct().await? 
+ } + ExampleKind::All => unreachable!("`All` should be handled in main"), + } + Ok(()) } } @@ -85,10 +106,13 @@ async fn main() -> Result<()> { })?; match arg.parse::()? { - ExampleKind::Dataframe => dataframe::dataframe_example().await?, - ExampleKind::DeserializeToStruct => { - deserialize_to_struct::deserialize_to_struct().await? + ExampleKind::All => { + for example in ExampleKind::RUNNABLE_VARIANTS { + println!("Running example: {}", example.as_ref()); + example.run().await?; + } } + example => example.run().await?, } Ok(()) diff --git a/datafusion-examples/examples/execution_monitoring/main.rs b/datafusion-examples/examples/execution_monitoring/main.rs index fd834cf7b72bf..5201522d813f5 100644 --- a/datafusion-examples/examples/execution_monitoring/main.rs +++ b/datafusion-examples/examples/execution_monitoring/main.rs @@ -21,10 +21,11 @@ //! //! ## Usage //! ```bash -//! cargo run --example execution_monitoring -- [mem_pool_exec_plan|mem_pool_tracking|tracing] +//! cargo run --example execution_monitoring -- [all|mem_pool_exec_plan|mem_pool_tracking|tracing] //! ``` //! //! Each subcommand runs a corresponding example: +//! - `all` — run all examples included in this module //! - `mem_pool_exec_plan` — shows how to implement memory-aware ExecutionPlan with memory reservation and spilling //! - `mem_pool_tracking` — demonstrates TrackConsumersPool for memory tracking and debugging with enhanced error messages //! 
- `tracing` — demonstrates the tracing injection feature for the DataFusion runtime @@ -38,6 +39,7 @@ use std::str::FromStr; use datafusion::error::{DataFusionError, Result}; enum ExampleKind { + All, MemoryPoolExecutionPlan, MemoryPoolTracking, Tracing, @@ -46,6 +48,7 @@ enum ExampleKind { impl AsRef for ExampleKind { fn as_ref(&self) -> &str { match self { + Self::All => "all", Self::MemoryPoolExecutionPlan => "mem_pool_exec_plan", Self::MemoryPoolTracking => "mem_pool_tracking", Self::Tracing => "tracing", @@ -58,6 +61,7 @@ impl FromStr for ExampleKind { fn from_str(s: &str) -> Result { match s { + "all" => Ok(Self::All), "mem_pool_exec_plan" => Ok(Self::MemoryPoolExecutionPlan), "mem_pool_tracking" => Ok(Self::MemoryPoolTracking), "tracing" => Ok(Self::Tracing), @@ -67,7 +71,14 @@ impl FromStr for ExampleKind { } impl ExampleKind { - const ALL: [Self; 3] = [ + const ALL_VARIANTS: [Self; 4] = [ + Self::All, + Self::MemoryPoolExecutionPlan, + Self::MemoryPoolTracking, + Self::Tracing, + ]; + + const RUNNABLE_VARIANTS: [Self; 3] = [ Self::MemoryPoolExecutionPlan, Self::MemoryPoolTracking, Self::Tracing, @@ -76,7 +87,24 @@ impl ExampleKind { const EXAMPLE_NAME: &str = "execution_monitoring"; fn variants() -> Vec<&'static str> { - Self::ALL.iter().map(|x| x.as_ref()).collect() + Self::ALL_VARIANTS + .iter() + .map(|example| example.as_ref()) + .collect() + } + + async fn run(&self) -> Result<()> { + match self { + ExampleKind::MemoryPoolExecutionPlan => { + memory_pool_execution_plan::memory_pool_execution_plan().await? + } + ExampleKind::MemoryPoolTracking => { + memory_pool_tracking::mem_pool_tracking().await? + } + ExampleKind::Tracing => tracing::tracing().await?, + ExampleKind::All => unreachable!("`All` should be handled in main"), + } + Ok(()) } } @@ -94,13 +122,13 @@ async fn main() -> Result<()> { })?; match arg.parse::()? { - ExampleKind::MemoryPoolExecutionPlan => { - memory_pool_execution_plan::memory_pool_execution_plan().await? 
- } - ExampleKind::MemoryPoolTracking => { - memory_pool_tracking::mem_pool_tracking().await? + ExampleKind::All => { + for example in ExampleKind::RUNNABLE_VARIANTS { + println!("Running example: {}", example.as_ref()); + example.run().await?; + } } - ExampleKind::Tracing => tracing::tracing().await?, + example => example.run().await?, } Ok(()) diff --git a/datafusion-examples/examples/external_dependency/main.rs b/datafusion-examples/examples/external_dependency/main.rs index f6fc1fb0ef14e..7259a2acf1044 100644 --- a/datafusion-examples/examples/external_dependency/main.rs +++ b/datafusion-examples/examples/external_dependency/main.rs @@ -21,10 +21,11 @@ //! //! ## Usage //! ```bash -//! cargo run --example external_dependency -- [dataframe_to_s3|query_aws_s3] +//! cargo run --example external_dependency -- [all|dataframe_to_s3|query_aws_s3] //! ``` //! //! Each subcommand runs a corresponding example: +//! - `all` — run all examples included in this module //! - `dataframe_to_s3` — run a query using a DataFrame against a parquet file from AWS S3 and writing back to AWS S3 //! 
- `query_aws_s3` — configure `object_store` and run a query against files stored in AWS S3 @@ -36,6 +37,7 @@ use std::str::FromStr; use datafusion::error::{DataFusionError, Result}; enum ExampleKind { + All, DataframeToS3, QueryAwsS3, } @@ -43,6 +45,7 @@ enum ExampleKind { impl AsRef for ExampleKind { fn as_ref(&self) -> &str { match self { + Self::All => "all", Self::DataframeToS3 => "dataframe_to_s3", Self::QueryAwsS3 => "query_aws_s3", } @@ -54,6 +57,7 @@ impl FromStr for ExampleKind { fn from_str(s: &str) -> Result { match s { + "all" => Ok(Self::All), "dataframe_to_s3" => Ok(Self::DataframeToS3), "query_aws_s3" => Ok(Self::QueryAwsS3), _ => Err(DataFusionError::Execution(format!("Unknown example: {s}"))), @@ -62,12 +66,26 @@ impl FromStr for ExampleKind { } impl ExampleKind { - const ALL: [Self; 2] = [Self::DataframeToS3, Self::QueryAwsS3]; + const ALL_VARIANTS: [Self; 3] = [Self::All, Self::DataframeToS3, Self::QueryAwsS3]; + + const RUNNABLE_VARIANTS: [Self; 2] = [Self::DataframeToS3, Self::QueryAwsS3]; const EXAMPLE_NAME: &str = "external_dependency"; fn variants() -> Vec<&'static str> { - Self::ALL.iter().map(|x| x.as_ref()).collect() + Self::ALL_VARIANTS + .iter() + .map(|example| example.as_ref()) + .collect() + } + + async fn run(&self) -> Result<()> { + match self { + ExampleKind::DataframeToS3 => dataframe_to_s3::dataframe_to_s3().await?, + ExampleKind::QueryAwsS3 => query_aws_s3::query_aws_s3().await?, + ExampleKind::All => unreachable!("`All` should be handled in main"), + } + Ok(()) } } @@ -85,8 +103,13 @@ async fn main() -> Result<()> { })?; match arg.parse::()? 
{ - ExampleKind::DataframeToS3 => dataframe_to_s3::dataframe_to_s3().await?, - ExampleKind::QueryAwsS3 => query_aws_s3::query_aws_s3().await?, + ExampleKind::All => { + for example in ExampleKind::RUNNABLE_VARIANTS { + println!("Running example: {}", example.as_ref()); + example.run().await?; + } + } + example => example.run().await?, } Ok(()) diff --git a/datafusion-examples/examples/flight/main.rs b/datafusion-examples/examples/flight/main.rs index a83b19bac42eb..f3c36d6a32c68 100644 --- a/datafusion-examples/examples/flight/main.rs +++ b/datafusion-examples/examples/flight/main.rs @@ -21,10 +21,14 @@ //! //! ## Usage //! ```bash -//! cargo run --example flight -- [client|server|sql_server] +//! cargo run --example flight -- [all|client|server|sql_server] //! ``` //! //! Each subcommand runs a corresponding example: +//! - `all` — run all examples included in this module +//! Note: The Flight server must be started in a separate process +//! before running the `client` example. Therefore, running `all` will +//! not produce a full server+client workflow automatically. //! - `client` — run DataFusion as a standalone process and execute SQL queries from a client using the Flight protocol //! - `server` — run DataFusion as a standalone process and execute SQL queries from a client using the Flight protocol //! - `sql_server` — run DataFusion as a standalone process and execute SQL queries from JDBC clients @@ -37,7 +41,12 @@ use std::str::FromStr; use datafusion::error::{DataFusionError, Result}; +/// The `all` option cannot run all examples end-to-end because the +/// `server` example must run in a separate process before the `client` +/// example can connect. +/// Therefore, `all` only iterates over individually runnable examples. 
enum ExampleKind { + All, Client, Server, SqlServer, @@ -46,6 +55,7 @@ enum ExampleKind { impl AsRef for ExampleKind { fn as_ref(&self) -> &str { match self { + Self::All => "all", Self::Client => "client", Self::Server => "server", Self::SqlServer => "sql_server", @@ -58,6 +68,7 @@ impl FromStr for ExampleKind { fn from_str(s: &str) -> Result { match s { + "all" => Ok(Self::All), "client" => Ok(Self::Client), "server" => Ok(Self::Server), "sql_server" => Ok(Self::SqlServer), @@ -67,12 +78,28 @@ impl FromStr for ExampleKind { } impl ExampleKind { - const ALL: [Self; 3] = [Self::Client, Self::Server, Self::SqlServer]; + const ALL_VARIANTS: [Self; 4] = + [Self::All, Self::Client, Self::Server, Self::SqlServer]; + + const RUNNABLE_VARIANTS: [Self; 3] = [Self::Client, Self::Server, Self::SqlServer]; const EXAMPLE_NAME: &str = "flight"; fn variants() -> Vec<&'static str> { - Self::ALL.iter().map(|x| x.as_ref()).collect() + Self::ALL_VARIANTS + .iter() + .map(|example| example.as_ref()) + .collect() + } + + async fn run(&self) -> Result<(), Box> { + match self { + ExampleKind::Client => client::client().await?, + ExampleKind::Server => server::server().await?, + ExampleKind::SqlServer => sql_server::sql_server().await?, + ExampleKind::All => unreachable!("`All` should be handled in main"), + } + Ok(()) } } @@ -90,9 +117,13 @@ async fn main() -> Result<(), Box> { })?; match arg.parse::()? 
{ - ExampleKind::Client => client::client().await?, - ExampleKind::Server => server::server().await?, - ExampleKind::SqlServer => sql_server::sql_server().await?, + ExampleKind::All => { + for example in ExampleKind::RUNNABLE_VARIANTS { + println!("Running example: {}", example.as_ref()); + example.run().await?; + } + } + example => example.run().await?, } Ok(()) diff --git a/datafusion-examples/examples/proto/main.rs b/datafusion-examples/examples/proto/main.rs index e15ba329a43e1..0a260768637ad 100644 --- a/datafusion-examples/examples/proto/main.rs +++ b/datafusion-examples/examples/proto/main.rs @@ -21,10 +21,11 @@ //! //! ## Usage //! ```bash -//! cargo run --example proto -- [composed_extension_codec] +//! cargo run --example proto -- [all|composed_extension_codec] //! ``` //! //! Each subcommand runs a corresponding example: +//! - `all` — run all examples included in this module //! - `composed_extension_codec` — example of using multiple extension codecs for serialization / deserialization mod composed_extension_codec; @@ -34,12 +35,14 @@ use std::str::FromStr; use datafusion::error::{DataFusionError, Result}; enum ExampleKind { + All, ComposedExtensionCodec, } impl AsRef for ExampleKind { fn as_ref(&self) -> &str { match self { + Self::All => "all", Self::ComposedExtensionCodec => "composed_extension_codec", } } @@ -50,6 +53,7 @@ impl FromStr for ExampleKind { fn from_str(s: &str) -> Result { match s { + "all" => Ok(Self::All), "composed_extension_codec" => Ok(Self::ComposedExtensionCodec), _ => Err(DataFusionError::Execution(format!("Unknown example: {s}"))), } @@ -57,12 +61,27 @@ impl FromStr for ExampleKind { } impl ExampleKind { - const ALL: [Self; 1] = [Self::ComposedExtensionCodec]; + const ALL_VARIANTS: [Self; 2] = [Self::All, Self::ComposedExtensionCodec]; + + const RUNNABLE_VARIANTS: [Self; 1] = [Self::ComposedExtensionCodec]; const EXAMPLE_NAME: &str = "proto"; fn variants() -> Vec<&'static str> { - Self::ALL.iter().map(|x| x.as_ref()).collect() 
+ Self::ALL_VARIANTS + .iter() + .map(|example| example.as_ref()) + .collect() + } + + async fn run(&self) -> Result<()> { + match self { + ExampleKind::ComposedExtensionCodec => { + composed_extension_codec::composed_extension_codec().await? + } + ExampleKind::All => unreachable!("`All` should be handled in main"), + } + Ok(()) } } @@ -80,9 +99,13 @@ async fn main() -> Result<()> { })?; match arg.parse::()? { - ExampleKind::ComposedExtensionCodec => { - composed_extension_codec::composed_extension_codec().await? + ExampleKind::All => { + for example in ExampleKind::RUNNABLE_VARIANTS { + println!("Running example: {}", example.as_ref()); + example.run().await?; + } } + example => example.run().await?, } Ok(()) diff --git a/datafusion-examples/examples/query_planning/main.rs b/datafusion-examples/examples/query_planning/main.rs index a2b6f0925a6ca..bb2ed6d9f5952 100644 --- a/datafusion-examples/examples/query_planning/main.rs +++ b/datafusion-examples/examples/query_planning/main.rs @@ -21,10 +21,11 @@ //! //! ## Usage //! ```bash -//! cargo run --example query_planning -- [analyzer_rule|expr_api|optimizer_rule|parse_sql_expr|plan_to_sql|planner_api|pruning|thread_pools] +//! cargo run --example query_planning -- [all|analyzer_rule|expr_api|optimizer_rule|parse_sql_expr|plan_to_sql|planner_api|pruning|thread_pools] //! ``` //! //! Each subcommand runs a corresponding example: +//! - `all` — run all examples included in this module //! - `analyzer_rule` — use a custom AnalyzerRule to change a query's semantics (row level access control) //! - `expr_api` — create, execute, simplify, analyze and coerce `Expr`s //! 
- `optimizer_rule` — use a custom OptimizerRule to replace certain predicates @@ -48,6 +49,7 @@ use std::str::FromStr; use datafusion::error::{DataFusionError, Result}; enum ExampleKind { + All, AnalyzerRule, ExprApi, OptimizerRule, @@ -61,6 +63,7 @@ enum ExampleKind { impl AsRef for ExampleKind { fn as_ref(&self) -> &str { match self { + Self::All => "all", Self::AnalyzerRule => "analyzer_rule", Self::ExprApi => "expr_api", Self::OptimizerRule => "optimizer_rule", @@ -78,6 +81,7 @@ impl FromStr for ExampleKind { fn from_str(s: &str) -> Result { match s { + "all" => Ok(Self::All), "analyzer_rule" => Ok(Self::AnalyzerRule), "expr_api" => Ok(Self::ExprApi), "optimizer_rule" => Ok(Self::OptimizerRule), @@ -92,7 +96,19 @@ impl FromStr for ExampleKind { } impl ExampleKind { - const ALL: [Self; 8] = [ + const ALL_VARIANTS: [Self; 9] = [ + Self::All, + Self::AnalyzerRule, + Self::ExprApi, + Self::OptimizerRule, + Self::ParseSqlExpr, + Self::PlanToSql, + Self::PlannerApi, + Self::Pruning, + Self::ThreadPools, + ]; + + const RUNNABLE_VARIANTS: [Self; 8] = [ Self::AnalyzerRule, Self::ExprApi, Self::OptimizerRule, @@ -106,7 +122,25 @@ impl ExampleKind { const EXAMPLE_NAME: &str = "query_planning"; fn variants() -> Vec<&'static str> { - Self::ALL.iter().map(|x| x.as_ref()).collect() + Self::ALL_VARIANTS + .iter() + .map(|example| example.as_ref()) + .collect() + } + + async fn run(&self) -> Result<()> { + match self { + ExampleKind::AnalyzerRule => analyzer_rule::analyzer_rule().await?, + ExampleKind::ExprApi => expr_api::expr_api().await?, + ExampleKind::OptimizerRule => optimizer_rule::optimizer_rule().await?, + ExampleKind::ParseSqlExpr => parse_sql_expr::parse_sql_expr().await?, + ExampleKind::PlanToSql => plan_to_sql::plan_to_sql_examples().await?, + ExampleKind::PlannerApi => planner_api::planner_api().await?, + ExampleKind::Pruning => pruning::pruning().await?, + ExampleKind::ThreadPools => thread_pools::thread_pools().await?, + ExampleKind::All => unreachable!("`All` 
should be handled in main"), + } + Ok(()) } } @@ -124,14 +158,13 @@ async fn main() -> Result<()> { })?; match arg.parse::()? { - ExampleKind::AnalyzerRule => analyzer_rule::analyzer_rule().await?, - ExampleKind::ExprApi => expr_api::expr_api().await?, - ExampleKind::OptimizerRule => optimizer_rule::optimizer_rule().await?, - ExampleKind::ParseSqlExpr => parse_sql_expr::parse_sql_expr().await?, - ExampleKind::PlanToSql => plan_to_sql::plan_to_sql_examples().await?, - ExampleKind::PlannerApi => planner_api::planner_api().await?, - ExampleKind::Pruning => pruning::pruning().await?, - ExampleKind::ThreadPools => thread_pools::thread_pools().await?, + ExampleKind::All => { + for example in ExampleKind::RUNNABLE_VARIANTS { + println!("Running example: {}", example.as_ref()); + example.run().await?; + } + } + example => example.run().await?, } Ok(()) diff --git a/datafusion-examples/examples/sql_ops/main.rs b/datafusion-examples/examples/sql_ops/main.rs index d67b16bf5c20d..e310358edc7c3 100644 --- a/datafusion-examples/examples/sql_ops/main.rs +++ b/datafusion-examples/examples/sql_ops/main.rs @@ -21,10 +21,11 @@ //! //! ## Usage //! ```bash -//! cargo run --example sql_ops -- [analysis|dialect|frontend|query] +//! cargo run --example sql_ops -- [all|analysis|dialect|frontend|query] //! ``` //! //! Each subcommand runs a corresponding example: +//! - `all` — run all examples included in this module //! - `analysis` — analyse SQL queries with DataFusion structures //! - `dialect` — implementing a custom SQL dialect on top of DFParser //! 
- `frontend` — create LogicalPlans (only) from sql strings @@ -40,6 +41,7 @@ use std::str::FromStr; use datafusion::error::{DataFusionError, Result}; enum ExampleKind { + All, Analysis, Dialect, Frontend, @@ -49,6 +51,7 @@ enum ExampleKind { impl AsRef for ExampleKind { fn as_ref(&self) -> &str { match self { + Self::All => "all", Self::Analysis => "analysis", Self::Dialect => "dialect", Self::Frontend => "frontend", @@ -62,6 +65,7 @@ impl FromStr for ExampleKind { fn from_str(s: &str) -> Result { match s { + "all" => Ok(Self::All), "analysis" => Ok(Self::Analysis), "dialect" => Ok(Self::Dialect), "frontend" => Ok(Self::Frontend), @@ -72,12 +76,35 @@ impl FromStr for ExampleKind { } impl ExampleKind { - const ALL: [Self; 4] = [Self::Analysis, Self::Dialect, Self::Frontend, Self::Query]; + const ALL_VARIANTS: [Self; 5] = [ + Self::All, + Self::Analysis, + Self::Dialect, + Self::Frontend, + Self::Query, + ]; + + const RUNNABLE_VARIANTS: [Self; 4] = + [Self::Analysis, Self::Dialect, Self::Frontend, Self::Query]; const EXAMPLE_NAME: &str = "sql_ops"; fn variants() -> Vec<&'static str> { - Self::ALL.iter().map(|x| x.as_ref()).collect() + Self::ALL_VARIANTS + .iter() + .map(|example| example.as_ref()) + .collect() + } + + async fn run(&self) -> Result<()> { + match self { + ExampleKind::Analysis => analysis::analysis().await?, + ExampleKind::Dialect => dialect::dialect().await?, + ExampleKind::Frontend => frontend::frontend()?, + ExampleKind::Query => query::query().await?, + ExampleKind::All => unreachable!("`All` should be handled in main"), + } + Ok(()) } } @@ -95,10 +122,13 @@ async fn main() -> Result<()> { })?; match arg.parse::()? 
{ - ExampleKind::Analysis => analysis::analysis().await?, - ExampleKind::Dialect => dialect::dialect().await?, - ExampleKind::Frontend => frontend::frontend()?, - ExampleKind::Query => query::query().await?, + ExampleKind::All => { + for example in ExampleKind::RUNNABLE_VARIANTS { + println!("Running example: {}", example.as_ref()); + example.run().await?; + } + } + example => example.run().await?, } Ok(()) diff --git a/datafusion-examples/examples/udf/main.rs b/datafusion-examples/examples/udf/main.rs index 104d373937809..d3dd1e325cd9b 100644 --- a/datafusion-examples/examples/udf/main.rs +++ b/datafusion-examples/examples/udf/main.rs @@ -21,10 +21,11 @@ //! //! ## Usage //! ```bash -//! cargo run --example udf -- [adv_udaf|adv_udf|adv_udwf|async_udf|udaf|udf|udtf|udwf] +//! cargo run --example udf -- [all|adv_udaf|adv_udf|adv_udwf|async_udf|udaf|udf|udtf|udwf] //! ``` //! //! Each subcommand runs a corresponding example: +//! - `all` — run all examples included in this module //! - `adv_udaf` — user defined aggregate function example //! - `adv_udf` — user defined scalar function example //! 
- `adv_udwf` — user defined window function example @@ -48,6 +49,7 @@ use std::str::FromStr; use datafusion::error::{DataFusionError, Result}; enum ExampleKind { + All, AdvUdaf, AdvUdf, AdvUdwf, @@ -61,6 +63,7 @@ enum ExampleKind { impl AsRef for ExampleKind { fn as_ref(&self) -> &str { match self { + Self::All => "all", Self::AdvUdaf => "adv_udaf", Self::AdvUdf => "adv_udf", Self::AdvUdwf => "adv_udwf", @@ -78,6 +81,7 @@ impl FromStr for ExampleKind { fn from_str(s: &str) -> Result { match s { + "all" => Ok(Self::All), "adv_udaf" => Ok(Self::AdvUdaf), "adv_udf" => Ok(Self::AdvUdf), "adv_udwf" => Ok(Self::AdvUdwf), @@ -92,7 +96,19 @@ impl FromStr for ExampleKind { } impl ExampleKind { - const ALL: [Self; 8] = [ + const ALL_VARIANTS: [Self; 9] = [ + Self::All, + Self::AdvUdaf, + Self::AdvUdf, + Self::AdvUdwf, + Self::AsyncUdf, + Self::Udaf, + Self::Udf, + Self::Udtf, + Self::Udwf, + ]; + + const RUNNABLE_VARIANTS: [Self; 8] = [ Self::AdvUdaf, Self::AdvUdf, Self::AdvUdwf, @@ -106,7 +122,25 @@ impl ExampleKind { const EXAMPLE_NAME: &str = "udf"; fn variants() -> Vec<&'static str> { - Self::ALL.iter().map(|x| x.as_ref()).collect() + Self::ALL_VARIANTS + .iter() + .map(|example| example.as_ref()) + .collect() + } + + async fn run(&self) -> Result<()> { + match self { + ExampleKind::AdvUdaf => advanced_udaf::advanced_udaf().await?, + ExampleKind::AdvUdf => advanced_udf::advanced_udf().await?, + ExampleKind::AdvUdwf => advanced_udwf::advanced_udwf().await?, + ExampleKind::AsyncUdf => async_udf::async_udf().await?, + ExampleKind::Udaf => simple_udaf::simple_udaf().await?, + ExampleKind::Udf => simple_udf::simple_udf().await?, + ExampleKind::Udtf => simple_udtf::simple_udtf().await?, + ExampleKind::Udwf => simple_udwf::simple_udwf().await?, + ExampleKind::All => unreachable!("`All` should be handled in main"), + } + Ok(()) } } @@ -124,14 +158,13 @@ async fn main() -> Result<()> { })?; match arg.parse::()? 
{ - ExampleKind::AdvUdaf => advanced_udaf::advanced_udaf().await?, - ExampleKind::AdvUdf => advanced_udf::advanced_udf().await?, - ExampleKind::AdvUdwf => advanced_udwf::advanced_udwf().await?, - ExampleKind::AsyncUdf => async_udf::async_udf().await?, - ExampleKind::Udaf => simple_udaf::simple_udaf().await?, - ExampleKind::Udf => simple_udf::simple_udf().await?, - ExampleKind::Udtf => simple_udtf::simple_udtf().await?, - ExampleKind::Udwf => simple_udwf::simple_udwf().await?, + ExampleKind::All => { + for example in ExampleKind::RUNNABLE_VARIANTS { + println!("Running example: {}", example.as_ref()); + example.run().await?; + } + } + example => example.run().await?, } Ok(()) From 60c2a9e6a991efde6b04fc14ef88dc64d7fc632b Mon Sep 17 00:00:00 2001 From: Sergey Zhukov Date: Wed, 26 Nov 2025 17:29:14 +0300 Subject: [PATCH 02/11] Fix issues causing GitHub checks to fail --- datafusion-examples/examples/flight/main.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion-examples/examples/flight/main.rs b/datafusion-examples/examples/flight/main.rs index f3c36d6a32c68..2f3136a63e793 100644 --- a/datafusion-examples/examples/flight/main.rs +++ b/datafusion-examples/examples/flight/main.rs @@ -26,9 +26,9 @@ //! //! Each subcommand runs a corresponding example: //! - `all` — run all examples included in this module -//! Note: The Flight server must be started in a separate process -//! before running the `client` example. Therefore, running `all` will -//! not produce a full server+client workflow automatically. +//! Note: The Flight server must be started in a separate process +//! before running the `client` example. Therefore, running `all` will +//! not produce a full server+client workflow automatically. //! - `client` — run DataFusion as a standalone process and execute SQL queries from a client using the Flight protocol //! 
- `server` — run DataFusion as a standalone process and execute SQL queries from a client using the Flight protocol //! - `sql_server` — run DataFusion as a standalone process and execute SQL queries from JDBC clients From ec321cc3d44c56531fd52ab3534e314ae65946c4 Mon Sep 17 00:00:00 2001 From: Sergey Zhukov Date: Thu, 27 Nov 2025 09:41:09 +0300 Subject: [PATCH 03/11] Trigger CI From ec77fba66754758202248ad0b5a98f8c70431d8a Mon Sep 17 00:00:00 2001 From: Sergey Zhukov Date: Thu, 27 Nov 2025 10:56:17 +0300 Subject: [PATCH 04/11] fix typo in custom_data_source and update rust_example.sh with passing correct arg --- ci/scripts/rust_example.sh | 2 +- .../examples/custom_data_source/main.rs | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/ci/scripts/rust_example.sh b/ci/scripts/rust_example.sh index 23f8df0195807..7a5f7825b4e6d 100755 --- a/ci/scripts/rust_example.sh +++ b/ci/scripts/rust_example.sh @@ -46,5 +46,5 @@ for dir in */; do fi echo "Running example group: $example_name" - cargo run --profile ci --example "$example_name" all + cargo run --profile ci --example "$example_name" -- all done diff --git a/datafusion-examples/examples/custom_data_source/main.rs b/datafusion-examples/examples/custom_data_source/main.rs index b23e3658b485d..83e28bfab6718 100644 --- a/datafusion-examples/examples/custom_data_source/main.rs +++ b/datafusion-examples/examples/custom_data_source/main.rs @@ -54,7 +54,7 @@ enum ExampleKind { CustomFileCasts, CustomFileFormat, DefaultColumnValues, - FileFtreamProvider, + FileStreamProvider, } impl AsRef for ExampleKind { @@ -67,7 +67,7 @@ impl AsRef for ExampleKind { Self::CustomFileCasts => "custom_file_casts", Self::CustomFileFormat => "custom_file_format", Self::DefaultColumnValues => "default_column_values", - Self::FileFtreamProvider => "file_stream_provider", + Self::FileStreamProvider => "file_stream_provider", } } } @@ -84,7 +84,7 @@ impl FromStr for ExampleKind { "custom_file_casts" =>
Ok(Self::CustomFileCasts), "custom_file_format" => Ok(Self::CustomFileFormat), "default_column_values" => Ok(Self::DefaultColumnValues), - "file_stream_provider" => Ok(Self::FileFtreamProvider), + "file_stream_provider" => Ok(Self::FileStreamProvider), _ => Err(DataFusionError::Execution(format!("Unknown example: {s}"))), } } @@ -99,7 +99,7 @@ impl ExampleKind { Self::CustomFileCasts, Self::CustomFileFormat, Self::DefaultColumnValues, - Self::FileFtreamProvider, + Self::FileStreamProvider, ]; const RUNNABLE_VARIANTS: [Self; 7] = [ @@ -109,7 +109,7 @@ impl ExampleKind { Self::CustomFileCasts, Self::CustomFileFormat, Self::DefaultColumnValues, - Self::FileFtreamProvider, + Self::FileStreamProvider, ]; const EXAMPLE_NAME: &str = "custom_data_source"; @@ -139,7 +139,7 @@ impl ExampleKind { ExampleKind::DefaultColumnValues => { default_column_values::default_column_values().await? } - ExampleKind::FileFtreamProvider => { + ExampleKind::FileStreamProvider => { file_stream_provider::file_stream_provider().await? } ExampleKind::All => unreachable!("`All` should be handled in main"), From 14a91c1bdd511aa583e7a64e6e944e283f8634d0 Mon Sep 17 00:00:00 2001 From: Sergey Zhukov Date: Thu, 27 Nov 2025 16:23:09 +0300 Subject: [PATCH 05/11] fix regexp example using content from regex.csv file --- .../examples/builtin_functions/regexp.rs | 34 +++++++++++++++---- 1 file changed, 28 insertions(+), 6 deletions(-) diff --git a/datafusion-examples/examples/builtin_functions/regexp.rs b/datafusion-examples/examples/builtin_functions/regexp.rs index b8e15431603d4..144a7ba1981a6 100644 --- a/datafusion-examples/examples/builtin_functions/regexp.rs +++ b/datafusion-examples/examples/builtin_functions/regexp.rs @@ -18,9 +18,13 @@ //! See `main.rs` for how to run it. 
+use std::fs::File; +use std::io::Write; + use datafusion::common::{assert_batches_eq, assert_contains}; use datafusion::error::Result; use datafusion::prelude::*; +use tempfile::tempdir; /// This example demonstrates how to use the regexp_* functions /// @@ -32,12 +36,30 @@ use datafusion::prelude::*; /// https://docs.rs/regex/latest/regex/#grouping-and-flags pub async fn regexp() -> Result<()> { let ctx = SessionContext::new(); - ctx.register_csv( - "examples", - "datafusion/physical-expr/tests/data/regex.csv", - CsvReadOptions::new(), - ) - .await?; + // content from file 'datafusion/physical-expr/tests/data/regex.csv' + let csv_data = r#"values,patterns,replacement,flags +abc,^(a),bb\1bb,i +ABC,^(A).*,B,i +aBc,(b|d),e,i +AbC,(B|D),e, +aBC,^(b|c),d, +4000,\b4([1-9]\d\d|\d[1-9]\d|\d\d[1-9])\b,xyz, +4010,\b4([1-9]\d\d|\d[1-9]\d|\d\d[1-9])\b,xyz, +Düsseldorf,[\p{Letter}-]+,München, +Москва,[\p{L}-]+,Moscow, +Köln,[a-zA-Z]ö[a-zA-Z]{2},Koln, +اليوم,^\p{Arabic}+$,Today,"#; + let dir = tempdir()?; + let file_path = dir.path().join("regex.csv"); + { + let mut file = File::create(&file_path)?; + // write CSV data + file.write_all(csv_data.as_bytes())?; + } // scope closes the file + let file_path = file_path.to_str().unwrap(); + + ctx.register_csv("examples", file_path, CsvReadOptions::new()) + .await?; // // From ca8300f84d96a8762223102ff4ed6ce43bd60522 Mon Sep 17 00:00:00 2001 From: Sergey Zhukov Date: Thu, 27 Nov 2025 17:23:16 +0300 Subject: [PATCH 06/11] fix typo in udf examples and use csv data from cars.csv file --- .../examples/builtin_functions/regexp.rs | 3 +- .../examples/udf/advanced_udwf.rs | 49 ++++++++++++++++--- datafusion-examples/examples/udf/async_udf.rs | 31 ++++++------ datafusion-examples/examples/udf/main.rs | 2 +- .../examples/udf/simple_udwf.rs | 48 +++++++++++++++--- 5 files changed, 100 insertions(+), 33 deletions(-) diff --git a/datafusion-examples/examples/builtin_functions/regexp.rs b/datafusion-examples/examples/builtin_functions/regexp.rs
index 144a7ba1981a6..4c6b6c67ac4d5 100644 --- a/datafusion-examples/examples/builtin_functions/regexp.rs +++ b/datafusion-examples/examples/builtin_functions/regexp.rs @@ -18,8 +18,7 @@ //! See `main.rs` for how to run it. -use std::fs::File; -use std::io::Write; +use std::{fs::File, io::Write}; use datafusion::common::{assert_batches_eq, assert_contains}; use datafusion::error::Result; diff --git a/datafusion-examples/examples/udf/advanced_udwf.rs b/datafusion-examples/examples/udf/advanced_udwf.rs index 37b6671639e47..e8d3a75b29dec 100644 --- a/datafusion-examples/examples/udf/advanced_udwf.rs +++ b/datafusion-examples/examples/udf/advanced_udwf.rs @@ -17,6 +17,8 @@ //! See `main.rs` for how to run it. +use std::{any::Any, fs::File, io::Write, sync::Arc}; + use arrow::datatypes::Field; use arrow::{ array::{ArrayRef, AsArray, Float64Array}, @@ -38,8 +40,7 @@ use datafusion::logical_expr::{ use datafusion::physical_expr::PhysicalExpr; use datafusion::prelude::*; use datafusion::{arrow::datatypes::DataType, logical_expr::Volatility}; -use std::any::Any; -use std::sync::Arc; +use tempfile::tempdir; /// This example shows how to use the full WindowUDFImpl API to implement a user /// defined window function. As in the `simple_udwf.rs` example, this struct implements @@ -229,12 +230,46 @@ async fn create_context() -> Result { // declare a new context. In spark API, this corresponds to a new spark SQL session let ctx = SessionContext::new(); - // declare a table in memory. In spark API, this corresponds to createDataFrame(...). 
- println!("pwd: {}", std::env::current_dir().unwrap().display()); - let csv_path = "../../datafusion/core/tests/data/cars.csv".to_string(); - let read_options = CsvReadOptions::default().has_header(true); + // content from file 'datafusion/core/tests/data/cars.csv' + let csv_data = r#"car,speed,time +red,20.0,1996-04-12T12:05:03.000000000 +red,20.3,1996-04-12T12:05:04.000000000 +red,21.4,1996-04-12T12:05:05.000000000 +red,21.5,1996-04-12T12:05:06.000000000 +red,19.0,1996-04-12T12:05:07.000000000 +red,18.0,1996-04-12T12:05:08.000000000 +red,17.0,1996-04-12T12:05:09.000000000 +red,7.0,1996-04-12T12:05:10.000000000 +red,7.1,1996-04-12T12:05:11.000000000 +red,7.2,1996-04-12T12:05:12.000000000 +red,3.0,1996-04-12T12:05:13.000000000 +red,1.0,1996-04-12T12:05:14.000000000 +red,0.0,1996-04-12T12:05:15.000000000 +green,10.0,1996-04-12T12:05:03.000000000 +green,10.3,1996-04-12T12:05:04.000000000 +green,10.4,1996-04-12T12:05:05.000000000 +green,10.5,1996-04-12T12:05:06.000000000 +green,11.0,1996-04-12T12:05:07.000000000 +green,12.0,1996-04-12T12:05:08.000000000 +green,14.0,1996-04-12T12:05:09.000000000 +green,15.0,1996-04-12T12:05:10.000000000 +green,15.1,1996-04-12T12:05:11.000000000 +green,15.2,1996-04-12T12:05:12.000000000 +green,8.0,1996-04-12T12:05:13.000000000 +green,2.0,1996-04-12T12:05:14.000000000 +"#; + let dir = tempdir()?; + let file_path = dir.path().join("cars.csv"); + { + let mut file = File::create(&file_path)?; + // write CSV data + file.write_all(csv_data.as_bytes())?; + } // scope closes the file + let file_path = file_path.to_str().unwrap(); + + ctx.register_csv("cars", file_path, CsvReadOptions::new()) + .await?; - ctx.register_csv("cars", &csv_path, read_options).await?; Ok(ctx) } diff --git a/datafusion-examples/examples/udf/async_udf.rs b/datafusion-examples/examples/udf/async_udf.rs index c55650223cd4c..18630d33a3436 100644 --- a/datafusion-examples/examples/udf/async_udf.rs +++ b/datafusion-examples/examples/udf/async_udf.rs @@ -23,6 +23,8 @@ //! 
making network requests. This can be used for tasks like fetching //! data from an external API such as a LLM service or an external database. +use std::{any::Any, sync::Arc}; + use arrow::array::{ArrayRef, BooleanArray, Int64Array, RecordBatch, StringArray}; use arrow_schema::{DataType, Field, Schema}; use async_trait::async_trait; @@ -37,8 +39,6 @@ use datafusion::logical_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, }; use datafusion::prelude::{SessionConfig, SessionContext}; -use std::any::Any; -use std::sync::Arc; /// In this example we register `AskLLM` as an asynchronous user defined function /// and invoke it via the DataFrame API and SQL @@ -93,20 +93,19 @@ pub async fn async_udf() -> Result<()> { assert_batches_eq!( [ - "+---------------+--------------------------------------------------------------------------------------------------------------------------------+", - "| plan_type | plan |", - "+---------------+--------------------------------------------------------------------------------------------------------------------------------+", - "| logical_plan | SubqueryAlias: a |", - "| | Filter: ask_llm(CAST(animal.name AS Utf8View), Utf8View(\"Is this animal furry?\")) |", - "| | TableScan: animal projection=[id, name] |", - "| physical_plan | CoalesceBatchesExec: target_batch_size=8192 |", - "| | FilterExec: __async_fn_0@2, projection=[id@0, name@1] |", - "| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |", - "| | AsyncFuncExec: async_expr=[async_expr(name=__async_fn_0, expr=ask_llm(CAST(name@1 AS Utf8View), Is this animal furry?))] |", - "| | CoalesceBatchesExec: target_batch_size=8192 |", - "| | DataSourceExec: partitions=1, partition_sizes=[1] |", - "| | |", - "+---------------+--------------------------------------------------------------------------------------------------------------------------------+", + 
"+---------------+------------------------------------------------------------------------------------------------------------------------------+", + "| plan_type | plan |", + "+---------------+------------------------------------------------------------------------------------------------------------------------------+", + "| logical_plan | SubqueryAlias: a |", + "| | Filter: ask_llm(CAST(animal.name AS Utf8View), Utf8View(\"Is this animal furry?\")) |", + "| | TableScan: animal projection=[id, name] |", + "| physical_plan | FilterExec: __async_fn_0@2, projection=[id@0, name@1] |", + "| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |", + "| | AsyncFuncExec: async_expr=[async_expr(name=__async_fn_0, expr=ask_llm(CAST(name@1 AS Utf8View), Is this animal furry?))] |", + "| | CoalesceBatchesExec: target_batch_size=8192 |", + "| | DataSourceExec: partitions=1, partition_sizes=[1] |", + "| | |", + "+---------------+------------------------------------------------------------------------------------------------------------------------------+", ], &results ); diff --git a/datafusion-examples/examples/udf/main.rs b/datafusion-examples/examples/udf/main.rs index d3dd1e325cd9b..60fa1a9e50c77 100644 --- a/datafusion-examples/examples/udf/main.rs +++ b/datafusion-examples/examples/udf/main.rs @@ -70,7 +70,7 @@ impl AsRef for ExampleKind { Self::AsyncUdf => "async_udf", Self::Udf => "udf", Self::Udaf => "udaf", - Self::Udwf => "udwt", + Self::Udwf => "udwf", Self::Udtf => "udtf", } } diff --git a/datafusion-examples/examples/udf/simple_udwf.rs b/datafusion-examples/examples/udf/simple_udwf.rs index a4f6d59a7d6d4..1842d88b9ba29 100644 --- a/datafusion-examples/examples/udf/simple_udwf.rs +++ b/datafusion-examples/examples/udf/simple_udwf.rs @@ -17,29 +17,63 @@ //! See `main.rs` for how to run it. 
-use std::sync::Arc; +use std::{fs::File, io::Write, sync::Arc}; use arrow::{ array::{ArrayRef, AsArray, Float64Array}, datatypes::{DataType, Float64Type}, }; - use datafusion::common::ScalarValue; use datafusion::error::Result; use datafusion::logical_expr::{PartitionEvaluator, Volatility, WindowFrame}; use datafusion::prelude::*; +use tempfile::tempdir; // create local execution context with `cars.csv` registered as a table named `cars` async fn create_context() -> Result { // declare a new context. In spark API, this corresponds to a new spark SQL session let ctx = SessionContext::new(); - // declare a table in memory. In spark API, this corresponds to createDataFrame(...). - println!("pwd: {}", std::env::current_dir().unwrap().display()); - let csv_path = "../../datafusion/core/tests/data/cars.csv".to_string(); - let read_options = CsvReadOptions::default().has_header(true); + // content from file 'datafusion/core/tests/data/cars.csv' + let csv_data = r#"car,speed,time +red,20.0,1996-04-12T12:05:03.000000000 +red,20.3,1996-04-12T12:05:04.000000000 +red,21.4,1996-04-12T12:05:05.000000000 +red,21.5,1996-04-12T12:05:06.000000000 +red,19.0,1996-04-12T12:05:07.000000000 +red,18.0,1996-04-12T12:05:08.000000000 +red,17.0,1996-04-12T12:05:09.000000000 +red,7.0,1996-04-12T12:05:10.000000000 +red,7.1,1996-04-12T12:05:11.000000000 +red,7.2,1996-04-12T12:05:12.000000000 +red,3.0,1996-04-12T12:05:13.000000000 +red,1.0,1996-04-12T12:05:14.000000000 +red,0.0,1996-04-12T12:05:15.000000000 +green,10.0,1996-04-12T12:05:03.000000000 +green,10.3,1996-04-12T12:05:04.000000000 +green,10.4,1996-04-12T12:05:05.000000000 +green,10.5,1996-04-12T12:05:06.000000000 +green,11.0,1996-04-12T12:05:07.000000000 +green,12.0,1996-04-12T12:05:08.000000000 +green,14.0,1996-04-12T12:05:09.000000000 +green,15.0,1996-04-12T12:05:10.000000000 +green,15.1,1996-04-12T12:05:11.000000000 +green,15.2,1996-04-12T12:05:12.000000000 +green,8.0,1996-04-12T12:05:13.000000000 
+green,2.0,1996-04-12T12:05:14.000000000 +"#; + let dir = tempdir()?; + let file_path = dir.path().join("cars.csv"); + { + let mut file = File::create(&file_path)?; + // write CSV data + file.write_all(csv_data.as_bytes())?; + } // scope closes the file + let file_path = file_path.to_str().unwrap(); + + ctx.register_csv("cars", file_path, CsvReadOptions::new()) + .await?; - ctx.register_csv("cars", &csv_path, read_options).await?; Ok(ctx) } From a4cac2f0dd4e40b120b9016fa5623365692c6a46 Mon Sep 17 00:00:00 2001 From: Sergey Zhukov Date: Fri, 28 Nov 2025 09:17:42 +0300 Subject: [PATCH 07/11] Track generated file so rustfmt matches CI --- datafusion/proto/src/generated/datafusion.rs | 1 + 1 file changed, 1 insertion(+) create mode 100644 datafusion/proto/src/generated/datafusion.rs diff --git a/datafusion/proto/src/generated/datafusion.rs b/datafusion/proto/src/generated/datafusion.rs new file mode 100644 index 0000000000000..8b137891791fe --- /dev/null +++ b/datafusion/proto/src/generated/datafusion.rs @@ -0,0 +1 @@ + From 5110e4cbed371e5bd302ec547e90ec1269c1624a Mon Sep 17 00:00:00 2001 From: Sergey Zhukov Date: Fri, 28 Nov 2025 13:12:52 +0300 Subject: [PATCH 08/11] move logic to run() --- .../examples/builtin_functions/main.rs | 20 +++++++----------- .../examples/custom_data_source/main.rs | 20 +++++++----------- datafusion-examples/examples/data_io/main.rs | 20 +++++++----------- .../examples/dataframe/main.rs | 20 +++++++----------- .../examples/execution_monitoring/main.rs | 20 +++++++----------- .../examples/external_dependency/main.rs | 20 +++++++----------- datafusion-examples/examples/flight/main.rs | 20 +++++++----------- datafusion-examples/examples/proto/main.rs | 20 +++++++----------- .../examples/query_planning/main.rs | 20 +++++++----------- datafusion-examples/examples/sql_ops/main.rs | 20 +++++++----------- datafusion-examples/examples/udf/main.rs | 21 ++++++++----------- 11 files changed, 89 insertions(+), 132 deletions(-) diff --git 
a/datafusion-examples/examples/builtin_functions/main.rs b/datafusion-examples/examples/builtin_functions/main.rs index c3780f62a5d14..f0aa241a6e040 100644 --- a/datafusion-examples/examples/builtin_functions/main.rs +++ b/datafusion-examples/examples/builtin_functions/main.rs @@ -92,10 +92,15 @@ impl ExampleKind { async fn run(&self) -> Result<()> { match self { + ExampleKind::All => { + for example in ExampleKind::RUNNABLE_VARIANTS { + println!("Running example: {}", example.as_ref()); + Box::pin(example.run()).await?; + } + } ExampleKind::DateTime => date_time::date_time().await?, ExampleKind::FunctionFactory => function_factory::function_factory().await?, ExampleKind::Regexp => regexp::regexp().await?, - ExampleKind::All => unreachable!("`All` should be handled in main"), } Ok(()) } @@ -114,15 +119,6 @@ async fn main() -> Result<()> { DataFusionError::Execution("Missing argument".to_string()) })?; - match arg.parse::()? { - ExampleKind::All => { - for example in ExampleKind::RUNNABLE_VARIANTS { - println!("Running example: {}", example.as_ref()); - example.run().await?; - } - } - example => example.run().await?, - } - - Ok(()) + let example = arg.parse::()?; + example.run().await } diff --git a/datafusion-examples/examples/custom_data_source/main.rs b/datafusion-examples/examples/custom_data_source/main.rs index 83e28bfab6718..779d3ce6f072e 100644 --- a/datafusion-examples/examples/custom_data_source/main.rs +++ b/datafusion-examples/examples/custom_data_source/main.rs @@ -123,6 +123,12 @@ impl ExampleKind { async fn run(&self) -> Result<()> { match self { + ExampleKind::All => { + for example in ExampleKind::RUNNABLE_VARIANTS { + println!("Running example: {}", example.as_ref()); + Box::pin(example.run()).await?; + } + } ExampleKind::CsvJsonOpener => csv_json_opener::csv_json_opener().await?, ExampleKind::CsvSqlStreaming => { csv_sql_streaming::csv_sql_streaming().await? 
@@ -142,7 +148,6 @@ impl ExampleKind { ExampleKind::FileStreamProvider => { file_stream_provider::file_stream_provider().await? } - ExampleKind::All => unreachable!("`All` should be handled in main"), } Ok(()) } @@ -161,15 +166,6 @@ async fn main() -> Result<()> { DataFusionError::Execution("Missing argument".to_string()) })?; - match arg.parse::()? { - ExampleKind::All => { - for example in ExampleKind::RUNNABLE_VARIANTS { - println!("Running example: {}", example.as_ref()); - example.run().await?; - } - } - example => example.run().await?, - } - - Ok(()) + let example = arg.parse::()?; + example.run().await } diff --git a/datafusion-examples/examples/data_io/main.rs b/datafusion-examples/examples/data_io/main.rs index 670e3055b4341..3466eeedf1ca5 100644 --- a/datafusion-examples/examples/data_io/main.rs +++ b/datafusion-examples/examples/data_io/main.rs @@ -144,6 +144,12 @@ impl ExampleKind { async fn run(&self) -> Result<()> { match self { + ExampleKind::All => { + for example in ExampleKind::RUNNABLE_VARIANTS { + println!("Running example: {}", example.as_ref()); + Box::pin(example.run()).await?; + } + } ExampleKind::Catalog => catalog::catalog().await?, ExampleKind::JsonShredding => json_shredding::json_shredding().await?, ExampleKind::ParquetAdvIdx => { @@ -162,7 +168,6 @@ impl ExampleKind { ExampleKind::ParquetIdx => parquet_index::parquet_index().await?, ExampleKind::QueryHttpCsv => query_http_csv::query_http_csv().await?, ExampleKind::RemoteCatalog => remote_catalog::remote_catalog().await?, - ExampleKind::All => unreachable!("`All` should be handled in main"), } Ok(()) } @@ -181,15 +186,6 @@ async fn main() -> Result<()> { DataFusionError::Execution("Missing argument".to_string()) })?; - match arg.parse::()? 
{ - ExampleKind::All => { - for example in ExampleKind::RUNNABLE_VARIANTS { - println!("Running example: {}", example.as_ref()); - example.run().await?; - } - } - example => example.run().await?, - } - - Ok(()) + let example = arg.parse::()?; + example.run().await } diff --git a/datafusion-examples/examples/dataframe/main.rs b/datafusion-examples/examples/dataframe/main.rs index b8288cf4d709a..f6a7a025a8e06 100644 --- a/datafusion-examples/examples/dataframe/main.rs +++ b/datafusion-examples/examples/dataframe/main.rs @@ -82,11 +82,16 @@ impl ExampleKind { async fn run(&self) -> Result<()> { match self { + ExampleKind::All => { + for example in ExampleKind::RUNNABLE_VARIANTS { + println!("Running example: {}", example.as_ref()); + Box::pin(example.run()).await?; + } + } ExampleKind::Dataframe => dataframe::dataframe_example().await?, ExampleKind::DeserializeToStruct => { deserialize_to_struct::deserialize_to_struct().await? } - ExampleKind::All => unreachable!("`All` should be handled in main"), } Ok(()) } @@ -105,15 +110,6 @@ async fn main() -> Result<()> { DataFusionError::Execution("Missing argument".to_string()) })?; - match arg.parse::()? 
{ - ExampleKind::All => { - for example in ExampleKind::RUNNABLE_VARIANTS { - println!("Running example: {}", example.as_ref()); - example.run().await?; - } - } - example => example.run().await?, - } - - Ok(()) + let example = arg.parse::()?; + example.run().await } diff --git a/datafusion-examples/examples/execution_monitoring/main.rs b/datafusion-examples/examples/execution_monitoring/main.rs index 5201522d813f5..39126968d1f35 100644 --- a/datafusion-examples/examples/execution_monitoring/main.rs +++ b/datafusion-examples/examples/execution_monitoring/main.rs @@ -95,6 +95,12 @@ impl ExampleKind { async fn run(&self) -> Result<()> { match self { + ExampleKind::All => { + for example in ExampleKind::RUNNABLE_VARIANTS { + println!("Running example: {}", example.as_ref()); + Box::pin(example.run()).await?; + } + } ExampleKind::MemoryPoolExecutionPlan => { memory_pool_execution_plan::memory_pool_execution_plan().await? } @@ -102,7 +108,6 @@ impl ExampleKind { memory_pool_tracking::mem_pool_tracking().await? } ExampleKind::Tracing => tracing::tracing().await?, - ExampleKind::All => unreachable!("`All` should be handled in main"), } Ok(()) } @@ -121,15 +126,6 @@ async fn main() -> Result<()> { DataFusionError::Execution("Missing argument".to_string()) })?; - match arg.parse::()? 
{ - ExampleKind::All => { - for example in ExampleKind::RUNNABLE_VARIANTS { - println!("Running example: {}", example.as_ref()); - example.run().await?; - } - } - example => example.run().await?, - } - - Ok(()) + let example = arg.parse::()?; + example.run().await } diff --git a/datafusion-examples/examples/external_dependency/main.rs b/datafusion-examples/examples/external_dependency/main.rs index 7259a2acf1044..55b47ab9b7f83 100644 --- a/datafusion-examples/examples/external_dependency/main.rs +++ b/datafusion-examples/examples/external_dependency/main.rs @@ -81,9 +81,14 @@ impl ExampleKind { async fn run(&self) -> Result<()> { match self { + ExampleKind::All => { + for example in ExampleKind::RUNNABLE_VARIANTS { + println!("Running example: {}", example.as_ref()); + Box::pin(example.run()).await?; + } + } ExampleKind::DataframeToS3 => dataframe_to_s3::dataframe_to_s3().await?, ExampleKind::QueryAwsS3 => query_aws_s3::query_aws_s3().await?, - ExampleKind::All => unreachable!("`All` should be handled in main"), } Ok(()) } @@ -102,15 +107,6 @@ async fn main() -> Result<()> { DataFusionError::Execution("Missing argument".to_string()) })?; - match arg.parse::()? 
{ - ExampleKind::All => { - for example in ExampleKind::RUNNABLE_VARIANTS { - println!("Running example: {}", example.as_ref()); - example.run().await?; - } - } - example => example.run().await?, - } - - Ok(()) + let example = arg.parse::()?; + example.run().await } diff --git a/datafusion-examples/examples/flight/main.rs b/datafusion-examples/examples/flight/main.rs index 2f3136a63e793..d2616023a8fd8 100644 --- a/datafusion-examples/examples/flight/main.rs +++ b/datafusion-examples/examples/flight/main.rs @@ -94,10 +94,15 @@ impl ExampleKind { async fn run(&self) -> Result<(), Box> { match self { + ExampleKind::All => { + for example in ExampleKind::RUNNABLE_VARIANTS { + println!("Running example: {}", example.as_ref()); + Box::pin(example.run()).await?; + } + } ExampleKind::Client => client::client().await?, ExampleKind::Server => server::server().await?, ExampleKind::SqlServer => sql_server::sql_server().await?, - ExampleKind::All => unreachable!("`All` should be handled in main"), } Ok(()) } @@ -116,15 +121,6 @@ async fn main() -> Result<(), Box> { DataFusionError::Execution("Missing argument".to_string()) })?; - match arg.parse::()? 
{ - ExampleKind::All => { - for example in ExampleKind::RUNNABLE_VARIANTS { - println!("Running example: {}", example.as_ref()); - example.run().await?; - } - } - example => example.run().await?, - } - - Ok(()) + let example = arg.parse::<ExampleKind>()?; + example.run().await } diff --git a/datafusion-examples/examples/proto/main.rs b/datafusion-examples/examples/proto/main.rs index 0a260768637ad..c1ca3e53c052e 100644 --- a/datafusion-examples/examples/proto/main.rs +++ b/datafusion-examples/examples/proto/main.rs @@ -76,10 +76,15 @@ impl ExampleKind { async fn run(&self) -> Result<()> { match self { + ExampleKind::All => { + for example in ExampleKind::RUNNABLE_VARIANTS { + println!("Running example: {}", example.as_ref()); + Box::pin(example.run()).await?; + } + } ExampleKind::ComposedExtensionCodec => { composed_extension_codec::composed_extension_codec().await? } - ExampleKind::All => unreachable!("`All` should be handled in main"), } Ok(()) } @@ -98,15 +103,6 @@ async fn main() -> Result<()> { DataFusionError::Execution("Missing argument".to_string()) })?; - match arg.parse::<ExampleKind>()?
{ - ExampleKind::All => { - for example in ExampleKind::RUNNABLE_VARIANTS { - println!("Running example: {}", example.as_ref()); - example.run().await?; - } - } - example => example.run().await?, - } - - Ok(()) + let example = arg.parse::<ExampleKind>()?; + example.run().await } diff --git a/datafusion-examples/examples/query_planning/main.rs b/datafusion-examples/examples/query_planning/main.rs index bb2ed6d9f5952..17409f0312a2b 100644 --- a/datafusion-examples/examples/query_planning/main.rs +++ b/datafusion-examples/examples/query_planning/main.rs @@ -130,6 +130,12 @@ impl ExampleKind { async fn run(&self) -> Result<()> { match self { + ExampleKind::All => { + for example in ExampleKind::RUNNABLE_VARIANTS { + println!("Running example: {}", example.as_ref()); + Box::pin(example.run()).await?; + } + } ExampleKind::AnalyzerRule => analyzer_rule::analyzer_rule().await?, ExampleKind::ExprApi => expr_api::expr_api().await?, ExampleKind::OptimizerRule => optimizer_rule::optimizer_rule().await?, @@ -138,7 +144,6 @@ impl ExampleKind { ExampleKind::PlannerApi => planner_api::planner_api().await?, ExampleKind::Pruning => pruning::pruning().await?, ExampleKind::ThreadPools => thread_pools::thread_pools().await?, - ExampleKind::All => unreachable!("`All` should be handled in main"), } Ok(()) } @@ -157,15 +162,6 @@ async fn main() -> Result<()> { DataFusionError::Execution("Missing argument".to_string()) })?; - match arg.parse::<ExampleKind>()?
{ - ExampleKind::All => { - for example in ExampleKind::RUNNABLE_VARIANTS { - println!("Running example: {}", example.as_ref()); - example.run().await?; - } - } - example => example.run().await?, - } - - Ok(()) + let example = arg.parse::<ExampleKind>()?; + example.run().await } diff --git a/datafusion-examples/examples/sql_ops/main.rs b/datafusion-examples/examples/sql_ops/main.rs index e310358edc7c3..154972aa74461 100644 --- a/datafusion-examples/examples/sql_ops/main.rs +++ b/datafusion-examples/examples/sql_ops/main.rs @@ -98,11 +98,16 @@ impl ExampleKind { async fn run(&self) -> Result<()> { match self { + ExampleKind::All => { + for example in ExampleKind::RUNNABLE_VARIANTS { + println!("Running example: {}", example.as_ref()); + Box::pin(example.run()).await?; + } + } ExampleKind::Analysis => analysis::analysis().await?, ExampleKind::Dialect => dialect::dialect().await?, ExampleKind::Frontend => frontend::frontend()?, ExampleKind::Query => query::query().await?, - ExampleKind::All => unreachable!("`All` should be handled in main"), } Ok(()) } @@ -121,15 +126,6 @@ async fn main() -> Result<()> { DataFusionError::Execution("Missing argument".to_string()) })?; - match arg.parse::<ExampleKind>()?
{ - ExampleKind::All => { - for example in ExampleKind::RUNNABLE_VARIANTS { - println!("Running example: {}", example.as_ref()); - example.run().await?; - } - } - example => example.run().await?, - } - - Ok(()) + let example = arg.parse::<ExampleKind>()?; + example.run().await } diff --git a/datafusion-examples/examples/udf/main.rs b/datafusion-examples/examples/udf/main.rs index 60fa1a9e50c77..34bd34ab25b35 100644 --- a/datafusion-examples/examples/udf/main.rs +++ b/datafusion-examples/examples/udf/main.rs @@ -130,6 +130,12 @@ impl ExampleKind { async fn run(&self) -> Result<()> { match self { + ExampleKind::All => { + for example in ExampleKind::RUNNABLE_VARIANTS { + println!("Running example: {}", example.as_ref()); + Box::pin(example.run()).await?; + } + } ExampleKind::AdvUdaf => advanced_udaf::advanced_udaf().await?, ExampleKind::AdvUdf => advanced_udf::advanced_udf().await?, ExampleKind::AdvUdwf => advanced_udwf::advanced_udwf().await?, @@ -138,8 +144,8 @@ impl ExampleKind { ExampleKind::Udf => simple_udf::simple_udf().await?, ExampleKind::Udtf => simple_udtf::simple_udtf().await?, ExampleKind::Udwf => simple_udwf::simple_udwf().await?, - ExampleKind::All => unreachable!("`All` should be handled in main"), } + Ok(()) } } @@ -157,15 +163,6 @@ async fn main() -> Result<()> { DataFusionError::Execution("Missing argument".to_string()) })?; - match arg.parse::<ExampleKind>()?
{ - ExampleKind::All => { - for example in ExampleKind::RUNNABLE_VARIANTS { - println!("Running example: {}", example.as_ref()); - example.run().await?; - } - } - example => example.run().await?, - } - - Ok(()) + let example = arg.parse::<ExampleKind>()?; + example.run().await } From f8c18f6b14ca2c22a22352b9efd8e44bc0acbe5f Mon Sep 17 00:00:00 2001 From: Sergey Zhukov Date: Fri, 5 Dec 2025 09:41:08 +0300 Subject: [PATCH 09/11] cleanup test folders in script and use temporary directory --- ci/scripts/rust_example.sh | 9 +++++ .../examples/dataframe/dataframe.rs | 39 ++++++++++++------- 2 files changed, 34 insertions(+), 14 deletions(-) diff --git a/ci/scripts/rust_example.sh b/ci/scripts/rust_example.sh index 7a5f7825b4e6d..4ef13e909f331 100755 --- a/ci/scripts/rust_example.sh +++ b/ci/scripts/rust_example.sh @@ -22,6 +22,15 @@ set -e export CARGO_PROFILE_CI_OPT_LEVEL="s" export CARGO_PROFILE_CI_STRIP=true +# Remove leftover files inside test_* directories inside datafusion-examples/ +EXAMPLES_ROOT="datafusion-examples" + +find "${EXAMPLES_ROOT}" -maxdepth 1 -type d -name "test_*" | while read -r test_dir; do + echo "Cleaning directory: $test_dir" + find "$test_dir" -mindepth 1 -maxdepth 1 -exec rm -rf {} + +done + +# Run the examples cd datafusion-examples/examples/ cargo build --profile ci --examples diff --git a/datafusion-examples/examples/dataframe/dataframe.rs b/datafusion-examples/examples/dataframe/dataframe.rs index 7ba4464b53d9f..6953170191724 100644 --- a/datafusion-examples/examples/dataframe/dataframe.rs +++ b/datafusion-examples/examples/dataframe/dataframe.rs @@ -28,10 +28,10 @@ use datafusion::error::Result; use datafusion::functions_aggregate::average::avg; use datafusion::functions_aggregate::min_max::max; use datafusion::prelude::*; -use std::fs::File; +use std::fs::{create_dir_all, File}; use std::io::Write; use std::sync::Arc; -use tempfile::tempdir; +use tempfile::{tempdir, TempDir}; /// This example demonstrates using DataFusion's DataFrame API /// @@
-208,15 +208,26 @@ async fn write_out(ctx: &SessionContext) -> Result<()> { ctx.register_table("initial_data", Arc::new(mem_table))?; let df = ctx.table("initial_data").await?; - ctx.sql( - "create external table - test(tablecol1 varchar) - stored as parquet - location './datafusion-examples/test_table/'", - ) - .await? - .collect() - .await?; + // Create a single temp root with subdirectories + let tmp_root = TempDir::new()?; + let examples_root = tmp_root.path().join("datafusion-examples"); + create_dir_all(&examples_root)?; + let table_dir = examples_root.join("test_table"); + let parquet_dir = examples_root.join("test_parquet"); + let csv_dir = examples_root.join("test_csv"); + let json_dir = examples_root.join("test_json"); + create_dir_all(&table_dir)?; + create_dir_all(&parquet_dir)?; + create_dir_all(&csv_dir)?; + create_dir_all(&json_dir)?; + + let create_sql = format!( + "CREATE EXTERNAL TABLE test(tablecol1 varchar) + STORED AS parquet + LOCATION '{}'", + table_dir.display() + ); + ctx.sql(&create_sql).await?.collect().await?; // This is equivalent to INSERT INTO test VALUES ('a'), ('b'), ('c'). 
// The behavior of write_table depends on the TableProvider's implementation @@ -227,7 +238,7 @@ async fn write_out(ctx: &SessionContext) -> Result<()> { df.clone() .write_parquet( - "./datafusion-examples/test_parquet/", + parquet_dir.to_str().unwrap(), DataFrameWriteOptions::new(), None, ) @@ -235,7 +246,7 @@ async fn write_out(ctx: &SessionContext) -> Result<()> { df.clone() .write_csv( - "./datafusion-examples/test_csv/", + csv_dir.to_str().unwrap(), // DataFrameWriteOptions contains options which control how data is written // such as compression codec DataFrameWriteOptions::new(), @@ -245,7 +256,7 @@ async fn write_out(ctx: &SessionContext) -> Result<()> { df.clone() .write_json( - "./datafusion-examples/test_json/", + json_dir.to_str().unwrap(), DataFrameWriteOptions::new(), None, ) From 2f5c63f22fccf80adba9885fd5f35ae0d61d7c80 Mon Sep 17 00:00:00 2001 From: Sergey Zhukov Date: Sat, 6 Dec 2025 09:25:05 +0300 Subject: [PATCH 10/11] remove cleanup test folders in script --- ci/scripts/rust_example.sh | 9 --------- 1 file changed, 9 deletions(-) diff --git a/ci/scripts/rust_example.sh b/ci/scripts/rust_example.sh index 4ef13e909f331..7a5f7825b4e6d 100755 --- a/ci/scripts/rust_example.sh +++ b/ci/scripts/rust_example.sh @@ -22,15 +22,6 @@ set -e export CARGO_PROFILE_CI_OPT_LEVEL="s" export CARGO_PROFILE_CI_STRIP=true -# Remove leftover files inside test_* directories inside datafusion-examples/ -EXAMPLES_ROOT="datafusion-examples" - -find "${EXAMPLES_ROOT}" -maxdepth 1 -type d -name "test_*" | while read -r test_dir; do - echo "Cleaning directory: $test_dir" - find "$test_dir" -mindepth 1 -maxdepth 1 -exec rm -rf {} + -done - -# Run the examples cd datafusion-examples/examples/ cargo build --profile ci --examples From 383993af05d98a6ee76d6fe96c92661030f93ffd Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 6 Dec 2025 07:27:50 -0500 Subject: [PATCH 11/11] Fix example --- .../custom_data_source/custom_file_casts.rs | 15 ++++++++++++++- 
datafusion/physical-expr/src/expressions/cast.rs | 14 ++++++++++---- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/datafusion-examples/examples/custom_data_source/custom_file_casts.rs b/datafusion-examples/examples/custom_data_source/custom_file_casts.rs index 6d8fd358607d2..ada3f72f455bc 100644 --- a/datafusion-examples/examples/custom_data_source/custom_file_casts.rs +++ b/datafusion-examples/examples/custom_data_source/custom_file_casts.rs @@ -32,7 +32,7 @@ use datafusion::datasource::listing::{ use datafusion::execution::context::SessionContext; use datafusion::execution::object_store::ObjectStoreUrl; use datafusion::parquet::arrow::ArrowWriter; -use datafusion::physical_expr::expressions::CastExpr; +use datafusion::physical_expr::expressions::{CastColumnExpr, CastExpr}; use datafusion::physical_expr::PhysicalExpr; use datafusion::prelude::SessionConfig; use datafusion_physical_expr_adapter::{ @@ -192,6 +192,19 @@ impl PhysicalExprAdapter for CustomCastsPhysicalExprAdapter { ); } } + if let Some(cast) = expr.as_any().downcast_ref::<CastColumnExpr>() { + let input_data_type = + cast.expr().data_type(&self.physical_file_schema)?; + let output_data_type = cast.data_type(&self.physical_file_schema)?; + if !CastExpr::check_bigger_cast( + cast.target_field().data_type(), + &input_data_type, + ) { + return not_impl_err!( + "Unsupported CAST from {input_data_type} to {output_data_type}" + ); + } + } Ok(Transformed::no(expr)) }) .data() diff --git a/datafusion/physical-expr/src/expressions/cast.rs b/datafusion/physical-expr/src/expressions/cast.rs index a368aafbc62d5..263a628be8e95 100644 --- a/datafusion/physical-expr/src/expressions/cast.rs +++ b/datafusion/physical-expr/src/expressions/cast.rs @@ -98,13 +98,14 @@ impl CastExpr { &self.cast_options } - /// Check if the cast is a widening cast (e.g. from `Int8` to `Int16`).
- pub fn is_bigger_cast(&self, src: &DataType) -> bool { - if self.cast_type.eq(src) { + /// Check if casting from the specified source type to the target type is a + /// widening cast (e.g. from `Int8` to `Int16`). + pub fn check_bigger_cast(cast_type: &DataType, src: &DataType) -> bool { + if cast_type.eq(src) { return true; } matches!( - (src, &self.cast_type), + (src, cast_type), (Int8, Int16 | Int32 | Int64) | (Int16, Int32 | Int64) | (Int32, Int64) @@ -119,6 +120,11 @@ impl CastExpr { | (Utf8, LargeUtf8) ) } + + /// Check if the cast is a widening cast (e.g. from `Int8` to `Int16`). + pub fn is_bigger_cast(&self, src: &DataType) -> bool { + Self::check_bigger_cast(&self.cast_type, src) + } } impl fmt::Display for CastExpr {