Skip to content
11 changes: 9 additions & 2 deletions datafusion/core/src/datasource/listing_table_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,10 @@ impl TableProviderFactory for ListingTableFactory {
(Some(schema), table_partition_cols)
};

let table_path = ListingTableUrl::parse(&cmd.location)?;
let mut table_path = ListingTableUrl::parse(&cmd.location)?;

let options = ListingOptions::new(file_format)
.with_file_extension(file_extension)
.with_file_extension(&file_extension)
.with_session_config_options(session_state.config())
.with_table_partition_cols(table_partition_cols);

Expand All @@ -125,6 +125,13 @@ impl TableProviderFactory for ListingTableFactory {
// specifically for parquet file format.
// See: https://github.com/apache/datafusion/issues/7317
None => {
// if the path is a folder, rewrite it as a glob such as 'path/*.parquet'

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is an actual fix

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means a directory of files like foo/my_file.parquet.snappy would no longer be readable -- I think Spark creates files like my_file.snappy.parquet, so it should be OK.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be OK: compressed files are usually named *.codec.parquet, so the broader *.parquet wildcard still matches them. I tested locally against part-00000-9b95f137-d11f-44b6-84b7-d49c95bc7c5b-c000.snappy.parquet

// to only read the files the reader can understand
if table_path.is_folder() && table_path.get_glob().is_none() {
table_path = table_path.with_glob(
format!("*.{}", cmd.file_type.to_lowercase()).as_ref(),
)?;
}
let schema = options.infer_schema(session_state, &table_path).await?;
let df_schema = Arc::clone(&schema).to_dfschema()?;
let column_refs: HashSet<_> = cmd
Expand Down
126 changes: 124 additions & 2 deletions datafusion/core/src/execution/context/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,13 @@ mod tests {
use crate::test_util::parquet_test_data;

use arrow::util::pretty::pretty_format_batches;
use datafusion_common::assert_contains;
use datafusion_common::config::TableParquetOptions;
use datafusion_common::{
assert_batches_eq, assert_batches_sorted_eq, assert_contains,
};
use datafusion_execution::config::SessionConfig;

use tempfile::tempdir;
use tempfile::{tempdir, TempDir};

#[tokio::test]
async fn read_with_glob_path() -> Result<()> {
Expand Down Expand Up @@ -400,4 +402,124 @@ mod tests {
assert_eq!(total_rows, 5);
Ok(())
}

#[tokio::test]
async fn read_from_parquet_folder() -> Result<()> {
let ctx = SessionContext::new();
let tmp_dir = TempDir::new()?;
let test_path = tmp_dir.path().to_str().unwrap().to_string();

ctx.sql("SELECT 1 a")
.await?
.write_parquet(&test_path, DataFrameWriteOptions::default(), None)
.await?;

ctx.sql("SELECT 2 a")
.await?
.write_parquet(&test_path, DataFrameWriteOptions::default(), None)
.await?;

// Adding CSV to check it is not read with Parquet reader
ctx.sql("SELECT 3 a")
.await?
.write_csv(&test_path, DataFrameWriteOptions::default(), None)
.await?;

let actual = ctx
.read_parquet(&test_path, ParquetReadOptions::default())
.await?
.collect()
.await?;

#[cfg_attr(any(), rustfmt::skip)]
assert_batches_sorted_eq!(&[
"+---+",
"| a |",
"+---+",
"| 2 |",
"| 1 |",
"+---+",
], &actual);

let actual = ctx
.read_parquet(test_path, ParquetReadOptions::default())
.await?
.collect()
.await?;

#[cfg_attr(any(), rustfmt::skip)]
assert_batches_sorted_eq!(&[
"+---+",
"| a |",
"+---+",
"| 2 |",
"| 1 |",
"+---+",
], &actual);

Ok(())
}

// An external table declared `STORED AS PARQUET` over a folder must return
// only the rows written as Parquet, even when the folder also holds a CSV file.
#[tokio::test]
async fn read_from_parquet_folder_table() -> Result<()> {
    let ctx = SessionContext::new();
    let tmp_dir = TempDir::new()?;
    let test_path = tmp_dir.path().to_str().unwrap().to_string();

    // Two separate Parquet writes into the same temporary folder.
    for query in ["SELECT 1 a", "SELECT 2 a"] {
        ctx.sql(query)
            .await?
            .write_parquet(&test_path, DataFrameWriteOptions::default(), None)
            .await?;
    }

    // A CSV write into the same folder: its row (3) must not show up below.
    ctx.sql("SELECT 3 a")
        .await?
        .write_csv(&test_path, DataFrameWriteOptions::default(), None)
        .await?;

    // Register the folder as an external Parquet table.
    let ddl = format!("CREATE EXTERNAL TABLE parquet_folder_t1 STORED AS PARQUET LOCATION '{test_path}'");
    ctx.sql(&ddl).await?;

    let df = ctx.sql("select * from parquet_folder_t1").await?;
    let actual = df.collect().await?;

    // Only the two Parquet rows are expected; the comparison is order-insensitive.
    let expected = [
        "+---+",
        "| a |",
        "+---+",
        "| 2 |",
        "| 1 |",
        "+---+",
    ];
    assert_batches_sorted_eq!(&expected, &actual);

    Ok(())
}

// Reading Parquet from the folder path "/foo/" is expected to succeed and
// produce an empty result rather than an error.
#[tokio::test]
async fn read_dummy_folder() -> Result<()> {
    let ctx = SessionContext::new();

    let df = ctx
        .read_parquet("/foo/", ParquetReadOptions::default())
        .await?;
    let actual = df.collect().await?;

    // An empty result is rendered as a table with no columns and no rows.
    let expected = [
        "++",
        "++",
    ];
    assert_batches_eq!(&expected, &actual);

    Ok(())
}
}
22 changes: 22 additions & 0 deletions datafusion/datasource/src/url.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,28 @@ impl ListingTableUrl {
let url = &self.url[url::Position::BeforeScheme..url::Position::BeforePath];
ObjectStoreUrl::parse(url).unwrap()
}

/// Reports whether this [`ListingTableUrl`] refers to a folder.
///
/// This holds when the URL scheme is `file` and `is_collection()` returns
/// `true`.
pub fn is_folder(&self) -> bool {
    let has_file_scheme = self.url.scheme() == "file";
    has_file_scheme && self.is_collection()
}

/// Return the `url` for [`ListingTableUrl`]
pub fn get_url(&self) -> &Url {
&self.url
}

/// Return the `glob` for [`ListingTableUrl`]
pub fn get_glob(&self) -> &Option<Pattern> {
&self.glob
}

/// Consumes this [`ListingTableUrl`] and rebuilds it from the same `url`
/// with the specified `glob`.
///
/// # Errors
///
/// Returns [`DataFusionError::External`] when `glob` cannot be parsed as a
/// pattern; any error from `try_new` is returned unchanged.
pub fn with_glob(self, glob: &str) -> Result<Self> {
    match Pattern::new(glob) {
        Ok(pattern) => Self::try_new(self.url, Some(pattern)),
        Err(e) => Err(DataFusionError::External(Box::new(e))),
    }
}
}

/// Creates a file URL from a potentially relative filesystem path
Expand Down
48 changes: 48 additions & 0 deletions datafusion/sqllogictest/test_files/parquet.slt
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,54 @@ select count(*) from listing_table;
----
12

# Test table pointing to a folder with parquet files (location ends with /)
statement ok
CREATE EXTERNAL TABLE listing_table_folder_0
STORED AS PARQUET
LOCATION 'test_files/scratch/parquet/test_table/';

statement ok
set datafusion.execution.listing_table_ignore_subdirectory = true;

# scan file: 0.parquet 1.parquet 2.parquet
query I
select count(*) from listing_table_folder_0;
----
9

statement ok
set datafusion.execution.listing_table_ignore_subdirectory = false;

# scan file: 0.parquet 1.parquet 2.parquet 3.parquet
query I
select count(*) from listing_table_folder_0;
----
12

# Test table pointing to a folder with parquet files (location doesn't end with /)
statement ok
CREATE EXTERNAL TABLE listing_table_folder_1
STORED AS PARQUET
LOCATION 'test_files/scratch/parquet/test_table';

statement ok
set datafusion.execution.listing_table_ignore_subdirectory = true;

# scan file: 0.parquet 1.parquet 2.parquet
query I
select count(*) from listing_table_folder_1;
----
9

statement ok
set datafusion.execution.listing_table_ignore_subdirectory = false;

# scan file: 0.parquet 1.parquet 2.parquet 3.parquet
query I
select count(*) from listing_table_folder_1;
----
12

# Clean up
statement ok
DROP TABLE timestamp_with_tz;
Expand Down