Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions datafusion/catalog-listing/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ use datafusion_common::{
};
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_groups::FileGroup;
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::file_scan_config::{
FileScanConfig, FileScanConfigBuilder, hash_partitioning_from_partition_fields,
};
use datafusion_datasource::file_sink_config::{FileOutputMode, FileSinkConfig};
#[expect(deprecated)]
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
Expand Down Expand Up @@ -628,6 +630,15 @@ impl TableProvider for ListingTable {
);
}
Some(output_partitioning)
} else if partitioned_by_file_group {
// Files are grouped by partition column values: declare Hash
// partitioning on those columns so the optimizer can skip hash
// repartitioning for aggregates and joins on the partition columns.
hash_partitioning_from_partition_fields(
&self.table_schema,
&table_partition_cols.clone().into(),
partitioned_file_lists.len(),
)
} else {
None
};
Expand All @@ -650,7 +661,6 @@ impl TableProvider for ListingTable {
.with_output_ordering(output_ordering)
.with_output_partitioning(output_partitioning)
.with_expr_adapter(self.expr_adapter_factory.clone())
.with_partitioned_by_file_group(partitioned_by_file_group)
.build();

// create the execution plan
Expand Down
74 changes: 22 additions & 52 deletions datafusion/datasource/src/file_scan_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,17 +198,6 @@ pub struct FileScanConfig {
/// would be incorrect if there are filters being applied, thus this should be accessed
/// via [`FileScanConfig::statistics`].
pub(crate) statistics: Statistics,
/// When true, file_groups are organized by partition column values
/// and output_partitioning will return Hash partitioning on partition columns.
/// This allows the optimizer to skip hash repartitioning for aggregates and joins
/// on partition columns.
///
/// If the number of file partitions > target_partitions, the file partitions will be grouped
/// in a round-robin fashion such that number of file partitions = target_partitions.
///
/// Follow-up: remove this redundant field in favor of
/// `output_partitioning`, see <https://github.com/apache/datafusion/issues/23099>.
pub partitioned_by_file_group: bool,
/// Declared physical output partitioning for this scan.
///
/// Expressions are against the full table schema, before scan projection or
Expand Down Expand Up @@ -288,7 +277,6 @@ pub struct FileScanConfigBuilder {
file_compression_type: Option<FileCompressionType>,
batch_size: Option<usize>,
expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
partitioned_by_file_group: bool,
}

impl FileScanConfigBuilder {
Expand All @@ -315,7 +303,6 @@ impl FileScanConfigBuilder {
constraints: None,
batch_size: None,
expr_adapter_factory: None,
partitioned_by_file_group: false,
}
}

Expand Down Expand Up @@ -513,18 +500,6 @@ impl FileScanConfigBuilder {
self
}

/// Set whether file groups are organized by partition column values.
///
/// When set to true, the output partitioning will be declared as Hash partitioning
/// on the partition columns.
pub fn with_partitioned_by_file_group(
mut self,
partitioned_by_file_group: bool,
) -> Self {
self.partitioned_by_file_group = partitioned_by_file_group;
self
}

/// Build the final [`FileScanConfig`] with all the configured settings.
///
/// This method takes ownership of the builder and returns the constructed `FileScanConfig`.
Expand All @@ -546,7 +521,6 @@ impl FileScanConfigBuilder {
file_compression_type,
batch_size,
expr_adapter_factory: expr_adapter,
partitioned_by_file_group,
} = self;

let constraints = constraints.unwrap_or_default();
Expand All @@ -571,7 +545,6 @@ impl FileScanConfigBuilder {
batch_size,
expr_adapter_factory: expr_adapter,
statistics,
partitioned_by_file_group,
output_partitioning,
}
}
Expand All @@ -592,12 +565,15 @@ impl From<FileScanConfig> for FileScanConfigBuilder {
constraints: Some(config.constraints),
batch_size: config.batch_size,
expr_adapter_factory: config.expr_adapter_factory,
partitioned_by_file_group: config.partitioned_by_file_group,
}
}
}

fn hash_partitioning_from_partition_fields(
/// Builds `Partitioning::Hash` over `partition_cols` (resolved to their indices in
/// `schema`) with `partition_count` partitions. Returns `None` when there are no
/// partition columns. Callers use this to declare the output partitioning of a scan
/// whose file groups are organized by partition column values.
pub fn hash_partitioning_from_partition_fields(
schema: &Schema,
partition_cols: &Fields,
partition_count: usize,
Expand Down Expand Up @@ -759,7 +735,7 @@ impl DataSource for FileScanConfig {
) -> Result<Option<Arc<dyn DataSource>>> {
// When file groups define output partitioning, repartitioning files
// would invalidate the partition-to-file-group mapping.
if self.output_partitioning.is_some() || self.partitioned_by_file_group {
if self.output_partitioning.is_some() {
return Ok(None);
}

Expand All @@ -776,10 +752,8 @@ impl DataSource for FileScanConfig {
/// Returns the output partitioning for this file scan.
///
/// When `output_partitioning` is set, this returns the declared partitioning
/// after applying scan projection. When `partitioned_by_file_group` is true,
/// this returns `Partitioning::Hash` on the Hive partition columns, allowing
/// the optimizer to skip hash repartitioning for aggregates and joins on
/// those columns.
/// after applying scan projection, allowing the optimizer to skip hash
/// repartitioning for aggregates and joins on the partitioning columns.
///
/// If projection or partition count validation fails, this returns
/// `UnknownPartitioning`.
Expand All @@ -795,15 +769,7 @@ impl DataSource for FileScanConfig {
/// - Idea: Could allow byte-range splitting within partition-aware groups,
/// preserving I/O parallelism while maintaining partition semantics.
fn output_partitioning(&self) -> Partitioning {
let Some(output_partitioning) = self.output_partitioning.clone().or_else(|| {
self.partitioned_by_file_group.then(|| {
hash_partitioning_from_partition_fields(
self.file_source.table_schema().table_schema(),
self.table_partition_cols(),
self.file_groups.len(),
)
})?
}) else {
let Some(output_partitioning) = self.output_partitioning.clone() else {
return Partitioning::UnknownPartitioning(self.file_groups.len());
};
if output_partitioning.partition_count() != self.file_groups.len() {
Expand Down Expand Up @@ -1127,10 +1093,7 @@ impl DataSource for FileScanConfig {
/// when file order must be preserved or the file groups define the output
/// partitioning needed for the rest of the plan
fn create_sibling_state(&self) -> Option<Arc<dyn Any + Send + Sync>> {
if self.preserve_order
|| self.output_partitioning.is_some()
|| self.partitioned_by_file_group
{
if self.preserve_order || self.output_partitioning.is_some() {
return None;
}

Expand Down Expand Up @@ -2541,7 +2504,7 @@ mod tests {
vec![partition_col],
);

// partitioned_by_file_group defaults to false
// output_partitioning defaults to None
let partitioning = config.output_partitioning();
assert!(matches!(partitioning, Partitioning::UnknownPartitioning(_)));
}
Expand Down Expand Up @@ -2601,13 +2564,12 @@ mod tests {
#[test]
fn test_output_partitioning_no_partition_columns() {
let file_schema = aggr_test_schema();
let mut config = config_for_projection(
let config = config_for_projection(
Arc::clone(&file_schema),
None,
Statistics::new_unknown(&file_schema),
vec![], // No partition columns
);
config.partitioned_by_file_group = true;

let partitioning = config.output_partitioning();
assert!(matches!(partitioning, Partitioning::UnknownPartitioning(_)));
Expand All @@ -2630,12 +2592,16 @@ mod tests {
Statistics::new_unknown(&file_schema),
single_partition_col,
);
config.partitioned_by_file_group = true;
config.file_groups = vec![
FileGroup::new(vec![PartitionedFile::new("f1.parquet".to_string(), 1024)]),
FileGroup::new(vec![PartitionedFile::new("f2.parquet".to_string(), 1024)]),
FileGroup::new(vec![PartitionedFile::new("f3.parquet".to_string(), 1024)]),
];
config.output_partitioning = hash_partitioning_from_partition_fields(
config.file_source.table_schema().table_schema(),
config.table_partition_cols(),
config.file_groups.len(),
);

let partitioning = config.output_partitioning();
match partitioning {
Expand All @@ -2659,11 +2625,15 @@ mod tests {
Statistics::new_unknown(&file_schema),
multiple_partition_cols,
);
config.partitioned_by_file_group = true;
config.file_groups = vec![
FileGroup::new(vec![PartitionedFile::new("f1.parquet".to_string(), 1024)]),
FileGroup::new(vec![PartitionedFile::new("f2.parquet".to_string(), 1024)]),
];
config.output_partitioning = hash_partitioning_from_partition_fields(
config.file_source.table_schema().table_schema(),
config.table_partition_cols(),
config.file_groups.len(),
);

let partitioning = config.output_partitioning();
match partitioning {
Expand Down
12 changes: 11 additions & 1 deletion datafusion/datasource/src/file_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1581,14 +1581,24 @@ mod tests {
DataType::Int32,
false,
)])));
// Declaring an output partitioning marks the scan as pre-grouped, which
// keeps each stream's files local (disables shared work stealing).
let output_partitioning = self.partitioned_by_file_group.then(|| {
datafusion_physical_expr::Partitioning::Hash(
vec![Arc::new(
datafusion_physical_expr::expressions::Column::new("i", 0),
)],
file_groups.len(),
)
});
FileScanConfigBuilder::new(
ObjectStoreUrl::parse("test:///").unwrap(),
Arc::new(MockSource::new(table_schema)),
)
.with_file_groups(file_groups)
.with_limit(self.limit)
.with_preserve_order(self.preserve_order)
.with_partitioned_by_file_group(self.partitioned_by_file_group)
.with_output_partitioning(output_partitioning)
.build()
}
}
Expand Down
9 changes: 3 additions & 6 deletions datafusion/proto/src/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -586,18 +586,15 @@ pub fn parse_protobuf_file_scan_config(
file_source
};

let mut config_builder = FileScanConfigBuilder::new(object_store_url, file_source)
let config = FileScanConfigBuilder::new(object_store_url, file_source)
.with_file_groups(file_groups)
.with_constraints(constraints)
.with_statistics(statistics)
.with_limit(proto.limit.as_ref().map(|sl| sl.limit as usize))
.with_output_ordering(output_ordering)
.with_output_partitioning(output_partitioning)
.with_batch_size(proto.batch_size.map(|s| s as usize));
if proto.partitioned_by_file_group.unwrap_or(false) {
config_builder = config_builder.with_partitioned_by_file_group(true);
}
let config = config_builder.build();
.with_batch_size(proto.batch_size.map(|s| s as usize))
.build();
Ok(config)
}

Expand Down
4 changes: 3 additions & 1 deletion datafusion/proto/src/physical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,9 @@ pub fn serialize_file_scan_config(
constraints: Some(conf.constraints.clone().into()),
batch_size: conf.batch_size.map(|s| s as u64),
projection_exprs,
partitioned_by_file_group: Some(conf.partitioned_by_file_group),
// Partition grouping is now encoded in `output_partitioning`; this legacy
// wire field is left unset (readers rely on `output_partitioning`).
partitioned_by_file_group: None,
output_partitioning,
})
}
Expand Down
18 changes: 0 additions & 18 deletions datafusion/proto/tests/cases/roundtrip_physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4219,24 +4219,6 @@ fn roundtrip_file_scan_config(scan_config: FileScanConfig) -> Result<FileScanCon
Ok(file_scan_config.clone())
}

/// Checks that a scan config built with `partitioned_by_file_group = true`
/// still has the flag set after going through `roundtrip_file_scan_config`.
#[test]
fn roundtrip_parquet_exec_partitioned_by_file_group() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
    let source = Arc::new(ParquetSource::new(Arc::clone(&schema)));

    // One file group containing a single Parquet file.
    let file = PartitionedFile::new("/path/to/file.parquet".to_string(), 1024);
    let groups = vec![FileGroup::new(vec![file])];

    let scan_config =
        FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source)
            .with_file_groups(groups)
            .with_partitioned_by_file_group(true)
            .build();

    let restored = roundtrip_file_scan_config(scan_config)?;
    assert!(restored.partitioned_by_file_group);
    Ok(())
}

#[test]
fn roundtrip_parquet_exec_output_partitioning() -> Result<()> {
let file_schema =
Expand Down
Loading
Loading