2 changes: 1 addition & 1 deletion datafusion-examples/examples/data_io/parquet_encrypted.rs
@@ -55,7 +55,7 @@ pub async fn parquet_encrypted() -> datafusion::common::Result<()> {

// Create a temporary file location for the encrypted parquet file
let tmp_source = TempDir::new()?;
let tempfile = tmp_source.path().join("cars_encrypted");
let tempfile = tmp_source.path().join("cars_encrypted.parquet");
Contributor:
does it need to be called .parquet even though the dataframe explicitly says write_parquet? Or is this just to clean up the code?

@martin-g (Member), Jan 23, 2026:
If there is no file extension and no explicit configuration (i.e. no Some(true)), the heuristics decide that the path is a folder and create a Parquet partition file inside it.
If there is no config and the path has a file extension, then all the content is written to that single file (no partitions).
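
To make that heuristic concrete, here is a minimal sketch of the two automatic-mode cases; the paths, the `SELECT 1` DataFrame, and the tokio/tempfile scaffolding are illustrative and not taken from the PR:

```rust
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::prelude::*;
use tempfile::TempDir;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    let df = ctx.sql("SELECT 1 AS id").await?;
    let tmp = TempDir::new()?;

    // Path has an extension and no option is set: the heuristic writes one file.
    let with_ext = tmp.path().join("cars.parquet");
    df.clone()
        .write_parquet(with_ext.to_str().unwrap(), DataFrameWriteOptions::new(), None)
        .await?;
    assert!(with_ext.is_file());

    // No extension and no option: the heuristic treats the path as a folder and
    // writes generated partition file(s) inside it.
    let no_ext = tmp.path().join("cars");
    df.write_parquet(no_ext.to_str().unwrap(), DataFrameWriteOptions::new(), None)
        .await?;
    assert!(no_ext.is_dir());
    Ok(())
}
```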

Contributor:
Actually, why is this change needed for the example? Is it fixing some behaviour introduced by this change or just clarifying the example?

Contributor (Author):
Before this PR, with_single_file_output(true) was silently ignored for paths without extensions: it would create a directory cars_encrypted/ with parquet files inside. The read_parquet() call then found these files in the directory. After this PR, with_single_file_output(true) is correctly respected, creating a single file at the exact path cars_encrypted. However, read_parquet() with default options expects a .parquet extension and fails validation. Adding .parquet to the filename fixes this and makes the example explicit about the expected file format.
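
A minimal sketch of the fixed behaviour described above; the path and scaffolding are illustrative, and the assertion reflects the post-PR semantics:

```rust
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::prelude::*;
use tempfile::TempDir;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    let df = ctx.sql("SELECT 1 AS id").await?;
    let tmp = TempDir::new()?;

    // Explicit single-file output on an extension-less path: after this PR the
    // option is respected and exactly one file is created at the given path.
    let exact = tmp.path().join("cars_encrypted");
    df.write_parquet(
        exact.to_str().unwrap(),
        DataFrameWriteOptions::new().with_single_file_output(true),
        None,
    )
    .await?;
    assert!(exact.is_file());

    // read_parquet with default options still validates the .parquet extension,
    // which is why the example now writes to "cars_encrypted.parquet" instead.
    Ok(())
}
```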


// Write encrypted parquet
let mut options = TableParquetOptions::default();
3 changes: 2 additions & 1 deletion datafusion/catalog-listing/src/table.rs
@@ -28,7 +28,7 @@ use datafusion_common::{
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_groups::FileGroup;
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::file_sink_config::FileSinkConfig;
use datafusion_datasource::file_sink_config::{FileOutputMode, FileSinkConfig};
#[expect(deprecated)]
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
use datafusion_datasource::{
@@ -674,6 +674,7 @@ impl TableProvider for ListingTable {
insert_op,
keep_partition_by_columns,
file_extension: self.options().format.get_ext(),
file_output_mode: FileOutputMode::Automatic,
};

// For writes, we only use user-specified ordering (no file groups to derive from)
35 changes: 27 additions & 8 deletions datafusion/core/src/dataframe/mod.rs
@@ -78,9 +78,11 @@ pub struct DataFrameWriteOptions {
/// Controls how new data should be written to the table, determining whether
/// to append, overwrite, or replace existing data.
insert_op: InsertOp,
/// Controls if all partitions should be coalesced into a single output file
/// Generally will have slower performance when set to true.
single_file_output: bool,
/// Controls if all partitions should be coalesced into a single output file.
/// - `None`: Use automatic mode (extension-based heuristic)
/// - `Some(true)`: Force single file output at exact path
/// - `Some(false)`: Force directory output with generated filenames
single_file_output: Option<bool>,
/// Sets which columns should be used for hive-style partitioned writes by name.
/// Can be set to empty vec![] for non-partitioned writes.
partition_by: Vec<String>,
@@ -94,7 +96,7 @@ impl DataFrameWriteOptions {
pub fn new() -> Self {
DataFrameWriteOptions {
insert_op: InsertOp::Append,
single_file_output: false,
single_file_output: None,
partition_by: vec![],
sort_by: vec![],
}
@@ -108,9 +110,13 @@

/// Set the single_file_output value to true or false
///
/// When set to true, an output file will always be created even if the DataFrame is empty
/// - `true`: Force single file output at the exact path specified
/// - `false`: Force directory output with generated filenames
///
/// When not called, automatic mode is used (extension-based heuristic).
/// When set to true, an output file will always be created even if the DataFrame is empty.
pub fn with_single_file_output(mut self, single_file_output: bool) -> Self {
self.single_file_output = single_file_output;
self.single_file_output = Some(single_file_output);
self
}

@@ -125,6 +131,15 @@
self.sort_by = sort_by;
self
}

/// Build the options HashMap to pass to CopyTo for sink configuration.
fn build_sink_options(&self) -> HashMap<String, String> {
let mut options = HashMap::new();
if let Some(single_file) = self.single_file_output {
options.insert("single_file_output".to_string(), single_file.to_string());
}
options
}
}

impl Default for DataFrameWriteOptions {
@@ -2040,6 +2055,8 @@ impl DataFrame {

let file_type = format_as_file_type(format);

let copy_options = options.build_sink_options();

let plan = if options.sort_by.is_empty() {
self.plan
} else {
@@ -2052,7 +2069,7 @@
plan,
path.into(),
file_type,
HashMap::new(),
copy_options,
options.partition_by,
)?
.build()?;
@@ -2108,6 +2125,8 @@ impl DataFrame {

let file_type = format_as_file_type(format);

let copy_options = options.build_sink_options();

let plan = if options.sort_by.is_empty() {
self.plan
} else {
@@ -2120,7 +2139,7 @@
plan,
path.into(),
file_type,
Default::default(),
copy_options,
options.partition_by,
)?
.build()?;
156 changes: 155 additions & 1 deletion datafusion/core/src/dataframe/parquet.rs
@@ -76,6 +76,8 @@ impl DataFrame {

let file_type = format_as_file_type(format);

let copy_options = options.build_sink_options();

let plan = if options.sort_by.is_empty() {
self.plan
} else {
@@ -88,7 +90,7 @@
plan,
path.into(),
file_type,
Default::default(),
copy_options,
options.partition_by,
)?
.build()?;
@@ -324,4 +326,156 @@ mod tests {

Ok(())
}

/// Test FileOutputMode::SingleFile - explicitly request single file output
/// for paths WITHOUT file extensions. This verifies the fix for the regression
/// where extension heuristics ignored the explicit with_single_file_output(true).
#[tokio::test]
async fn test_file_output_mode_single_file() -> Result<()> {
use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

let ctx = SessionContext::new();
let tmp_dir = TempDir::new()?;

// Path WITHOUT .parquet extension - this is the key scenario
let output_path = tmp_dir.path().join("data_no_ext");
let output_path_str = output_path.to_str().unwrap();

let df = ctx.read_batch(RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)?)?;

// Explicitly request single file output
df.write_parquet(
output_path_str,
DataFrameWriteOptions::new().with_single_file_output(true),
None,
)
.await?;

// Verify: output should be a FILE, not a directory
assert!(
output_path.is_file(),
"Expected single file at {:?}, but got is_file={}, is_dir={}",
output_path,
output_path.is_file(),
output_path.is_dir()
);

// Verify the file is readable as parquet
let file = std::fs::File::open(&output_path)?;
let reader = parquet::file::reader::SerializedFileReader::new(file)?;
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 1);
assert_eq!(metadata.file_metadata().num_rows(), 3);

Ok(())
}

/// Test FileOutputMode::Automatic - uses extension heuristic.
/// Path WITH extension -> single file; path WITHOUT extension -> directory.
#[tokio::test]
async fn test_file_output_mode_automatic() -> Result<()> {
use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

let ctx = SessionContext::new();
let tmp_dir = TempDir::new()?;

let schema =
Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
let batch = RecordBatch::try_new(
schema,
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)?;

// Case 1: Path WITH extension -> should create single file (Automatic mode)
let output_with_ext = tmp_dir.path().join("data.parquet");
let df = ctx.read_batch(batch.clone())?;
df.write_parquet(
output_with_ext.to_str().unwrap(),
DataFrameWriteOptions::new(), // Automatic mode (default)
None,
)
.await?;

assert!(
output_with_ext.is_file(),
"Path with extension should be a single file, got is_file={}, is_dir={}",
output_with_ext.is_file(),
output_with_ext.is_dir()
);

// Case 2: Path WITHOUT extension -> should create directory (Automatic mode)
let output_no_ext = tmp_dir.path().join("data_dir");
let df = ctx.read_batch(batch)?;
df.write_parquet(
output_no_ext.to_str().unwrap(),
DataFrameWriteOptions::new(), // Automatic mode (default)
None,
)
.await?;

assert!(
output_no_ext.is_dir(),
"Path without extension should be a directory, got is_file={}, is_dir={}",
output_no_ext.is_file(),
output_no_ext.is_dir()
);

Ok(())
}

/// Test FileOutputMode::Directory - explicitly request directory output
/// even for paths WITH file extensions.
#[tokio::test]
async fn test_file_output_mode_directory() -> Result<()> {
use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

let ctx = SessionContext::new();
let tmp_dir = TempDir::new()?;

// Path WITH .parquet extension but explicitly requesting directory output
let output_path = tmp_dir.path().join("output.parquet");
let output_path_str = output_path.to_str().unwrap();

let df = ctx.read_batch(RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)?)?;

// Explicitly request directory output (single_file_output = false)
df.write_parquet(
output_path_str,
DataFrameWriteOptions::new().with_single_file_output(false),
None,
)
.await?;

// Verify: output should be a DIRECTORY, not a single file
assert!(
output_path.is_dir(),
"Expected directory at {:?}, but got is_file={}, is_dir={}",
output_path,
output_path.is_file(),
output_path.is_dir()
);

// Verify the directory contains parquet file(s)
let entries: Vec<_> = std::fs::read_dir(&output_path)?
.filter_map(|e| e.ok())
.collect();
assert!(
!entries.is_empty(),
"Directory should contain at least one file"
);

Ok(())
}
}
7 changes: 6 additions & 1 deletion datafusion/core/src/datasource/file_format/parquet.rs
@@ -130,7 +130,9 @@ mod tests {
use datafusion_common::test_util::batches_to_string;
use datafusion_common::{Result, ScalarValue};
use datafusion_datasource::file_format::FileFormat;
use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
use datafusion_datasource::file_sink_config::{
FileOutputMode, FileSink, FileSinkConfig,
};
use datafusion_datasource::{ListingTableUrl, PartitionedFile};
use datafusion_datasource_parquet::{
ParquetFormat, ParquetFormatFactory, ParquetSink,
@@ -1547,6 +1549,7 @@
insert_op: InsertOp::Overwrite,
keep_partition_by_columns: false,
file_extension: "parquet".into(),
file_output_mode: FileOutputMode::Automatic,
};
let parquet_sink = Arc::new(ParquetSink::new(
file_sink_config,
@@ -1638,6 +1641,7 @@
insert_op: InsertOp::Overwrite,
keep_partition_by_columns: false,
file_extension: "parquet".into(),
file_output_mode: FileOutputMode::Automatic,
};
let parquet_sink = Arc::new(ParquetSink::new(
file_sink_config,
@@ -1728,6 +1732,7 @@
insert_op: InsertOp::Overwrite,
keep_partition_by_columns: false,
file_extension: "parquet".into(),
file_output_mode: FileOutputMode::Automatic,
};
let parquet_sink = Arc::new(ParquetSink::new(
file_sink_config,
27 changes: 25 additions & 2 deletions datafusion/core/src/physical_planner.rs
@@ -23,7 +23,7 @@ use std::sync::Arc;

use crate::datasource::file_format::file_type_to_format;
use crate::datasource::listing::ListingTableUrl;
use crate::datasource::physical_plan::FileSinkConfig;
use crate::datasource::physical_plan::{FileOutputMode, FileSinkConfig};
use crate::datasource::{DefaultTableSource, source_as_provider};
use crate::error::{DataFusionError, Result};
use crate::execution::context::{ExecutionProps, SessionState};
@@ -549,8 +549,30 @@ impl DefaultPhysicalPlanner {
}
};

// Parse single_file_output option if explicitly set
let file_output_mode = match source_option_tuples
Contributor:
I wonder if it would be better to push this enum into single_file_output itself as well instead of having this parse from an Option<bool> logic 🤔

Contributor (Author):
I kept with_single_file_output(bool) in DataFrameWriteOptions for backward compatibility; it's the existing public API. Internally, I changed the field to Option<bool> so we can distinguish "not set" from "explicitly set to false". The conversion to FileOutputMode happens in the physical planner. Adding with_file_output_mode(FileOutputMode) would expand the API surface; are you okay with that?

.get("single_file_output")
.map(|v| v.trim())
{
None => FileOutputMode::Automatic,
Some("true") => FileOutputMode::SingleFile,
Some("false") => FileOutputMode::Directory,
Some(value) => {
return Err(DataFusionError::Configuration(format!(
"provided value for 'single_file_output' was not recognized: \"{value}\""
)));
}
};

// Filter out sink-related options that are not format options
let format_options: HashMap<String, String> = source_option_tuples
Comment on lines +567 to +568
Contributor:
What's the need for filtering the option here?

Contributor (Author):
The single_file_output option is a sink-level configuration that controls file output behavior, not a format-specific option. The file_type_to_format().create(session_state, &format_options) call validates options against the format's config schema and would fail with "unrecognized option" if we passed single_file_output. So we extract it first for FileSinkConfig, then filter it out before passing the remaining options to the format factory.

.iter()
.filter(|(k, _)| k.as_str() != "single_file_output")
.map(|(k, v)| (k.clone(), v.clone()))
.collect();

let sink_format = file_type_to_format(file_type)?
.create(session_state, source_option_tuples)?;
.create(session_state, &format_options)?;

// Determine extension based on format extension and compression
let file_extension = match sink_format.compression_type() {
Expand All @@ -571,6 +593,7 @@ impl DefaultPhysicalPlanner {
insert_op: InsertOp::Append,
keep_partition_by_columns,
file_extension,
file_output_mode,
};

let ordering = input_exec.properties().output_ordering().cloned();