From 05d3ef2d8412b6b58947484c316aa2bc1a1d74cf Mon Sep 17 00:00:00 2001 From: comphead Date: Thu, 19 Jun 2025 10:16:11 -0700 Subject: [PATCH 1/6] minor: Avoid parquet read failure if the folder doesn't end with slash --- datafusion/core/src/execution/context/mod.rs | 11 ++- .../core/src/execution/context/parquet.rs | 78 ++++++++++++++++++- datafusion/datasource/src/url.rs | 22 ++++++ .../sqllogictest/test_files/parquet.slt | 48 ++++++++++++ 4 files changed, 154 insertions(+), 5 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index dbe5c2c00f17e..71dc2a0eb0f2e 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1327,7 +1327,7 @@ impl SessionContext { table_paths: P, options: impl ReadOptions<'a>, ) -> Result { - let table_paths = table_paths.to_urls()?; + let mut table_paths = table_paths.to_urls()?; let session_config = self.copied_config(); let listing_options = options.to_listing_options(&session_config, self.copied_table_options()); @@ -1339,9 +1339,14 @@ impl SessionContext { } // check if the file extension matches the expected extension - for path in &table_paths { + for path in table_paths.iter_mut() { let file_path = path.as_str(); - if !file_path.ends_with(option_extension.clone().as_str()) + // if the folder then rewrite a file path as 'path/*.parquet' + if path.is_folder() && path.get_glob().is_none() { + *path = path + .clone() + .with_glob(format!("*{option_extension}").as_ref())? + } else if !file_path.ends_with(option_extension.clone().as_str()) && !path.is_collection() { return exec_err!( diff --git a/datafusion/core/src/execution/context/parquet.rs b/datafusion/core/src/execution/context/parquet.rs index 2fb763bee495f..96cb3d1f121b0 100644 --- a/datafusion/core/src/execution/context/parquet.rs +++ b/datafusion/core/src/execution/context/parquet.rs @@ -107,11 +107,11 @@ mod tests { use crate::test_util::parquet_test_data; use arrow::util::pretty::pretty_format_batches; - use datafusion_common::assert_contains; use datafusion_common::config::TableParquetOptions; + use datafusion_common::{assert_batches_eq, assert_batches_sorted_eq, assert_contains}; use datafusion_execution::config::SessionConfig; - use tempfile::tempdir; + use tempfile::{tempdir, TempDir}; #[tokio::test] async fn read_with_glob_path() -> Result<()> { @@ -400,4 +400,78 @@ mod tests { assert_eq!(total_rows, 5); Ok(()) } + + #[tokio::test] + async fn read_from_parquet_folder() -> Result<()> { + let ctx = SessionContext::new(); + let tmp_dir = TempDir::new()?; + let test_path = tmp_dir.path().to_str().unwrap().to_string(); + + ctx.sql("SELECT 1 a") + .await? + .write_parquet(&test_path, DataFrameWriteOptions::default(), None) + .await?; + + ctx.sql("SELECT 2 a") + .await? + .write_parquet(&test_path, DataFrameWriteOptions::default(), None) + .await?; + + let actual = ctx + .read_parquet(&test_path, ParquetReadOptions::default()) + .await? + .collect() + .await?; + + #[cfg_attr(any(), rustfmt::skip)] + assert_batches_sorted_eq!(&[ + "+---+", + "| a |", + "+---+", + "| 2 |", + "| 1 |", + "+---+", + ], &actual); + + let actual = ctx + .read_parquet( + format!("{test_path}/*.parquet"), + ParquetReadOptions::default(), + ) + .await? + .collect() + .await?; + + #[cfg_attr(any(), rustfmt::skip)] + assert_batches_sorted_eq!(&[ + "+---+", + "| a |", + "+---+", + "| 2 |", + "| 1 |", + "+---+", + ], &actual); + + Ok(()) + } + + #[tokio::test] + async fn read_dummy_folder() -> Result<()> { + let ctx = SessionContext::new(); + let test_path = "/foo/"; + + let actual = ctx + .read_parquet(test_path, ParquetReadOptions::default()) + .await? + .collect() + .await?; + + #[cfg_attr(any(), rustfmt::skip)] + assert_batches_eq!(&[ + "++", + "++", + ], &actual); + + Ok(()) + } } diff --git a/datafusion/datasource/src/url.rs b/datafusion/datasource/src/url.rs index bddfdbcc06d13..348791be9828d 100644 --- a/datafusion/datasource/src/url.rs +++ b/datafusion/datasource/src/url.rs @@ -282,6 +282,28 @@ impl ListingTableUrl { let url = &self.url[url::Position::BeforeScheme..url::Position::BeforePath]; ObjectStoreUrl::parse(url).unwrap() } + + /// Returns true if the [`ListingTableUrl`] points to the folder + pub fn is_folder(&self) -> bool { + self.url.scheme() == "file" && self.is_collection() + } + + /// Return the `url` for [`ListingTableUrl`] + pub fn get_url(&self) -> &Url { + &self.url + } + + /// Return the `glob` for [`ListingTableUrl`] + pub fn get_glob(&self) -> &Option { + &self.glob + } + + /// Returns a copy of current [`ListingTableUrl`] with a specified `glob` + pub fn with_glob(self, glob: &str) -> Result { + let glob = + Pattern::new(glob).map_err(|e| DataFusionError::External(Box::new(e)))?; + Self::try_new(self.url, Some(glob)) + } } /// Creates a file URL from a potentially relative filesystem path diff --git a/datafusion/sqllogictest/test_files/parquet.slt b/datafusion/sqllogictest/test_files/parquet.slt index abc6fdab3c8a0..33bb052baa519 100644 --- a/datafusion/sqllogictest/test_files/parquet.slt +++ b/datafusion/sqllogictest/test_files/parquet.slt @@ -304,6 +304,54 @@ select count(*) from listing_table; ---- 12 +# Test table pointing to the folder with parquet files(ends with /) +statement ok +CREATE EXTERNAL TABLE listing_table_folder_0 +STORED AS PARQUET +LOCATION 'test_files/scratch/parquet/test_table/'; + +statement ok +set datafusion.execution.listing_table_ignore_subdirectory = true; + +# scan file: 0.parquet 1.parquet 2.parquet +query I +select count(*) from listing_table_folder_0; +---- +9 + +statement ok +set datafusion.execution.listing_table_ignore_subdirectory = false; + +# scan file: 0.parquet 1.parquet 2.parquet 3.parquet +query I +select count(*) from listing_table_folder_0; +---- +12 + +# Test table pointing to the folder with parquet files(doesn't end with /) +statement ok +CREATE EXTERNAL TABLE listing_table_folder_1 +STORED AS PARQUET +LOCATION 'test_files/scratch/parquet/test_table'; + +statement ok +set datafusion.execution.listing_table_ignore_subdirectory = true; + +# scan file: 0.parquet 1.parquet 2.parquet +query I +select count(*) from listing_table_folder_1; +---- +9 + +statement ok +set datafusion.execution.listing_table_ignore_subdirectory = false; + +# scan file: 0.parquet 1.parquet 2.parquet 3.parquet +query I +select count(*) from listing_table_folder_1; +---- +12 + # Clean up statement ok DROP TABLE timestamp_with_tz; From a78de194aeb515a926528bdc165893523956e644 Mon Sep 17 00:00:00 2001 From: comphead Date: Thu, 19 Jun 2025 10:30:02 -0700 Subject: [PATCH 2/6] minor: Avoid parquet read failure if the folder doesn't end with slash --- .../core/src/execution/context/parquet.rs | 41 ++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/execution/context/parquet.rs b/datafusion/core/src/execution/context/parquet.rs index 96cb3d1f121b0..be0a4c69bd364 100644 --- a/datafusion/core/src/execution/context/parquet.rs +++ b/datafusion/core/src/execution/context/parquet.rs @@ -108,7 +108,9 @@ mod tests { use arrow::util::pretty::pretty_format_batches; use datafusion_common::config::TableParquetOptions; - use datafusion_common::{assert_batches_eq, assert_batches_sorted_eq, assert_contains}; + use datafusion_common::{ + assert_batches_eq, assert_batches_sorted_eq, assert_contains, + }; use datafusion_execution::config::SessionConfig; use tempfile::{tempdir, TempDir}; @@ -455,6 +457,43 @@ mod tests { Ok(()) } + #[tokio::test] + async fn read_from_parquet_folder_table() -> Result<()> { + let ctx = SessionContext::new(); + let tmp_dir = TempDir::new()?; + let test_path = tmp_dir.path().to_str().unwrap().to_string(); + + ctx.sql("SELECT 1 a") + .await? + .write_parquet(&test_path, DataFrameWriteOptions::default(), None) + .await?; + + ctx.sql("SELECT 2 a") + .await? + .write_parquet(&test_path, DataFrameWriteOptions::default(), None) + .await?; + + ctx.sql(format!("CREATE EXTERNAL TABLE parquet_folder_t1 STORED AS PARQUET LOCATION '{test_path}'").as_ref()) + .await?; + + let actual = ctx + .sql("select * from parquet_folder_t1") + .await? + .collect() + .await?; + #[cfg_attr(any(), rustfmt::skip)] + assert_batches_sorted_eq!(&[ + "+---+", + "| a |", + "+---+", + "| 2 |", + "| 1 |", + "+---+", + ], &actual); + + Ok(()) + } + #[tokio::test] async fn read_dummy_folder() -> Result<()> { let ctx = SessionContext::new(); From 742b135b594ac4711631f5e79a4407dbb25c31b4 Mon Sep 17 00:00:00 2001 From: comphead Date: Mon, 23 Jun 2025 17:11:16 -0700 Subject: [PATCH 3/6] minor: Avoid parquet read failure if the folder contains non parquet files --- .../src/datasource/listing_table_factory.rs | 11 +++++++++-- .../core/src/execution/context/parquet.rs | 17 +++++++++++++---- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index 6a88ad88e5d4e..580fa4be47afb 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -108,10 +108,10 @@ impl TableProviderFactory for ListingTableFactory { (Some(schema), table_partition_cols) }; - let table_path = ListingTableUrl::parse(&cmd.location)?; + let mut table_path = ListingTableUrl::parse(&cmd.location)?; let options = ListingOptions::new(file_format) - .with_file_extension(file_extension) + .with_file_extension(&file_extension) .with_session_config_options(session_state.config()) .with_table_partition_cols(table_partition_cols); @@ -125,6 +125,13 @@ impl TableProviderFactory for ListingTableFactory { // specifically for parquet file format. // See: https://github.com/apache/datafusion/issues/7317 None => { + // if the folder then rewrite a file path as 'path/*.parquet' + // to only read the files the reader can understand + if table_path.is_folder() && table_path.get_glob().is_none() { + table_path = table_path.with_glob( + format!("*.{}", cmd.file_type.to_lowercase()).as_ref(), + )?; + } let schema = options.infer_schema(session_state, &table_path).await?; let df_schema = Arc::clone(&schema).to_dfschema()?; let column_refs: HashSet<_> = cmd diff --git a/datafusion/core/src/execution/context/parquet.rs b/datafusion/core/src/execution/context/parquet.rs index be0a4c69bd364..731f7e59ecfaf 100644 --- a/datafusion/core/src/execution/context/parquet.rs +++ b/datafusion/core/src/execution/context/parquet.rs @@ -419,6 +419,12 @@ mod tests { .write_parquet(&test_path, DataFrameWriteOptions::default(), None) .await?; + // Adding CSV to check it is not read with Parquet reader + ctx.sql("SELECT 3 a") + .await? + .write_csv(&test_path, DataFrameWriteOptions::default(), None) + .await?; + let actual = ctx .read_parquet(&test_path, ParquetReadOptions::default()) .await? @@ -436,10 +442,7 @@ mod tests { ], &actual); let actual = ctx - .read_parquet( - format!("{test_path}/*.parquet"), - ParquetReadOptions::default(), - ) + .read_parquet(test_path, ParquetReadOptions::default()) .await? .collect() .await?; @@ -473,6 +476,12 @@ mod tests { .write_parquet(&test_path, DataFrameWriteOptions::default(), None) .await?; + // Adding CSV to check it is not read with Parquet reader + ctx.sql("SELECT 3 a") + .await? + .write_csv(&test_path, DataFrameWriteOptions::default(), None) + .await?; + ctx.sql(format!("CREATE EXTERNAL TABLE parquet_folder_t1 STORED AS PARQUET LOCATION '{test_path}'").as_ref()) .await?; From c2993586ff4267fd2c9574f07b250b23e2a756be Mon Sep 17 00:00:00 2001 From: comphead Date: Mon, 23 Jun 2025 17:14:13 -0700 Subject: [PATCH 4/6] minor: Avoid parquet read failure if the folder contains non parquet files --- .../core/src/execution/context/parquet.rs | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/datafusion/core/src/execution/context/parquet.rs b/datafusion/core/src/execution/context/parquet.rs index 731f7e59ecfaf..da2782ac68537 100644 --- a/datafusion/core/src/execution/context/parquet.rs +++ b/datafusion/core/src/execution/context/parquet.rs @@ -506,6 +506,26 @@ mod tests { #[tokio::test] async fn read_dummy_folder() -> Result<()> { let ctx = SessionContext::new(); + + ctx.sql("CREATE EXTERNAL TABLE parquet_folder_t1_foo STORED AS PARQUET LOCATION '/foo'".to_string().as_ref()) + .await?; + let actual = ctx + .sql("select * from parquet_folder_t1") + .await? + .collect() + .await?; + #[cfg_attr(any(), rustfmt::skip)] + assert_batches_eq!(&[ + "++", + "++", + ], &actual); + + Ok(()) + } + + #[tokio::test] + async fn read_dummy_folder_with_table_api() -> Result<()> { + let ctx = SessionContext::new(); let test_path = "/foo/"; let actual = ctx From b550df0ec172181a3dadd280fa468e11e27cb9cd Mon Sep 17 00:00:00 2001 From: comphead Date: Mon, 23 Jun 2025 17:19:30 -0700 Subject: [PATCH 5/6] minor: Avoid parquet read failure if the folder contains non parquet files --- datafusion/core/src/execution/context/mod.rs | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 71dc2a0eb0f2e..dbe5c2c00f17e 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1327,7 +1327,7 @@ impl SessionContext { table_paths: P, options: impl ReadOptions<'a>, ) -> Result { - let mut table_paths = table_paths.to_urls()?; + let table_paths = table_paths.to_urls()?; let session_config = self.copied_config(); let listing_options = options.to_listing_options(&session_config, self.copied_table_options()); @@ -1339,14 +1339,9 @@ impl SessionContext { } // check if the file extension matches the expected extension - for path in table_paths.iter_mut() { + for path in &table_paths { let file_path = path.as_str(); - // if the folder then rewrite a file path as 'path/*.parquet' - if path.is_folder() && path.get_glob().is_none() { - *path = path - .clone() - .with_glob(format!("*{option_extension}").as_ref())? - } else if !file_path.ends_with(option_extension.clone().as_str()) + if !file_path.ends_with(option_extension.clone().as_str()) && !path.is_collection() { return exec_err!( From 79cb057d8ed489162dbfbb2cb220237c046b63d9 Mon Sep 17 00:00:00 2001 From: comphead Date: Mon, 23 Jun 2025 18:22:17 -0700 Subject: [PATCH 6/6] minor: Avoid parquet read failure if the folder contains non parquet files --- .../core/src/execution/context/parquet.rs | 20 ------------------- 1 file changed, 20 deletions(-) diff --git a/datafusion/core/src/execution/context/parquet.rs b/datafusion/core/src/execution/context/parquet.rs index da2782ac68537..731f7e59ecfaf 100644 --- a/datafusion/core/src/execution/context/parquet.rs +++ b/datafusion/core/src/execution/context/parquet.rs @@ -506,26 +506,6 @@ mod tests { #[tokio::test] async fn read_dummy_folder() -> Result<()> { let ctx = SessionContext::new(); - - ctx.sql("CREATE EXTERNAL TABLE parquet_folder_t1_foo STORED AS PARQUET LOCATION '/foo'".to_string().as_ref()) - .await?; - let actual = ctx - .sql("select * from parquet_folder_t1") - .await? - .collect() - .await?; - #[cfg_attr(any(), rustfmt::skip)] - assert_batches_eq!(&[ - "++", - "++", - ], &actual); - - Ok(()) - } - - #[tokio::test] - async fn read_dummy_folder_with_table_api() -> Result<()> { - let ctx = SessionContext::new(); let test_path = "/foo/"; let actual = ctx