diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index 718f9f820af17..9e4334936d572 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -749,7 +749,7 @@ mod tests { use crate::datasource::file_format::options::CsvReadOptions; use crate::datasource::file_format::parquet::test_util::store_parquet; use crate::datasource::file_format::test_util::scan_format; - use crate::datasource::listing::{FileRange, PartitionedFile}; + use crate::datasource::listing::{FileRange, ListingOptions, PartitionedFile}; use crate::datasource::object_store::ObjectStoreUrl; use crate::execution::context::SessionState; use crate::physical_plan::displayable; @@ -769,8 +769,8 @@ mod tests { }; use arrow_array::Date64Array; use chrono::{TimeZone, Utc}; - use datafusion_common::ScalarValue; use datafusion_common::{assert_contains, ToDFSchema}; + use datafusion_common::{FileType, GetExt, ScalarValue}; use datafusion_expr::{col, lit, when, Expr}; use datafusion_physical_expr::create_physical_expr; use datafusion_physical_expr::execution_props::ExecutionProps; @@ -1938,6 +1938,96 @@ mod tests { Ok(schema) } + #[tokio::test] + async fn write_table_results() -> Result<()> { + // create partitioned input file and context + let tmp_dir = TempDir::new()?; + // let mut ctx = create_ctx(&tmp_dir, 4).await?; + let ctx = SessionContext::new_with_config( + SessionConfig::new().with_target_partitions(8), + ); + let schema = populate_csv_partitions(&tmp_dir, 4, ".csv")?; + // register csv file with the execution context + ctx.register_csv( + "test", + tmp_dir.path().to_str().unwrap(), + CsvReadOptions::new().schema(&schema), + ) + .await?; + + // register a local file system object store for /tmp directory + let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?); + let local_url = Url::parse("file://local").unwrap(); + ctx.runtime_env().register_object_store(&local_url, local); + + // Configure listing options + let file_format = ParquetFormat::default().with_enable_pruning(Some(true)); + let listing_options = ListingOptions::new(Arc::new(file_format)) + .with_file_extension(FileType::PARQUET.get_ext()); + + // execute a simple query and write the results to parquet + let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out"; + std::fs::create_dir(&out_dir).unwrap(); + let df = ctx.sql("SELECT c1, c2 FROM test").await?; + let schema: Schema = df.schema().into(); + // Register a listing table - this will use all files in the directory as data sources + // for the query + ctx.register_listing_table( + "my_table", + &out_dir, + listing_options, + Some(Arc::new(schema)), + None, + ) + .await + .unwrap(); + df.write_table("my_table", DataFrameWriteOptions::new()) + .await?; + + // create a new context and verify that the results were saved to a partitioned parquet file + let ctx = SessionContext::new(); + + // get write_id + let mut paths = fs::read_dir(&out_dir).unwrap(); + let path = paths.next(); + let name = path + .unwrap()? + .path() + .file_name() + .expect("Should be a file name") + .to_str() + .expect("Should be a str") + .to_owned(); + let (parsed_id, _) = name.split_once('_').expect("File should contain _ !"); + let write_id = parsed_id.to_owned(); + + // register each partition as well as the top level dir + ctx.register_parquet( + "part0", + &format!("{out_dir}/{write_id}_0.parquet"), + ParquetReadOptions::default(), + ) + .await?; + + ctx.register_parquet("allparts", &out_dir, ParquetReadOptions::default()) + .await?; + + let part0 = ctx.sql("SELECT c1, c2 FROM part0").await?.collect().await?; + let allparts = ctx + .sql("SELECT c1, c2 FROM allparts") + .await? + .collect() + .await?; + + let allparts_count: usize = allparts.iter().map(|batch| batch.num_rows()).sum(); + + assert_eq!(part0[0].schema(), allparts[0].schema()); + + assert_eq!(allparts_count, 40); + + Ok(()) + } + #[tokio::test] async fn write_parquet_results() -> Result<()> { // create partitioned input file and context @@ -1982,7 +2072,6 @@ mod tests { .to_str() .expect("Should be a str") .to_owned(); - println!("{name}"); let (parsed_id, _) = name.split_once('_').expect("File should contain _ !"); let write_id = parsed_id.to_owned();