Skip to content

Commit 5501e8e

Browse files
devinjdangelo, Lordworms, Weijun-H, alamb
authored
Support COPY TO Externally Defined File Formats, add FileType trait (#11060)
* wip create and register ext file types with session
* Add contains function, and support in datafusion substrait consumer (#10879)
* adding new function contains
* adding substrait test
* adding doc
* adding doc
* Update docs/source/user-guide/sql/scalar_functions.md
Co-authored-by: Alex Huang <huangweijun1001@gmail.com>
* adding entry
---------
Co-authored-by: Alex Huang <huangweijun1001@gmail.com>
* logical planning updated
* compiling
* removing filetype enum
* compiling
* working on tests
* fix some tests
* test fixes
* cli fix
* cli fmt
* Update datafusion/core/src/datasource/file_format/mod.rs
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
* Update datafusion/core/src/execution/session_state.rs
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
* review comments
* review comments
* review comments
* typo fix
* fmt
* fix err log style
* fmt
---------
Co-authored-by: Lordworms <48054792+Lordworms@users.noreply.github.com>
Co-authored-by: Alex Huang <huangweijun1001@gmail.com>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent acadfbf commit 5501e8e

File tree

40 files changed

+1305
-644
lines changed

40 files changed

+1305
-644
lines changed

datafusion-cli/src/exec.rs

Lines changed: 19 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -21,7 +21,6 @@ use std::collections::HashMap;
2121
use std::fs::File;
2222
use std::io::prelude::*;
2323
use std::io::BufReader;
24-
use std::str::FromStr;
2524

2625
use crate::cli_context::CliSessionContext;
2726
use crate::helper::split_from_semicolon;
@@ -35,14 +34,14 @@ use crate::{
3534

3635
use datafusion::common::instant::Instant;
3736
use datafusion::common::plan_datafusion_err;
37+
use datafusion::config::ConfigFileType;
3838
use datafusion::datasource::listing::ListingTableUrl;
3939
use datafusion::error::{DataFusionError, Result};
4040
use datafusion::logical_expr::{DdlStatement, LogicalPlan};
4141
use datafusion::physical_plan::{collect, execute_stream, ExecutionPlanProperties};
4242
use datafusion::sql::parser::{DFParser, Statement};
4343
use datafusion::sql::sqlparser::dialect::dialect_from_str;
4444

45-
use datafusion::common::FileType;
4645
use datafusion::sql::sqlparser;
4746
use rustyline::error::ReadlineError;
4847
use rustyline::Editor;
@@ -291,6 +290,15 @@ impl AdjustedPrintOptions {
291290
}
292291
}
293292

293+
fn config_file_type_from_str(ext: &str) -> Option<ConfigFileType> {
294+
match ext.to_lowercase().as_str() {
295+
"csv" => Some(ConfigFileType::CSV),
296+
"json" => Some(ConfigFileType::JSON),
297+
"parquet" => Some(ConfigFileType::PARQUET),
298+
_ => None,
299+
}
300+
}
301+
294302
async fn create_plan(
295303
ctx: &mut dyn CliSessionContext,
296304
statement: Statement,
@@ -302,7 +310,7 @@ async fn create_plan(
302310
// will raise Configuration errors.
303311
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan {
304312
// To support custom formats, treat error as None
305-
let format = FileType::from_str(&cmd.file_type).ok();
313+
let format = config_file_type_from_str(&cmd.file_type);
306314
register_object_store_and_config_extensions(
307315
ctx,
308316
&cmd.location,
@@ -313,13 +321,13 @@ async fn create_plan(
313321
}
314322

315323
if let LogicalPlan::Copy(copy_to) = &mut plan {
316-
let format: FileType = (&copy_to.format_options).into();
324+
let format = config_file_type_from_str(&copy_to.file_type.get_ext());
317325

318326
register_object_store_and_config_extensions(
319327
ctx,
320328
&copy_to.output_url,
321329
&copy_to.options,
322-
Some(format),
330+
format,
323331
)
324332
.await?;
325333
}
@@ -357,7 +365,7 @@ pub(crate) async fn register_object_store_and_config_extensions(
357365
ctx: &dyn CliSessionContext,
358366
location: &String,
359367
options: &HashMap<String, String>,
360-
format: Option<FileType>,
368+
format: Option<ConfigFileType>,
361369
) -> Result<()> {
362370
// Parse the location URL to extract the scheme and other components
363371
let table_path = ListingTableUrl::parse(location)?;
@@ -374,7 +382,7 @@ pub(crate) async fn register_object_store_and_config_extensions(
374382
// Clone and modify the default table options based on the provided options
375383
let mut table_options = ctx.session_state().default_table_options().clone();
376384
if let Some(format) = format {
377-
table_options.set_file_format(format);
385+
table_options.set_config_format(format);
378386
}
379387
table_options.alter_with_string_hash_map(options)?;
380388

@@ -392,7 +400,6 @@ pub(crate) async fn register_object_store_and_config_extensions(
392400
mod tests {
393401
use super::*;
394402

395-
use datafusion::common::config::FormatOptions;
396403
use datafusion::common::plan_err;
397404

398405
use datafusion::prelude::SessionContext;
@@ -403,7 +410,7 @@ mod tests {
403410
let plan = ctx.state().create_logical_plan(sql).await?;
404411

405412
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan {
406-
let format = FileType::from_str(&cmd.file_type).ok();
413+
let format = config_file_type_from_str(&cmd.file_type);
407414
register_object_store_and_config_extensions(
408415
&ctx,
409416
&cmd.location,
@@ -429,12 +436,12 @@ mod tests {
429436
let plan = ctx.state().create_logical_plan(sql).await?;
430437

431438
if let LogicalPlan::Copy(cmd) = &plan {
432-
let format: FileType = (&cmd.format_options).into();
439+
let format = config_file_type_from_str(&cmd.file_type.get_ext());
433440
register_object_store_and_config_extensions(
434441
&ctx,
435442
&cmd.output_url,
436443
&cmd.options,
437-
Some(format),
444+
format,
438445
)
439446
.await?;
440447
} else {
@@ -484,7 +491,7 @@ mod tests {
484491
let mut plan = create_plan(&mut ctx, statement).await?;
485492
if let LogicalPlan::Copy(copy_to) = &mut plan {
486493
assert_eq!(copy_to.output_url, location);
487-
assert!(matches!(copy_to.format_options, FormatOptions::PARQUET(_)));
494+
assert_eq!(copy_to.file_type.get_ext(), "parquet".to_string());
488495
ctx.runtime_env()
489496
.object_store_registry
490497
.get_store(&Url::parse(&copy_to.output_url).unwrap())?;

datafusion-examples/examples/external_dependency/dataframe-to-s3.rs

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -20,10 +20,10 @@ use std::sync::Arc;
2020

2121
use datafusion::dataframe::DataFrameWriteOptions;
2222
use datafusion::datasource::file_format::parquet::ParquetFormat;
23+
use datafusion::datasource::file_format::FileFormat;
2324
use datafusion::datasource::listing::ListingOptions;
2425
use datafusion::error::Result;
2526
use datafusion::prelude::*;
26-
use datafusion_common::{FileType, GetExt};
2727

2828
use object_store::aws::AmazonS3Builder;
2929
use url::Url;
@@ -54,7 +54,7 @@ async fn main() -> Result<()> {
5454
let path = format!("s3://{bucket_name}/test_data/");
5555
let file_format = ParquetFormat::default().with_enable_pruning(true);
5656
let listing_options = ListingOptions::new(Arc::new(file_format))
57-
.with_file_extension(FileType::PARQUET.get_ext());
57+
.with_file_extension(ParquetFormat::default().get_ext());
5858
ctx.register_listing_table("test", &path, listing_options, None, None)
5959
.await?;
6060

@@ -79,7 +79,7 @@ async fn main() -> Result<()> {
7979

8080
let file_format = ParquetFormat::default().with_enable_pruning(true);
8181
let listing_options = ListingOptions::new(Arc::new(file_format))
82-
.with_file_extension(FileType::PARQUET.get_ext());
82+
.with_file_extension(ParquetFormat::default().get_ext());
8383
ctx.register_listing_table("test2", &out_path, listing_options, None, None)
8484
.await?;
8585

datafusion/common/src/config.rs

Lines changed: 36 additions & 41 deletions
Original file line number | Diff line number | Diff line change
@@ -24,7 +24,7 @@ use std::str::FromStr;
2424

2525
use crate::error::_config_err;
2626
use crate::parsers::CompressionTypeVariant;
27-
use crate::{DataFusionError, FileType, Result};
27+
use crate::{DataFusionError, Result};
2828

2929
/// A macro that wraps a configuration struct and automatically derives
3030
/// [`Default`] and [`ConfigField`] for it, allowing it to be used
@@ -1116,6 +1116,16 @@ macro_rules! extensions_options {
11161116
}
11171117
}
11181118

1119+
/// These file types have special built in behavior for configuration.
1120+
/// Use TableOptions::Extensions for configuring other file types.
1121+
#[derive(Debug, Clone)]
1122+
pub enum ConfigFileType {
1123+
CSV,
1124+
#[cfg(feature = "parquet")]
1125+
PARQUET,
1126+
JSON,
1127+
}
1128+
11191129
/// Represents the configuration options available for handling different table formats within a data processing application.
11201130
/// This struct encompasses options for various file formats including CSV, Parquet, and JSON, allowing for flexible configuration
11211131
/// of parsing and writing behaviors specific to each format. Additionally, it supports extending functionality through custom extensions.
@@ -1134,7 +1144,7 @@ pub struct TableOptions {
11341144

11351145
/// The current file format that the table operations should assume. This option allows
11361146
/// for dynamic switching between the supported file types (e.g., CSV, Parquet, JSON).
1137-
pub current_format: Option<FileType>,
1147+
pub current_format: Option<ConfigFileType>,
11381148

11391149
/// Optional extensions that can be used to extend or customize the behavior of the table
11401150
/// options. Extensions can be registered using `Extensions::insert` and might include
@@ -1152,10 +1162,9 @@ impl ConfigField for TableOptions {
11521162
if let Some(file_type) = &self.current_format {
11531163
match file_type {
11541164
#[cfg(feature = "parquet")]
1155-
FileType::PARQUET => self.parquet.visit(v, "format", ""),
1156-
FileType::CSV => self.csv.visit(v, "format", ""),
1157-
FileType::JSON => self.json.visit(v, "format", ""),
1158-
_ => {}
1165+
ConfigFileType::PARQUET => self.parquet.visit(v, "format", ""),
1166+
ConfigFileType::CSV => self.csv.visit(v, "format", ""),
1167+
ConfigFileType::JSON => self.json.visit(v, "format", ""),
11591168
}
11601169
} else {
11611170
self.csv.visit(v, "csv", "");
@@ -1188,12 +1197,9 @@ impl ConfigField for TableOptions {
11881197
match key {
11891198
"format" => match format {
11901199
#[cfg(feature = "parquet")]
1191-
FileType::PARQUET => self.parquet.set(rem, value),
1192-
FileType::CSV => self.csv.set(rem, value),
1193-
FileType::JSON => self.json.set(rem, value),
1194-
_ => {
1195-
_config_err!("Config value \"{key}\" is not supported on {}", format)
1196-
}
1200+
ConfigFileType::PARQUET => self.parquet.set(rem, value),
1201+
ConfigFileType::CSV => self.csv.set(rem, value),
1202+
ConfigFileType::JSON => self.json.set(rem, value),
11971203
},
11981204
_ => _config_err!("Config value \"{key}\" not found on TableOptions"),
11991205
}
@@ -1210,15 +1216,6 @@ impl TableOptions {
12101216
Self::default()
12111217
}
12121218

1213-
/// Sets the file format for the table.
1214-
///
1215-
/// # Parameters
1216-
///
1217-
/// * `format`: The file format to use (e.g., CSV, Parquet).
1218-
pub fn set_file_format(&mut self, format: FileType) {
1219-
self.current_format = Some(format);
1220-
}
1221-
12221219
/// Creates a new `TableOptions` instance initialized with settings from a given session config.
12231220
///
12241221
/// # Parameters
@@ -1249,6 +1246,15 @@ impl TableOptions {
12491246
clone
12501247
}
12511248

1249+
/// Sets the file format for the table.
1250+
///
1251+
/// # Parameters
1252+
///
1253+
/// * `format`: The file format to use (e.g., CSV, Parquet).
1254+
pub fn set_config_format(&mut self, format: ConfigFileType) {
1255+
self.current_format = Some(format);
1256+
}
1257+
12521258
/// Sets the extensions for this `TableOptions` instance.
12531259
///
12541260
/// # Parameters
@@ -1673,6 +1679,8 @@ config_namespace! {
16731679
}
16741680
}
16751681

1682+
pub trait FormatOptionsExt: Display {}
1683+
16761684
#[derive(Debug, Clone, PartialEq)]
16771685
#[allow(clippy::large_enum_variant)]
16781686
pub enum FormatOptions {
@@ -1698,28 +1706,15 @@ impl Display for FormatOptions {
16981706
}
16991707
}
17001708

1701-
impl From<FileType> for FormatOptions {
1702-
fn from(value: FileType) -> Self {
1703-
match value {
1704-
FileType::ARROW => FormatOptions::ARROW,
1705-
FileType::AVRO => FormatOptions::AVRO,
1706-
#[cfg(feature = "parquet")]
1707-
FileType::PARQUET => FormatOptions::PARQUET(TableParquetOptions::default()),
1708-
FileType::CSV => FormatOptions::CSV(CsvOptions::default()),
1709-
FileType::JSON => FormatOptions::JSON(JsonOptions::default()),
1710-
}
1711-
}
1712-
}
1713-
17141709
#[cfg(test)]
17151710
mod tests {
17161711
use std::any::Any;
17171712
use std::collections::HashMap;
17181713

17191714
use crate::config::{
1720-
ConfigEntry, ConfigExtension, ExtensionOptions, Extensions, TableOptions,
1715+
ConfigEntry, ConfigExtension, ConfigFileType, ExtensionOptions, Extensions,
1716+
TableOptions,
17211717
};
1722-
use crate::FileType;
17231718

17241719
#[derive(Default, Debug, Clone)]
17251720
pub struct TestExtensionConfig {
@@ -1777,7 +1772,7 @@ mod tests {
17771772
let mut extension = Extensions::new();
17781773
extension.insert(TestExtensionConfig::default());
17791774
let mut table_config = TableOptions::new().with_extensions(extension);
1780-
table_config.set_file_format(FileType::CSV);
1775+
table_config.set_config_format(ConfigFileType::CSV);
17811776
table_config.set("format.delimiter", ";").unwrap();
17821777
assert_eq!(table_config.csv.delimiter, b';');
17831778
table_config.set("test.bootstrap.servers", "asd").unwrap();
@@ -1794,7 +1789,7 @@ mod tests {
17941789
#[test]
17951790
fn csv_u8_table_options() {
17961791
let mut table_config = TableOptions::new();
1797-
table_config.set_file_format(FileType::CSV);
1792+
table_config.set_config_format(ConfigFileType::CSV);
17981793
table_config.set("format.delimiter", ";").unwrap();
17991794
assert_eq!(table_config.csv.delimiter as char, ';');
18001795
table_config.set("format.escape", "\"").unwrap();
@@ -1807,7 +1802,7 @@ mod tests {
18071802
#[test]
18081803
fn parquet_table_options() {
18091804
let mut table_config = TableOptions::new();
1810-
table_config.set_file_format(FileType::PARQUET);
1805+
table_config.set_config_format(ConfigFileType::PARQUET);
18111806
table_config
18121807
.set("format.bloom_filter_enabled::col1", "true")
18131808
.unwrap();
@@ -1821,7 +1816,7 @@ mod tests {
18211816
#[test]
18221817
fn parquet_table_options_config_entry() {
18231818
let mut table_config = TableOptions::new();
1824-
table_config.set_file_format(FileType::PARQUET);
1819+
table_config.set_config_format(ConfigFileType::PARQUET);
18251820
table_config
18261821
.set("format.bloom_filter_enabled::col1", "true")
18271822
.unwrap();
@@ -1835,7 +1830,7 @@ mod tests {
18351830
#[test]
18361831
fn parquet_table_options_config_metadata_entry() {
18371832
let mut table_config = TableOptions::new();
1838-
table_config.set_file_format(FileType::PARQUET);
1833+
table_config.set_config_format(ConfigFileType::PARQUET);
18391834
table_config.set("format.metadata::key1", "").unwrap();
18401835
table_config.set("format.metadata::key2", "value2").unwrap();
18411836
table_config

0 commit comments

Comments
 (0)