From 789897b2fb6eb05d7631e37fffce12cdb83bb4a1 Mon Sep 17 00:00:00 2001 From: Arttu Voutilainen Date: Thu, 12 Dec 2024 23:44:55 +0100 Subject: [PATCH 1/7] fix: Ignore empty files in ListingTable when listing files with or without partition filters, as well as when inferring schema --- .../core/src/datasource/listing/helpers.rs | 111 +++++++++++++++++- .../core/src/datasource/listing/table.rs | 2 + 2 files changed, 112 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index a601aec32f162..730130e69765f 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/core/src/datasource/listing/helpers.rs @@ -171,7 +171,13 @@ impl Partition { trace!("Listing partition {}", self.path); let prefix = Some(&self.path).filter(|p| !p.as_ref().is_empty()); let result = store.list_with_delimiter(prefix).await?; - self.files = Some(result.objects); + self.files = Some( + result + .objects + .into_iter() + .filter(|object_meta| object_meta.size > 0) + .collect(), + ); Ok((self, result.common_prefixes)) } } @@ -418,6 +424,7 @@ pub async fn pruned_partition_list<'a>( table_path .list_all_files(ctx, store, file_extension) .await? + .try_filter(|object_meta| futures::future::ready(object_meta.size > 0)) .map_ok(|object_meta| object_meta.into()), )); } @@ -566,6 +573,7 @@ mod tests { async fn test_pruned_partition_list_empty() { let (store, state) = make_test_store_and_state(&[ ("tablepath/mypartition=val1/notparquetfile", 100), + ("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0), ("tablepath/file.parquet", 100), ]); let filter = Expr::eq(col("mypartition"), lit("val1")); @@ -590,6 +598,7 @@ mod tests { let (store, state) = make_test_store_and_state(&[ ("tablepath/mypartition=val1/file.parquet", 100), ("tablepath/mypartition=val2/file.parquet", 100), + ("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0), ("tablepath/mypartition=val1/other=val3/file.parquet", 100), ]); let filter = Expr::eq(col("mypartition"), lit("val1")); @@ -671,6 +680,106 @@ mod tests { ); } + fn describe_partition(partition: &Partition) -> (&str, usize, Vec<&str>) { + ( + partition.path.as_ref(), + partition.depth, + partition + .files + .as_ref() + .map(|f| f.iter().map(|f| f.location.filename().unwrap()).collect()) + .unwrap_or(Vec::new()), + ) + } + + #[tokio::test] + async fn test_list_partition() { + let (store, _) = make_test_store_and_state(&[ + ("tablepath/part1=p1v1/file.parquet", 100), + ("tablepath/part1=p1v2/part2=p2v1/file1.parquet", 100), + ("tablepath/part1=p1v2/part2=p2v1/file2.parquet", 100), + ("tablepath/part1=p1v3/part2=p2v1/file3.parquet", 100), + ("tablepath/part1=p1v2/part2=p2v2/file4.parquet", 100), + ("tablepath/part1=p1v2/part2=p2v2/empty.parquet", 0), + ]); + + let partitions = list_partitions( + store.as_ref(), + &ListingTableUrl::parse("file:///tablepath/").unwrap(), + 0, + None, + ) + .await + .expect("listing partitions failed"); + + assert_eq!( + &partitions + .iter() + .map(describe_partition) + .collect::>(), + &vec![ + ("tablepath", 0, vec![]), + ("tablepath/part1=p1v1", 1, vec![]), + ("tablepath/part1=p1v2", 1, vec![]), + ("tablepath/part1=p1v3", 1, vec![]), + ] + ); + + let partitions = list_partitions( + store.as_ref(), + &ListingTableUrl::parse("file:///tablepath/").unwrap(), + 1, + None, + ) + .await + .expect("listing partitions failed"); + + assert_eq!( + &partitions + .iter() + .map(describe_partition) + .collect::>(), + &vec![ + ("tablepath", 0, vec![]), + ("tablepath/part1=p1v1", 1, vec!["file.parquet"]), + ("tablepath/part1=p1v2", 1, vec![]), + ("tablepath/part1=p1v2/part2=p2v1", 2, vec![]), + ("tablepath/part1=p1v2/part2=p2v2", 2, vec![]), + ("tablepath/part1=p1v3", 1, vec![]), + ("tablepath/part1=p1v3/part2=p2v1", 2, vec![]), + ] + ); + + let partitions = list_partitions( + store.as_ref(), + &ListingTableUrl::parse("file:///tablepath/").unwrap(), + 2, + None, + ) + .await + .expect("listing partitions failed"); + + assert_eq!( + &partitions + .iter() + .map(describe_partition) + .collect::>(), + &vec![ + ("tablepath", 0, vec![]), + ("tablepath/part1=p1v1", 1, vec!["file.parquet"]), + ("tablepath/part1=p1v2", 1, vec![]), + ("tablepath/part1=p1v3", 1, vec![]), + ( + "tablepath/part1=p1v2/part2=p2v1", + 2, + vec!["file1.parquet", "file2.parquet"] + ), + ("tablepath/part1=p1v2/part2=p2v2", 2, vec!["file4.parquet"]), + ("tablepath/part1=p1v3/part2=p2v1", 2, vec!["file3.parquet"]), + ] + ); + } + #[test] fn test_parse_partitions_for_path() { assert_eq!( diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index ffe49dd2ba116..624b4c20628b2 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -470,6 +470,8 @@ impl ListingOptions { let files: Vec<_> = table_path .list_all_files(state, store.as_ref(), &self.file_extension) .await? + // Empty files cannot affect schema but may throw when trying to read for it + .try_filter(|object_meta| future::ready(object_meta.size > 0)) .try_collect() .await?; From c5d46f26fcd5dc99e9468f5aee66ea2eff199f04 Mon Sep 17 00:00:00 2001 From: Arttu Voutilainen Date: Fri, 13 Dec 2024 09:46:14 +0100 Subject: [PATCH 2/7] clippy --- datafusion/core/src/datasource/listing/helpers.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index 730130e69765f..c8379a6431483 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/core/src/datasource/listing/helpers.rs @@ -688,7 +688,7 @@ mod tests { .files .as_ref() .map(|f| f.iter().map(|f| f.location.filename().unwrap()).collect()) - .unwrap_or(Vec::new()), + .unwrap_or_default(), ) } From 61c5e0d3d599d3047e5460f576f0f7c6e6508ed8 Mon Sep 17 00:00:00 2001 From: Arttu Voutilainen Date: Fri, 13 Dec 2024 10:16:22 +0100 Subject: [PATCH 3/7] fix csv and json tests --- .../core/src/datasource/file_format/csv.rs | 41 ++++--------------- .../core/src/datasource/file_format/json.rs | 8 +--- 2 files changed, 11 insertions(+), 38 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 9c96c682865fd..0278cd5e6514b 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -1259,18 +1259,13 @@ mod tests { Ok(()) } - /// Read a single empty csv file in parallel + /// Read a single empty csv file /// /// empty_0_byte.csv: /// (file is empty) - #[rstest(n_partitions, case(1), case(2), case(3), case(4))] #[tokio::test] - async fn test_csv_parallel_empty_file(n_partitions: usize) -> Result<()> { - let config = SessionConfig::new() - .with_repartition_file_scans(true) - .with_repartition_file_min_size(0) - .with_target_partitions(n_partitions); - let ctx = SessionContext::new_with_config(config); + async fn test_csv_empty_file() -> Result<()> { + let ctx = SessionContext::new(); ctx.register_csv( "empty", "tests/data/empty_0_byte.csv", @@ -1278,32 +1273,24 @@ mod tests { ) .await?; - // Require a predicate to enable repartition for the optimizer let query = "select * from empty where random() > 0.5;"; let query_result = ctx.sql(query).await?.collect().await?; - let actual_partitions = count_query_csv_partitions(&ctx, query).await?; #[rustfmt::skip] let expected = ["++", "++"]; assert_batches_eq!(expected, &query_result); - assert_eq!(1, actual_partitions); // Won't get partitioned if all files are empty Ok(()) } - /// Read a single empty csv file with header in parallel + /// Read a single empty csv file with header /// /// empty.csv: /// c1,c2,c3 - #[rstest(n_partitions, case(1), case(2), case(3))] #[tokio::test] - async fn test_csv_parallel_empty_with_header(n_partitions: usize) -> Result<()> { - let config = SessionConfig::new() - .with_repartition_file_scans(true) - .with_repartition_file_min_size(0) - .with_target_partitions(n_partitions); - let ctx = SessionContext::new_with_config(config); + async fn test_csv_empty_with_header() -> Result<()> { + let ctx = SessionContext::new(); ctx.register_csv( "empty", "tests/data/empty.csv", @@ -1311,21 +1298,18 @@ mod tests { ) .await?; - // Require a predicate to enable repartition for the optimizer let query = "select * from empty where random() > 0.5;"; let query_result = ctx.sql(query).await?.collect().await?; - let actual_partitions = count_query_csv_partitions(&ctx, query).await?; #[rustfmt::skip] let expected = ["++", "++"]; assert_batches_eq!(expected, &query_result); - assert_eq!(n_partitions, actual_partitions); Ok(()) } - /// Read multiple empty csv files in parallel + /// Read multiple empty csv files /// /// all_empty /// ├── empty0.csv @@ -1334,14 +1318,9 @@ mod tests { /// /// empty0.csv/empty1.csv/empty2.csv: /// (file is empty) - #[rstest(n_partitions, case(1), case(2), case(3), case(4))] #[tokio::test] - async fn test_csv_parallel_multiple_empty_files(n_partitions: usize) -> Result<()> { - let config = SessionConfig::new() - .with_repartition_file_scans(true) - .with_repartition_file_min_size(0) - .with_target_partitions(n_partitions); - let ctx = SessionContext::new_with_config(config); + async fn test_csv_multiple_empty_files() -> Result<()> { + let ctx = SessionContext::new(); let file_format = Arc::new(CsvFormat::default().with_has_header(false)); let listing_options = ListingOptions::new(file_format.clone()) .with_file_extension(file_format.get_ext()); @@ -1358,13 +1337,11 @@ mod tests { // Require a predicate to enable repartition for the optimizer let query = "select * from empty where random() > 0.5;"; let query_result = ctx.sql(query).await?.collect().await?; - let actual_partitions = count_query_csv_partitions(&ctx, query).await?; #[rustfmt::skip] let expected = ["++", "++"]; assert_batches_eq!(expected, &query_result); - assert_eq!(1, actual_partitions); // Won't get partitioned if all files are empty Ok(()) } diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index e97853e9e7d72..4bdf336881c9c 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -619,13 +619,11 @@ mod tests { Ok(()) } - #[rstest(n_partitions, case(1), case(2), case(3), case(4))] #[tokio::test] - async fn it_can_read_empty_ndjson_in_parallel(n_partitions: usize) -> Result<()> { + async fn it_can_read_empty_ndjson() -> Result<()> { let config = SessionConfig::new() .with_repartition_file_scans(true) - .with_repartition_file_min_size(0) - .with_target_partitions(n_partitions); + .with_repartition_file_min_size(0); let ctx = SessionContext::new_with_config(config); @@ -638,7 +636,6 @@ mod tests { let query = "SELECT * FROM json_parallel_empty WHERE random() > 0.5;"; let result = ctx.sql(query).await?.collect().await?; - let actual_partitions = count_num_partitions(&ctx, query).await?; #[rustfmt::skip] let expected = [ @@ -647,7 +644,6 @@ mod tests { ]; assert_batches_eq!(expected, &result); - assert_eq!(1, actual_partitions); Ok(()) } From af25b268ced4bdc19c90bfdc7223a0b9e4bf11f7 Mon Sep 17 00:00:00 2001 From: Arttu Voutilainen Date: Fri, 13 Dec 2024 15:58:21 +0100 Subject: [PATCH 4/7] add testing for parquet --- .../src/datasource/file_format/parquet.rs | 77 ++++++++++++++++++- parquet-testing | 2 +- 2 files changed, 76 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 1d08de172273f..79ab6eeebe39a 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -1312,7 +1312,7 @@ mod tests { use crate::datasource::file_format::parquet::test_util::store_parquet; use crate::physical_plan::metrics::MetricValue; - use crate::prelude::{SessionConfig, SessionContext}; + use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext}; use arrow::array::{Array, ArrayRef, StringArray}; use arrow_array::types::Int32Type; use arrow_array::{DictionaryArray, Int32Array, Int64Array}; @@ -1323,8 +1323,8 @@ mod tests { as_float64_array, as_int32_array, as_timestamp_nanosecond_array, }; use datafusion_common::config::ParquetOptions; - use datafusion_common::ScalarValue; use datafusion_common::ScalarValue::Utf8; + use datafusion_common::{assert_batches_eq, ScalarValue}; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_physical_plan::stream::RecordBatchStreamAdapter; @@ -2251,6 +2251,79 @@ mod tests { scan_format(state, &*format, &testdata, file_name, projection, limit).await } + #[tokio::test] + async fn test_read_parquet() -> Result<()> { + let testdata = crate::test_util::parquet_test_data(); + let path = format!("{testdata}/alltypes_tiny_pages.parquet"); + let file = File::open(path).await.unwrap(); + let options = ArrowReaderOptions::new().with_page_index(true); + let builder = + ParquetRecordBatchStreamBuilder::new_with_options(file, options.clone()) + .await + .unwrap() + .metadata() + .clone(); + check_page_index_validation(builder.column_index(), builder.offset_index()); + + let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet"); + let file = File::open(path).await.unwrap(); + + let builder = ParquetRecordBatchStreamBuilder::new_with_options(file, options) + .await + .unwrap() + .metadata() + .clone(); + check_page_index_validation(builder.column_index(), builder.offset_index()); + + Ok(()) + } + + #[tokio::test] + async fn test_read_empty_parquet() -> Result<()> { + let testdata = crate::test_util::parquet_test_data(); + let path = format!("{testdata}/empty.parquet"); + + let ctx = SessionContext::new(); + + let df = ctx + .read_parquet(&path, ParquetReadOptions::default()) + .await + .expect("read_parquet should succeed"); + + let result = df.collect().await?; + #[rustfmt::skip] + let expected = ["++", + "++"]; + assert_batches_eq!(expected, &result); + + Ok(()) + } + + #[tokio::test] + async fn test_read_partitioned_empty_parquet() -> Result<()> { + let testdata = crate::test_util::parquet_test_data(); + let path = format!("{testdata}/partitioned/"); + + let ctx = SessionContext::new(); + + let df = ctx + .read_parquet( + &path, + ParquetReadOptions::new() + .table_partition_cols(vec![("col1".to_string(), DataType::Utf8)]), + ) + .await + .expect("read_parquet should succeed"); + + let result = df.collect().await?; + #[rustfmt::skip] + let expected = ["++", + "++"]; + assert_batches_eq!(expected, &result); + + Ok(()) + } + fn build_ctx(store_url: &url::Url) -> Arc { let tmp_dir = tempfile::TempDir::new().unwrap(); let local = Arc::new( diff --git a/parquet-testing b/parquet-testing index e45cd23f784aa..a87256729f1ab 160000 --- a/parquet-testing +++ b/parquet-testing @@ -1 +1 @@ -Subproject commit e45cd23f784aab3d6bf0701f8f4e621469ed3be7 +Subproject commit a87256729f1ab8a70474e04ef5d7f87d2bdeea86 From 53d96be985991e62348b82ab2084b908c90a30d8 Mon Sep 17 00:00:00 2001 From: Arttu Voutilainen Date: Fri, 13 Dec 2024 16:05:23 +0100 Subject: [PATCH 5/7] cleanup --- .../src/datasource/file_format/parquet.rs | 27 ------------------- 1 file changed, 27 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 79ab6eeebe39a..77524b275bb35 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -2251,33 +2251,6 @@ mod tests { scan_format(state, &*format, &testdata, file_name, projection, limit).await } - #[tokio::test] - async fn test_read_parquet() -> Result<()> { - let testdata = crate::test_util::parquet_test_data(); - let path = format!("{testdata}/alltypes_tiny_pages.parquet"); - let file = File::open(path).await.unwrap(); - let options = ArrowReaderOptions::new().with_page_index(true); - let builder = - ParquetRecordBatchStreamBuilder::new_with_options(file, options.clone()) - .await - .unwrap() - .metadata() - .clone(); - check_page_index_validation(builder.column_index(), builder.offset_index()); - - let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet"); - let file = File::open(path).await.unwrap(); - - let builder = ParquetRecordBatchStreamBuilder::new_with_options(file, options) - .await - .unwrap() - .metadata() - .clone(); - check_page_index_validation(builder.column_index(), builder.offset_index()); - - Ok(()) - } - #[tokio::test] async fn test_read_empty_parquet() -> Result<()> { let testdata = crate::test_util::parquet_test_data(); From 38cdbf26335ca6bbb8e9ca2ed373bdd74b1615ae Mon Sep 17 00:00:00 2001 From: Arttu Voutilainen Date: Fri, 13 Dec 2024 16:39:50 +0100 Subject: [PATCH 6/7] fix parquet tests --- .../core/src/datasource/file_format/parquet.rs | 17 ++++++++++++----- parquet-testing | 2 +- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 77524b275bb35..383fd65752349 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -2251,10 +2251,12 @@ mod tests { scan_format(state, &*format, &testdata, file_name, projection, limit).await } + /// Test that 0-byte files don't break while reading #[tokio::test] async fn test_read_empty_parquet() -> Result<()> { - let testdata = crate::test_util::parquet_test_data(); - let path = format!("{testdata}/empty.parquet"); + let tmp_dir = tempfile::TempDir::new().unwrap(); + let path = format!("{}/empty.parquet", tmp_dir.path().to_string_lossy()); + File::create(&path).await?; let ctx = SessionContext::new(); @@ -2272,16 +2274,21 @@ mod tests { Ok(()) } + /// Test that 0-byte files don't break while reading #[tokio::test] async fn test_read_partitioned_empty_parquet() -> Result<()> { - let testdata = crate::test_util::parquet_test_data(); - let path = format!("{testdata}/partitioned/"); + let tmp_dir = tempfile::TempDir::new().unwrap(); + let partition_dir = tmp_dir.path().join("col1=a"); + std::fs::create_dir(&partition_dir).unwrap(); + File::create(partition_dir.join("empty.parquet")) + .await + .unwrap(); let ctx = SessionContext::new(); let df = ctx .read_parquet( - &path, + tmp_dir.path().to_str().unwrap(), ParquetReadOptions::new() .table_partition_cols(vec![("col1".to_string(), DataType::Utf8)]), ) diff --git a/parquet-testing b/parquet-testing index a87256729f1ab..e45cd23f784aa 160000 --- a/parquet-testing +++ b/parquet-testing @@ -1 +1 @@ -Subproject commit a87256729f1ab8a70474e04ef5d7f87d2bdeea86 +Subproject commit e45cd23f784aab3d6bf0701f8f4e621469ed3be7 From 3f58b0270a89cdabd9062cf204cfb69687e76e48 Mon Sep 17 00:00:00 2001 From: Arttu Voutilainen Date: Mon, 16 Dec 2024 10:03:20 +0100 Subject: [PATCH 7/7] document describe_partition, add back repartition options to one of the csv empty files tests --- datafusion/core/src/datasource/file_format/csv.rs | 7 ++++++- datafusion/core/src/datasource/listing/helpers.rs | 1 + 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 0278cd5e6514b..e9a93475d3ce4 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -1320,7 +1320,12 @@ mod tests { /// (file is empty) #[tokio::test] async fn test_csv_multiple_empty_files() -> Result<()> { - let ctx = SessionContext::new(); + // Testing that partitioning doesn't break with empty files + let config = SessionConfig::new() + .with_repartition_file_scans(true) + .with_repartition_file_min_size(0) + .with_target_partitions(4); + let ctx = SessionContext::new_with_config(config); let file_format = Arc::new(CsvFormat::default().with_has_header(false)); let listing_options = ListingOptions::new(file_format.clone()) .with_file_extension(file_format.get_ext()); diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index c8379a6431483..228b9a4e9f6bf 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/core/src/datasource/listing/helpers.rs @@ -680,6 +680,7 @@ mod tests { ); } + /// Describe a partition as a (path, depth, files) tuple for easier assertions fn describe_partition(partition: &Partition) -> (&str, usize, Vec<&str>) { ( partition.path.as_ref(),