From ceaa599ce45d9c55a7524bbdd16f7667dc9c7940 Mon Sep 17 00:00:00 2001 From: Mohamed Abdeen Date: Sun, 31 Mar 2024 16:43:36 +0200 Subject: [PATCH 01/11] prevent panic --- .../src/datasource/physical_plan/file_scan_config.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index 370ca91a0b0e9..5c114f04f5263 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -32,7 +32,7 @@ use arrow::datatypes::{ArrowNativeType, UInt16Type}; use arrow_array::{ArrayRef, DictionaryArray, RecordBatch, RecordBatchOptions}; use arrow_schema::{DataType, Field, Schema, SchemaRef}; use datafusion_common::stats::Precision; -use datafusion_common::{exec_err, ColumnStatistics, Statistics}; +use datafusion_common::{exec_err, ColumnStatistics, DataFusionError, Statistics}; use datafusion_physical_expr::LexOrdering; use log::warn; @@ -256,9 +256,17 @@ impl PartitionColumnProjector { file_batch.columns().len() ); } + let mut cols = file_batch.columns().to_vec(); for &(pidx, sidx) in &self.projected_partition_indexes { - let mut partition_value = Cow::Borrowed(&partition_values[pidx]); + let p_value = + partition_values + .get(pidx) + .ok_or(DataFusionError::Execution( + "Invalid partitioning".to_string(), + ))?; + + let mut partition_value = Cow::Borrowed(p_value); // check if user forgot to dict-encode the partition value let field = self.projected_schema.field(sidx); From 9c7559e2e6ab281b56ab1a15681ca6a38465ef24 Mon Sep 17 00:00:00 2001 From: Mohamed Abdeen Date: Sun, 31 Mar 2024 23:27:17 +0200 Subject: [PATCH 02/11] initial version, bad code --- .../core/src/datasource/listing/table.rs | 79 ++++++++++++++++++- .../src/datasource/listing_table_factory.rs | 5 +- datafusion/core/src/execution/context/mod.rs | 2 +- 3 files changed, 83 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index c1e337b5c44af..adc46b95d058d 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -61,7 +61,8 @@ use datafusion_physical_expr::{ use async_trait::async_trait; use futures::{future, stream, StreamExt, TryStreamExt}; -use object_store::ObjectStore; +use itertools::Itertools; +use object_store::{ObjectMeta, ObjectStore}; /// Configuration for creating a [`ListingTable`] #[derive(Debug, Clone)] @@ -438,6 +439,82 @@ impl ListingOptions { self.format.infer_schema(state, &store, &files).await } + + /// TODO: Finish this doc + pub async fn validate_partitions( + &self, + state: &SessionState, + table_path: &ListingTableUrl, + ) -> Result<()> { + let partitions = self.infer_partitions(state, table_path).await?; + + if partitions.is_empty() { + return Ok(()); + } + + let table_partition_names = self + .table_partition_cols + .iter() + .map(|(name, _)| name.clone()) + .collect::>(); + + if partitions != table_partition_names { + plan_err!( + "Expected partitions to be {:?}, but found {:?}", + partitions, + table_partition_names + ) + } else { + Ok(()) + } + } + + /// Infer the partitioning at the given path on the provided object store. + /// TODO: Finish this doc + async fn infer_partitions( + &self, + state: &SessionState, + table_path: &ListingTableUrl, + ) -> Result> { + let store = state.runtime_env().object_store(table_path)?; + let files: Vec = table_path + .list_all_files(state, store.as_ref(), &self.file_extension) + .await? + .try_collect() + .await?; + + if files.is_empty() { + return Ok(vec![]); + } + + // TODO: Avoid using all the files + let stripped_path_parts = files.iter().map(|object| { + table_path + .strip_prefix(&object.location) + .unwrap() + .collect::>() + }); + + let partitions = stripped_path_parts + .map(|path_parts| { + path_parts + .iter() + .rev() + .skip(1) // skip the file itself + .rev() + .map(|s| s.split_once('=').unwrap().0.to_string()) + .collect::>() + }) + .collect::>(); + + println!("{:?}", partitions); + partitions.into_iter().all_equal_value().map_err(|v| { + DataFusionError::Plan(format!( + "Found mixed partition values on disk {:?}", + v.unwrap() + )) + }) + } } /// Reads data from one or more files via an diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index b616e0181cfc1..3636b4886196e 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -134,13 +134,16 @@ impl TableProviderFactory for ListingTableFactory { .with_collect_stat(state.config().collect_statistics()) .with_file_extension(file_extension) .with_target_partitions(state.config().target_partitions()) - .with_table_partition_cols(table_partition_cols) + .with_table_partition_cols(table_partition_cols.clone()) .with_file_sort_order(cmd.order_exprs.clone()); + options.validate_partitions(state, &table_path).await?; + let resolved_schema = match provided_schema { None => options.infer_schema(state, &table_path).await?, Some(s) => s, }; + let config = ListingTableConfig::new(table_path) .with_listing_options(options) .with_schema(resolved_schema); diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 116e45c8c1302..95cf74c4dc324 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1108,7 +1108,7 @@ impl SessionContext { table_ref: impl Into>, provider: Arc, ) -> Result>> { - let table_ref = table_ref.into(); + let table_ref: TableReference = table_ref.into(); let table = table_ref.table().to_owned(); self.state .read() From e1fcdd5172e60544eb530c95e24c66baa9620fa7 Mon Sep 17 00:00:00 2001 From: Mohamed Abdeen Date: Mon, 1 Apr 2024 21:38:37 +0200 Subject: [PATCH 03/11] some error handling --- .../core/src/datasource/listing/table.rs | 53 +++++++++++-------- 1 file changed, 32 insertions(+), 21 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index adc46b95d058d..d144b00cfd199 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -446,26 +446,27 @@ impl ListingOptions { state: &SessionState, table_path: &ListingTableUrl, ) -> Result<()> { - let partitions = self.infer_partitions(state, table_path).await?; + let inferred = self.infer_partitions(state, table_path).await?; - if partitions.is_empty() { + // no files found on disk or path is not a partitioned table + if inferred.is_empty() { return Ok(()); } let table_partition_names = self .table_partition_cols .iter() - .map(|(name, _)| name.clone()) - .collect::>(); + .map(|(col_name, _)| col_name.clone()) + .collect_vec(); - if partitions != table_partition_names { + if inferred == table_partition_names { + Ok(()) + } else { plan_err!( - "Expected partitions to be {:?}, but found {:?}", - partitions, + "Inferred partitions to be {:?}, but got {:?}", + inferred, table_partition_names ) - } else { - Ok(()) } } @@ -477,9 +478,14 @@ impl ListingOptions { table_path: &ListingTableUrl, ) -> Result> { let store = state.runtime_env().object_store(table_path)?; + + // only use 10 files for inference + // This can fail to detect inconsistent partition keys + // A DFS traversal approach can be helpful let files: Vec = table_path .list_all_files(state, store.as_ref(), &self.file_extension) .await? + .take(10) .try_collect() .await?; @@ -487,32 +493,37 @@ impl ListingOptions { return Ok(vec![]); } - // TODO: Avoid using all the files - let stripped_path_parts = files.iter().map(|object| { + let stripped_path_parts = files.iter().map(|file| { table_path - .strip_prefix(&object.location) + .strip_prefix(&file.location) .unwrap() .collect::>() }); - let partitions = stripped_path_parts + let partition_keys = stripped_path_parts .map(|path_parts| { path_parts .iter() .rev() - .skip(1) // skip the file itself + .skip(1) // get parent only; skip the file itself .rev() - .map(|s| s.split_once('=').unwrap().0.to_string()) + .map(|s| s.split_once('=')) + .filter_map(|result| result.map(|(col_name, _)| col_name.to_string())) .collect::>() }) .collect::>(); - println!("{:?}", partitions); - partitions.into_iter().all_equal_value().map_err(|v| { - DataFusionError::Plan(format!( - "Found mixed partition values on disk {:?}", - v.unwrap() - )) + partition_keys.into_iter().all_equal_value().map_err(|v| { + if let Some(diff) = v { + DataFusionError::Plan(format!( + "Found mixed partition values on disk {:?}", + diff + )) + } else { + DataFusionError::Internal(format!( + "Tried infering partitions using a non-hive partition", + )) // should be unreachable + } }) } } From 40d42b3c53196fa4cc66e36bbe82872959d617ce Mon Sep 17 00:00:00 2001 From: Mohamed Abdeen Date: Mon, 1 Apr 2024 22:32:55 +0200 Subject: [PATCH 04/11] Some slt tests --- .../core/src/datasource/listing/table.rs | 35 +++++++++---- .../test_files/create_external_table.slt | 49 +++++++++++++++++++ 2 files changed, 74 insertions(+), 10 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index d144b00cfd199..318d72a4b84d1 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -440,7 +440,10 @@ impl ListingOptions { self.format.infer_schema(state, &store, &files).await } - /// TODO: Finish this doc + /// Infers the partition columns stored in `LOCATION` and comapres + /// it with the columns provided in `PARTITIONED BY` to help prevent + /// accidental corrupts of partitioned tables. + /// Allows specifying partial partitions. pub async fn validate_partitions( &self, state: &SessionState, @@ -459,15 +462,27 @@ impl ListingOptions { .map(|(col_name, _)| col_name.clone()) .collect_vec(); - if inferred == table_partition_names { - Ok(()) - } else { - plan_err!( + if inferred.len() < table_partition_names.len() { + return plan_err!( "Inferred partitions to be {:?}, but got {:?}", inferred, table_partition_names - ) + ); + } + + // match prefix to allow creating a tables using + // some of the partition keys + for (idx, col) in table_partition_names.iter().enumerate() { + if &inferred[idx] != col { + return plan_err!( + "Inferred partitions to be {:?}, but got {:?}", + inferred, + table_partition_names + ); + } } + + Ok(()) } /// Infer the partitioning at the given path on the provided object store. @@ -503,7 +518,7 @@ impl ListingOptions { let partition_keys = stripped_path_parts .map(|path_parts| { path_parts - .iter() + .into_iter() .rev() .skip(1) // get parent only; skip the file itself .rev() @@ -520,9 +535,9 @@ impl ListingOptions { diff )) } else { - DataFusionError::Internal(format!( - "Tried infering partitions using a non-hive partition", - )) // should be unreachable + DataFusionError::Internal( + "Tried infering partitions using a non-hive partition".to_string(), + ) // should be unreachable } }) } diff --git a/datafusion/sqllogictest/test_files/create_external_table.slt b/datafusion/sqllogictest/test_files/create_external_table.slt index c4a26a5e227d0..ea9832c862315 100644 --- a/datafusion/sqllogictest/test_files/create_external_table.slt +++ b/datafusion/sqllogictest/test_files/create_external_table.slt @@ -106,3 +106,52 @@ CREATE EXTERNAL TABLE csv_table (column1 int) STORED AS CSV LOCATION 'foo.csv' OPTIONS ('format.delimiter' ';', 'format.column_index_truncate_length' '123') + +# Wrong partition order error + +statement ok +CREATE EXTERNAL TABLE partitioned (c1 int) +PARTITIONED BY (p1 string, p2 string) +STORED AS parquet +LOCATION 'test_files/scratch/create_external_table/bad_partitioning/'; + +query ITT +INSERT INTO partitioned VALUES (1, 'x', 'y'); +---- +1 + +query error DataFusion error: Error during planning: Inferred partitions to be \["p1", "p2"\], but got \["p2", "p1"\] +CREATE EXTERNAL TABLE wrong_order_partitioned (c1 int) +PARTITIONED BY (p2 string, p1 string) +STORED AS parquet +LOCATION 'test_files/scratch/create_external_table/bad_partitioning/'; + +statement error DataFusion error: Error during planning: Inferred partitions to be \["p1", "p2"\], but got \["p2"\] +CREATE EXTERNAL TABLE wrong_order_partitioned (c1 int) +PARTITIONED BY (p2 string) +STORED AS parquet +LOCATION 'test_files/scratch/create_external_table/bad_partitioning/'; + +# But allows partial partition selection + +statement ok +CREATE EXTERNAL TABLE partial_partitioned (c1 int) +PARTITIONED BY (p1 string) +STORED AS parquet +LOCATION 'test_files/scratch/create_external_table/bad_partitioning/'; + +query IT +SELECT * FROM partial_partitioned; +---- +1 x + +statement ok +CREATE EXTERNAL TABLE inner_partition (c1 int) +PARTITIONED BY (p2 string) +STORED AS parquet +LOCATION 'test_files/scratch/create_external_table/bad_partitioning/p1=x/'; + +query IT +SELECT * FROM inner_partition; +---- +1 y From 7270a10298f8c69995f0e566f5dabd8af8eaa01e Mon Sep 17 00:00:00 2001 From: Mohamed Abdeen Date: Tue, 2 Apr 2024 16:52:06 +0200 Subject: [PATCH 05/11] docs and minor refactors --- datafusion/core/src/datasource/listing/table.rs | 15 ++++++++------- .../datasource/physical_plan/file_scan_config.rs | 2 +- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 318d72a4b84d1..208c1e2c73c2b 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -440,9 +440,10 @@ impl ListingOptions { self.format.infer_schema(state, &store, &files).await } - /// Infers the partition columns stored in `LOCATION` and comapres - /// it with the columns provided in `PARTITIONED BY` to help prevent + /// Infers the partition columns stored in `LOCATION` and compares + /// them with the columns provided in `PARTITIONED BY` to help prevent /// accidental corrupts of partitioned tables. + /// /// Allows specifying partial partitions. pub async fn validate_partitions( &self, @@ -451,7 +452,7 @@ impl ListingOptions { ) -> Result<()> { let inferred = self.infer_partitions(state, table_path).await?; - // no files found on disk or path is not a partitioned table + // no partitioned files found on disk if inferred.is_empty() { return Ok(()); } @@ -486,7 +487,8 @@ impl ListingOptions { } /// Infer the partitioning at the given path on the provided object store. - /// TODO: Finish this doc + /// For performance reasons, it doesn't read all the files on disk + /// and therefore may fail to detect invalid partitioning. async fn infer_partitions( &self, state: &SessionState, @@ -496,7 +498,7 @@ impl ListingOptions { // only use 10 files for inference // This can fail to detect inconsistent partition keys - // A DFS traversal approach can be helpful + // A DFS traversal approach of the store can help here let files: Vec = table_path .list_all_files(state, store.as_ref(), &self.file_extension) .await? @@ -522,8 +524,7 @@ impl ListingOptions { .rev() .skip(1) // get parent only; skip the file itself .rev() - .map(|s| s.split_once('=')) - .filter_map(|result| result.map(|(col_name, _)| col_name.to_string())) + .map(|s| s.split('=').take(1).collect()) .collect::>() }) .collect::>(); diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index 5c114f04f5263..1ea411cb6f59f 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -263,7 +263,7 @@ impl PartitionColumnProjector { partition_values .get(pidx) .ok_or(DataFusionError::Execution( - "Invalid partitioning".to_string(), + "Invalid partitioning found on disk".to_string(), ))?; let mut partition_value = Cow::Borrowed(p_value); From 82c8e4c7a0f73a823b8e56c6374992069a028d41 Mon Sep 17 00:00:00 2001 From: Mohamed Abdeen Date: Tue, 2 Apr 2024 17:11:25 +0200 Subject: [PATCH 06/11] cleaning up --- datafusion/core/src/datasource/listing/table.rs | 10 +++++----- .../core/src/datasource/listing_table_factory.rs | 3 +-- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 208c1e2c73c2b..8e30a7a849535 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -62,7 +62,7 @@ use datafusion_physical_expr::{ use async_trait::async_trait; use futures::{future, stream, StreamExt, TryStreamExt}; use itertools::Itertools; -use object_store::{ObjectMeta, ObjectStore}; +use object_store::ObjectStore; /// Configuration for creating a [`ListingTable`] #[derive(Debug, Clone)] @@ -499,7 +499,7 @@ impl ListingOptions { // only use 10 files for inference // This can fail to detect inconsistent partition keys // A DFS traversal approach of the store can help here - let files: Vec = table_path + let files: Vec<_> = table_path .list_all_files(state, store.as_ref(), &self.file_extension) .await? .take(10) @@ -514,7 +514,7 @@ impl ListingOptions { table_path .strip_prefix(&file.location) .unwrap() - .collect::>() + .collect_vec() }); let partition_keys = stripped_path_parts @@ -525,9 +525,9 @@ impl ListingOptions { .skip(1) // get parent only; skip the file itself .rev() .map(|s| s.split('=').take(1).collect()) - .collect::>() + .collect_vec() }) - .collect::>(); + .collect_vec(); partition_keys.into_iter().all_equal_value().map_err(|v| { if let Some(diff) = v { diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index 3636b4886196e..cd7ba2bc624b0 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -134,7 +134,7 @@ impl TableProviderFactory for ListingTableFactory { .with_collect_stat(state.config().collect_statistics()) .with_file_extension(file_extension) .with_target_partitions(state.config().target_partitions()) - .with_table_partition_cols(table_partition_cols.clone()) + .with_table_partition_cols(table_partition_cols) .with_file_sort_order(cmd.order_exprs.clone()); options.validate_partitions(state, &table_path).await?; @@ -143,7 +143,6 @@ impl TableProviderFactory for ListingTableFactory { None => options.infer_schema(state, &table_path).await?, Some(s) => s, }; - let config = ListingTableConfig::new(table_path) .with_listing_options(options) .with_schema(resolved_schema); From 2a7de51dbbd4d3dd6fe98854f84c2e876251d199 Mon Sep 17 00:00:00 2001 From: Mohamed Abdeen Date: Tue, 2 Apr 2024 18:59:18 +0200 Subject: [PATCH 07/11] fix tests --- datafusion/core/src/datasource/listing_table_factory.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index cd7ba2bc624b0..9fa5bf69df9be 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -128,6 +128,8 @@ impl TableProviderFactory for ListingTableFactory { (Some(schema), table_partition_cols) }; + let is_partitioned = !table_partition_cols.is_empty(); + let table_path = ListingTableUrl::parse(&cmd.location)?; let options = ListingOptions::new(file_format) @@ -137,7 +139,9 @@ impl TableProviderFactory for ListingTableFactory { .with_table_partition_cols(table_partition_cols) .with_file_sort_order(cmd.order_exprs.clone()); - options.validate_partitions(state, &table_path).await?; + if is_partitioned { + options.validate_partitions(state, &table_path).await?; + } let resolved_schema = match provided_schema { None => options.infer_schema(state, &table_path).await?, From 012c03b3eb82824cf1c546b7836312c509b6a358 Mon Sep 17 00:00:00 2001 From: Mohamed Abdeen Date: Tue, 2 Apr 2024 22:08:18 +0200 Subject: [PATCH 08/11] clear err message for single-file partitioned tables --- .../core/src/datasource/listing/table.rs | 34 ++++++++++--------- .../src/datasource/listing_table_factory.rs | 6 +--- .../test_files/create_external_table.slt | 7 ++++ 3 files changed, 26 insertions(+), 21 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 8e30a7a849535..ad9a21e43e0e6 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -450,6 +450,17 @@ impl ListingOptions { state: &SessionState, table_path: &ListingTableUrl, ) -> Result<()> { + if self.table_partition_cols.is_empty() { + return Ok(()); + } + + if !table_path.is_collection() { + return plan_err!( + "Can't create a partitioned table backed by a single file, \ + perhaps the URL is missing a trailing slash?" + ); + } + let inferred = self.infer_partitions(state, table_path).await?; // no partitioned files found on disk @@ -506,10 +517,6 @@ impl ListingOptions { .try_collect() .await?; - if files.is_empty() { - return Ok(vec![]); - } - let stripped_path_parts = files.iter().map(|file| { table_path .strip_prefix(&file.location) @@ -522,25 +529,20 @@ impl ListingOptions { path_parts .into_iter() .rev() - .skip(1) // get parent only; skip the file itself + .skip(1) // get parents only; skip the file itself .rev() .map(|s| s.split('=').take(1).collect()) .collect_vec() }) .collect_vec(); - partition_keys.into_iter().all_equal_value().map_err(|v| { - if let Some(diff) = v { - DataFusionError::Plan(format!( - "Found mixed partition values on disk {:?}", - diff - )) - } else { - DataFusionError::Internal( - "Tried infering partitions using a non-hive partition".to_string(), - ) // should be unreachable + match partition_keys.into_iter().all_equal_value() { + Ok(v) => Ok(v), + Err(None) => Ok(vec![]), + Err(Some(diff)) => { + plan_err!("Found mixed partition values on disk {:?}", diff) } - }) + } } } diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index 9fa5bf69df9be..cd7ba2bc624b0 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -128,8 +128,6 @@ impl TableProviderFactory for ListingTableFactory { (Some(schema), table_partition_cols) }; - let is_partitioned = !table_partition_cols.is_empty(); - let table_path = ListingTableUrl::parse(&cmd.location)?; let options = ListingOptions::new(file_format) @@ -139,9 +137,7 @@ impl TableProviderFactory for ListingTableFactory { .with_table_partition_cols(table_partition_cols) .with_file_sort_order(cmd.order_exprs.clone()); - if is_partitioned { - options.validate_partitions(state, &table_path).await?; - } + options.validate_partitions(state, &table_path).await?; let resolved_schema = match provided_schema { None => options.infer_schema(state, &table_path).await?, diff --git a/datafusion/sqllogictest/test_files/create_external_table.slt b/datafusion/sqllogictest/test_files/create_external_table.slt index c350e69fc70e9..4b57ec28c2670 100644 --- a/datafusion/sqllogictest/test_files/create_external_table.slt +++ b/datafusion/sqllogictest/test_files/create_external_table.slt @@ -115,6 +115,13 @@ STORED AS CSV LOCATION 'foo.csv' OPTIONS ('format.delimiter' ';', 'format.column_index_truncate_length' '123') +# Partitioned table on a single file +query error DataFusion error: Error during planning: Can't create a partitioned table backed by a single file, perhaps the URL is missing a trailing slash\? +CREATE EXTERNAL TABLE single_file_partition(c1 int) +PARTITIONED BY (p2 string, p1 string) +STORED AS CSV +LOCATION 'foo.csv'; + # Wrong partition order error statement ok From 2d332d940a2cea5952bac67f572c6f65677984b6 Mon Sep 17 00:00:00 2001 From: Mohamed Abdeen Date: Tue, 2 Apr 2024 22:56:33 +0200 Subject: [PATCH 09/11] typo --- datafusion/core/src/datasource/listing/table.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index ad9a21e43e0e6..6d20c11922d65 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -482,8 +482,7 @@ impl ListingOptions { ); } - // match prefix to allow creating a tables using - // some of the partition keys + // match prefix to allow creating tables with partial partitions for (idx, col) in table_partition_names.iter().enumerate() { if &inferred[idx] != col { return plan_err!( From 911aba738fca4b8fa1bf8a1c03b137674db60fdd Mon Sep 17 00:00:00 2001 From: Mohamed Abdeen Date: Thu, 4 Apr 2024 21:43:26 +0200 Subject: [PATCH 10/11] test invalid/mixed partitions on disk --- .../test_files/create_external_table.slt | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/datafusion/sqllogictest/test_files/create_external_table.slt b/datafusion/sqllogictest/test_files/create_external_table.slt index 4b57ec28c2670..ad344e782cf16 100644 --- a/datafusion/sqllogictest/test_files/create_external_table.slt +++ b/datafusion/sqllogictest/test_files/create_external_table.slt @@ -170,3 +170,38 @@ query IT SELECT * FROM inner_partition; ---- 1 y + +# Simulate manual creation of invalid (mixed) partitions on disk + +statement ok +CREATE EXTERNAL TABLE test(name string) +PARTITIONED BY (year string, month string) +STORED AS parquet +LOCATION 'test_files/scratch/create_external_table/manual_partitioning/'; + +statement ok +-- passes the partition check since the previous statement didn't write to disk +CREATE EXTERNAL TABLE test2(name string) +PARTITIONED BY (month string, year string) +STORED AS parquet +LOCATION 'test_files/scratch/create_external_table/manual_partitioning/'; + +query TTT +-- creates year -> month partitions +INSERT INTO test VALUES('name', '2024', '03'); +---- +1 + +query TTT +-- creates month -> year partitions. +-- now table have both partitions (year -> month and month -> year) +INSERT INTO test2 VALUES('name', '2024', '03'); +---- +1 + +statement error DataFusion error: Error during planning: Found mixed partition values on disk \(\["year", "month"\], \["month", "year"\]\) +-- fails to infer as partitions are not consistent +CREATE EXTERNAL TABLE test3(name string) +PARTITIONED BY (month string, year string) +STORED AS parquet +LOCATION 'test_files/scratch/create_external_table/manual_partitioning/'; From 99a6eef9b16001ff4183470e0941d3bde0d37b4b Mon Sep 17 00:00:00 2001 From: Mohamed Abdeen Date: Thu, 4 Apr 2024 22:00:53 +0200 Subject: [PATCH 11/11] ensure order in error msg for testing --- datafusion/core/src/datasource/listing/table.rs | 4 +++- datafusion/sqllogictest/test_files/create_external_table.slt | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 6d20c11922d65..380ab70a1ac61 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -539,7 +539,9 @@ impl ListingOptions { Ok(v) => Ok(v), Err(None) => Ok(vec![]), Err(Some(diff)) => { - plan_err!("Found mixed partition values on disk {:?}", diff) + let mut sorted_diff = [diff.0, diff.1]; + sorted_diff.sort(); + plan_err!("Found mixed partition values on disk {:?}", sorted_diff) } } } diff --git a/datafusion/sqllogictest/test_files/create_external_table.slt b/datafusion/sqllogictest/test_files/create_external_table.slt index ad344e782cf16..8aeeb06c19099 100644 --- a/datafusion/sqllogictest/test_files/create_external_table.slt +++ b/datafusion/sqllogictest/test_files/create_external_table.slt @@ -199,7 +199,7 @@ INSERT INTO test2 VALUES('name', '2024', '03'); ---- 1 -statement error DataFusion error: Error during planning: Found mixed partition values on disk \(\["year", "month"\], \["month", "year"\]\) +statement error DataFusion error: Error during planning: Found mixed partition values on disk \[\["month", "year"\], \["year", "month"\]\] -- fails to infer as partitions are not consistent CREATE EXTERNAL TABLE test3(name string) PARTITIONED BY (month string, year string)