Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 22 additions & 8 deletions ci/scripts/rust_example.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,26 @@ export CARGO_PROFILE_CI_STRIP=true
cd datafusion-examples/examples/
cargo build --profile ci --examples

files=$(ls .)
for filename in $files
do
example_name=`basename $filename ".rs"`
# Skip tests that rely on external storage and flight
if [ ! -d $filename ]; then
cargo run --profile ci --example $example_name
fi
SKIP_LIST=("external_dependency" "flight" "ffi")

skip_example() {
local name="$1"
for skip in "${SKIP_LIST[@]}"; do
if [ "$name" = "$skip" ]; then
return 0
fi
done
return 1
}

for dir in */; do
example_name=$(basename "$dir")

if skip_example "$example_name"; then
echo "Skipping $example_name"
continue
fi

echo "Running example group: $example_name"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I ran this script twice, I got an error the second time around:

./ci/scripts/rust_example.sh
./ci/scripts/rust_example.sh

The second run made this:

Running example: deserialize_to_struct
Running example group: datafusion-examples
error: no example target named `datafusion-examples` in default-run packages
help: available example targets:
    builtin_functions
    custom_data_source
    data_io
    dataframe
    execution_monitoring
    external_dependency
    flight
    proto
    query_planning
    sql_ops
    udf

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the issue is that some of the examples leave files around:

andrewlamb@Andrews-MacBook-Pro-3:~/Software/datafusion2$ find datafusion-examples/examples/datafusion-examples
datafusion-examples/examples/datafusion-examples
datafusion-examples/examples/datafusion-examples/test_json
datafusion-examples/examples/datafusion-examples/test_json/lQqU6IGDpwHJyGQB_0.json
datafusion-examples/examples/datafusion-examples/test_json/KKsELYFJ4st3GUOa_0.json
datafusion-examples/examples/datafusion-examples/test_csv
datafusion-examples/examples/datafusion-examples/test_csv/9Alj5dF7w72vLpp0_0.csv.gz
datafusion-examples/examples/datafusion-examples/test_csv/4Bh8tCQllXXE43A9_0.csv.gz
datafusion-examples/examples/datafusion-examples/test_table
datafusion-examples/examples/datafusion-examples/test_table/Y2CVyUXOXRPhDhFA_0.parquet
datafusion-examples/examples/datafusion-examples/test_table/2nskzCtDZnEJR851_0.parquet
datafusion-examples/examples/datafusion-examples/test_parquet
datafusion-examples/examples/datafusion-examples/test_parquet/4CIKeX6U9Ik1YYgY_0.parquet
datafusion-examples/examples/datafusion-examples/test_parquet/ooUJiB7QMqY0BRdN_0.parquet

Maybe we could change the examples to use a temporary directory or else have this script clean up before running

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good point! Let's do it.

cargo run --profile ci --example "$example_name" -- all
done
45 changes: 35 additions & 10 deletions datafusion-examples/examples/builtin_functions/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@
//!
//! ## Usage
//! ```bash
//! cargo run --example builtin_functions -- [date_time|function_factory|regexp]
//! cargo run --example builtin_functions -- [all|date_time|function_factory|regexp]
//! ```
//!
//! Each subcommand runs a corresponding example:
//! - `all` — run all examples included in this module
//! - `date_time` — examples of date-time related functions and queries
//! - `function_factory` — register `CREATE FUNCTION` handler to implement SQL macros
//! - `regexp` — examples of using regular expression functions
Expand All @@ -38,6 +39,7 @@ use std::str::FromStr;
use datafusion::error::{DataFusionError, Result};

enum ExampleKind {
All,
DateTime,
FunctionFactory,
Regexp,
Expand All @@ -46,6 +48,7 @@ enum ExampleKind {
impl AsRef<str> for ExampleKind {
fn as_ref(&self) -> &str {
match self {
Self::All => "all",
Self::DateTime => "date_time",
Self::FunctionFactory => "function_factory",
Self::Regexp => "regexp",
Expand All @@ -58,6 +61,7 @@ impl FromStr for ExampleKind {

fn from_str(s: &str) -> Result<Self> {
match s {
"all" => Ok(Self::All),
"date_time" => Ok(Self::DateTime),
"function_factory" => Ok(Self::FunctionFactory),
"regexp" => Ok(Self::Regexp),
Expand All @@ -67,12 +71,38 @@ impl FromStr for ExampleKind {
}

impl ExampleKind {
const ALL: [Self; 3] = [Self::DateTime, Self::FunctionFactory, Self::Regexp];
const ALL_VARIANTS: [Self; 4] = [
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When looking at the amount of boiler plate code, I think we can use strum to do the same thing https://crates.io/crates/strum

I know in general adding a new dependency is something we try to avoid, but given strum is already in the workspace, using it in examples seems reasonable to me

Specifically,

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was actually considering using strum for this part, and I agree that it would simplify the boilerplate. Since we try to keep dependencies minimal in examples, I initially decided not to introduce it and instead went with the more explicit implementation to keep the PR focused.

To avoid scope creep for this change, I’d prefer to finish this PR as-is.
After it is merged, I’d be happy to open a follow-up PR to refactor this piece using strum, especially since it’s already part of the workspace. That would make the examples cleaner and easier to maintain.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this sounds like a good plan

Self::All,
Self::DateTime,
Self::FunctionFactory,
Self::Regexp,
];

const RUNNABLE_VARIANTS: [Self; 3] =
[Self::DateTime, Self::FunctionFactory, Self::Regexp];

const EXAMPLE_NAME: &str = "builtin_functions";

fn variants() -> Vec<&'static str> {
Self::ALL.iter().map(|x| x.as_ref()).collect()
Self::ALL_VARIANTS
.iter()
.map(|example| example.as_ref())
.collect()
}

async fn run(&self) -> Result<()> {
match self {
ExampleKind::All => {
for example in ExampleKind::RUNNABLE_VARIANTS {
println!("Running example: {}", example.as_ref());
Box::pin(example.run()).await?;
}
}
ExampleKind::DateTime => date_time::date_time().await?,
ExampleKind::FunctionFactory => function_factory::function_factory().await?,
ExampleKind::Regexp => regexp::regexp().await?,
}
Ok(())
}
}

Expand All @@ -89,11 +119,6 @@ async fn main() -> Result<()> {
DataFusionError::Execution("Missing argument".to_string())
})?;

match arg.parse::<ExampleKind>()? {
ExampleKind::DateTime => date_time::date_time().await?,
ExampleKind::FunctionFactory => function_factory::function_factory().await?,
ExampleKind::Regexp => regexp::regexp().await?,
}

Ok(())
let example = arg.parse::<ExampleKind>()?;
example.run().await
}
33 changes: 27 additions & 6 deletions datafusion-examples/examples/builtin_functions/regexp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@

//! See `main.rs` for how to run it.

use std::{fs::File, io::Write};

use datafusion::common::{assert_batches_eq, assert_contains};
use datafusion::error::Result;
use datafusion::prelude::*;
use tempfile::tempdir;

/// This example demonstrates how to use the regexp_* functions
///
Expand All @@ -32,12 +35,30 @@ use datafusion::prelude::*;
/// https://docs.rs/regex/latest/regex/#grouping-and-flags
pub async fn regexp() -> Result<()> {
let ctx = SessionContext::new();
ctx.register_csv(
"examples",
"datafusion/physical-expr/tests/data/regex.csv",
CsvReadOptions::new(),
)
.await?;
// content from file 'datafusion/physical-expr/tests/data/regex.csv'
let csv_data = r#"values,patterns,replacement,flags
abc,^(a),bb\1bb,i
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why inline this content? It is fine, I am just curious

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I inlined the CSV because I ran into issues reading files that live outside the datafusion-examplescrate (#18946 (comment)) The relative path works locally but not in CI, so the example couldn’t reliably find datafusion/physical-expr/tests/data/regex.csv

To keep the example self-contained and stable in CI, I embedded the data and wrote it to a temp file before registering it.

Longer term, a cleaner solution might be to store example data directly inside the datafusion-examples crate (e.g. an examples/data/folder). I’d be happy to work with that in a follow-up.

ABC,^(A).*,B,i
aBc,(b|d),e,i
AbC,(B|D),e,
aBC,^(b|c),d,
4000,\b4([1-9]\d\d|\d[1-9]\d|\d\d[1-9])\b,xyz,
4010,\b4([1-9]\d\d|\d[1-9]\d|\d\d[1-9])\b,xyz,
Düsseldorf,[\p{Letter}-]+,München,
Москва,[\p{L}-]+,Moscow,
Köln,[a-zA-Z]ö[a-zA-Z]{2},Koln,
اليوم,^\p{Arabic}+$,Today,"#;
let dir = tempdir()?;
let file_path = dir.path().join("regex.csv");
{
let mut file = File::create(&file_path)?;
// write CSV data
file.write_all(csv_data.as_bytes())?;
} // scope closes the file
let file_path = file_path.to_str().unwrap();

ctx.register_csv("examples", file_path, CsvReadOptions::new())
.await?;

//
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use datafusion::datasource::listing::{
use datafusion::execution::context::SessionContext;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::parquet::arrow::ArrowWriter;
use datafusion::physical_expr::expressions::CastExpr;
use datafusion::physical_expr::expressions::{CastColumnExpr, CastExpr};
use datafusion::physical_expr::PhysicalExpr;
use datafusion::prelude::SessionConfig;
use datafusion_physical_expr_adapter::{
Expand Down Expand Up @@ -192,6 +192,19 @@ impl PhysicalExprAdapter for CustomCastsPhysicalExprAdapter {
);
}
}
if let Some(cast) = expr.as_any().downcast_ref::<CastColumnExpr>() {
let input_data_type =
cast.expr().data_type(&self.physical_file_schema)?;
let output_data_type = cast.data_type(&self.physical_file_schema)?;
if !CastExpr::check_bigger_cast(
cast.target_field().data_type(),
&input_data_type,
) {
return not_impl_err!(
"Unsupported CAST from {input_data_type} to {output_data_type}"
);
}
}
Ok(Transformed::no(expr))
})
.data()
Expand Down
80 changes: 58 additions & 22 deletions datafusion-examples/examples/custom_data_source/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@
//!
//! ## Usage
//! ```bash
//! cargo run --example custom_data_source -- [csv_json_opener|csv_sql_streaming|custom_datasource|custom_file_casts|custom_file_format|default_column_values|file_stream_provider]
//! cargo run --example custom_data_source -- [all|csv_json_opener|csv_sql_streaming|custom_datasource|custom_file_casts|custom_file_format|default_column_values|file_stream_provider]
//! ```
//!
//! Each subcommand runs a corresponding example:
//! - `all` — run all examples included in this module
//! - `csv_json_opener` — use low level FileOpener APIs to read CSV/JSON into Arrow RecordBatches
//! - `csv_sql_streaming` — build and run a streaming query plan from a SQL statement against a local CSV file
//! - `custom_datasource` — run queries against a custom datasource (TableProvider)
Expand All @@ -46,25 +47,27 @@ use std::str::FromStr;
use datafusion::error::{DataFusionError, Result};

enum ExampleKind {
All,
CsvJsonOpener,
CsvSqlStreaming,
CustomDatasource,
CustomFileCasts,
CustomFileFormat,
DefaultColumnValues,
FileFtreamProvider,
FileStreamProvider,
}

impl AsRef<str> for ExampleKind {
fn as_ref(&self) -> &str {
match self {
Self::All => "all",
Self::CsvJsonOpener => "csv_json_opener",
Self::CsvSqlStreaming => "csv_sql_streaming",
Self::CustomDatasource => "custom_datasource",
Self::CustomFileCasts => "custom_file_casts",
Self::CustomFileFormat => "custom_file_format",
Self::DefaultColumnValues => "default_column_values",
Self::FileFtreamProvider => "file_stream_provider",
Self::FileStreamProvider => "file_stream_provider",
}
}
}
Expand All @@ -74,33 +77,79 @@ impl FromStr for ExampleKind {

fn from_str(s: &str) -> Result<Self> {
match s {
"all" => Ok(Self::All),
"csv_json_opener" => Ok(Self::CsvJsonOpener),
"csv_sql_streaming" => Ok(Self::CsvSqlStreaming),
"custom_datasource" => Ok(Self::CustomDatasource),
"custom_file_casts" => Ok(Self::CustomFileCasts),
"custom_file_format" => Ok(Self::CustomFileFormat),
"default_column_values" => Ok(Self::DefaultColumnValues),
"file_stream_provider" => Ok(Self::FileFtreamProvider),
"file_stream_provider" => Ok(Self::FileStreamProvider),
_ => Err(DataFusionError::Execution(format!("Unknown example: {s}"))),
}
}
}

impl ExampleKind {
const ALL: [Self; 7] = [
const ALL_VARIANTS: [Self; 8] = [
Self::All,
Self::CsvJsonOpener,
Self::CsvSqlStreaming,
Self::CustomDatasource,
Self::CustomFileCasts,
Self::CustomFileFormat,
Self::DefaultColumnValues,
Self::FileFtreamProvider,
Self::FileStreamProvider,
];

const RUNNABLE_VARIANTS: [Self; 7] = [
Self::CsvJsonOpener,
Self::CsvSqlStreaming,
Self::CustomDatasource,
Self::CustomFileCasts,
Self::CustomFileFormat,
Self::DefaultColumnValues,
Self::FileStreamProvider,
];

const EXAMPLE_NAME: &str = "custom_data_source";

fn variants() -> Vec<&'static str> {
Self::ALL.iter().map(|x| x.as_ref()).collect()
Self::ALL_VARIANTS
.iter()
.map(|example| example.as_ref())
.collect()
}

async fn run(&self) -> Result<()> {
match self {
ExampleKind::All => {
for example in ExampleKind::RUNNABLE_VARIANTS {
println!("Running example: {}", example.as_ref());
Box::pin(example.run()).await?;
}
}
ExampleKind::CsvJsonOpener => csv_json_opener::csv_json_opener().await?,
ExampleKind::CsvSqlStreaming => {
csv_sql_streaming::csv_sql_streaming().await?
}
ExampleKind::CustomDatasource => {
custom_datasource::custom_datasource().await?
}
ExampleKind::CustomFileCasts => {
custom_file_casts::custom_file_casts().await?
}
ExampleKind::CustomFileFormat => {
custom_file_format::custom_file_format().await?
}
ExampleKind::DefaultColumnValues => {
default_column_values::default_column_values().await?
}
ExampleKind::FileStreamProvider => {
file_stream_provider::file_stream_provider().await?
}
}
Ok(())
}
}

Expand All @@ -117,19 +166,6 @@ async fn main() -> Result<()> {
DataFusionError::Execution("Missing argument".to_string())
})?;

match arg.parse::<ExampleKind>()? {
ExampleKind::CsvJsonOpener => csv_json_opener::csv_json_opener().await?,
ExampleKind::CsvSqlStreaming => csv_sql_streaming::csv_sql_streaming().await?,
ExampleKind::CustomDatasource => custom_datasource::custom_datasource().await?,
ExampleKind::CustomFileCasts => custom_file_casts::custom_file_casts().await?,
ExampleKind::CustomFileFormat => custom_file_format::custom_file_format().await?,
ExampleKind::DefaultColumnValues => {
default_column_values::default_column_values().await?
}
ExampleKind::FileFtreamProvider => {
file_stream_provider::file_stream_provider().await?
}
}

Ok(())
let example = arg.parse::<ExampleKind>()?;
example.run().await
}
Loading