From e2ee2cf797845d258cf4aacca684c003d2fee153 Mon Sep 17 00:00:00 2001 From: Jiawei Zhao Date: Thu, 25 Jun 2026 23:16:01 +0800 Subject: [PATCH 1/2] refactor: remove redundant partitioned_by_file_group file scan field `FileScanConfig` had two overlapping ways to declare file scan output partitioning: the `partitioned_by_file_group` bool and `output_partitioning`. Collapse them onto `output_partitioning` as the single source of truth. - Remove the `partitioned_by_file_group` field, the builder field, and the `with_partitioned_by_file_group` builder method. - `ListingTable` now derives the partition-column `Partitioning::Hash` once its file groups are finalized and passes it via `with_output_partitioning`; `hash_partitioning_from_partition_fields` is made `pub` for this. - proto already round-trips `output_partitioning`, so the now-vestigial wire bool is left unset on write and ignored on read (the proto field is kept for backward compatibility). Closes #23099. Signed-off-by: Jiawei Zhao --- datafusion/catalog-listing/src/table.rs | 14 +++- .../datasource/src/file_scan_config/mod.rs | 74 ++++++------------- datafusion/datasource/src/file_stream/mod.rs | 12 ++- .../proto/src/physical_plan/from_proto.rs | 9 +-- .../proto/src/physical_plan/to_proto.rs | 4 +- .../tests/cases/roundtrip_physical_plan.rs | 18 ----- 6 files changed, 51 insertions(+), 80 deletions(-) diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs index 36d85b981c06c..5b01ef95de6c7 100644 --- a/datafusion/catalog-listing/src/table.rs +++ b/datafusion/catalog-listing/src/table.rs @@ -30,7 +30,9 @@ use datafusion_common::{ }; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_groups::FileGroup; -use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder}; +use datafusion_datasource::file_scan_config::{ + FileScanConfig, FileScanConfigBuilder, hash_partitioning_from_partition_fields, +}; use datafusion_datasource::file_sink_config::{FileOutputMode, FileSinkConfig}; #[expect(deprecated)] use datafusion_datasource::schema_adapter::SchemaAdapterFactory; @@ -628,6 +630,15 @@ impl TableProvider for ListingTable { ); } Some(output_partitioning) + } else if partitioned_by_file_group { + // Files are grouped by partition column values: declare Hash + // partitioning on those columns so the optimizer can skip hash + // repartitioning for aggregates and joins on the partition columns. + hash_partitioning_from_partition_fields( + &self.table_schema, + &table_partition_cols.clone().into(), + partitioned_file_lists.len(), + ) } else { None }; @@ -650,7 +661,6 @@ impl TableProvider for ListingTable { .with_output_ordering(output_ordering) .with_output_partitioning(output_partitioning) .with_expr_adapter(self.expr_adapter_factory.clone()) - .with_partitioned_by_file_group(partitioned_by_file_group) .build(); // create the execution plan diff --git a/datafusion/datasource/src/file_scan_config/mod.rs b/datafusion/datasource/src/file_scan_config/mod.rs index 21d733458cbc9..532fe9015d242 100644 --- a/datafusion/datasource/src/file_scan_config/mod.rs +++ b/datafusion/datasource/src/file_scan_config/mod.rs @@ -198,17 +198,6 @@ pub struct FileScanConfig { /// would be incorrect if there are filters being applied, thus this should be accessed /// via [`FileScanConfig::statistics`]. pub(crate) statistics: Statistics, - /// When true, file_groups are organized by partition column values - /// and output_partitioning will return Hash partitioning on partition columns. - /// This allows the optimizer to skip hash repartitioning for aggregates and joins - /// on partition columns. - /// - /// If the number of file partitions > target_partitions, the file partitions will be grouped - /// in a round-robin fashion such that number of file partitions = target_partitions. - /// - /// Follow-up: remove this redundant field in favor of - /// `output_partitioning`, see . - pub partitioned_by_file_group: bool, /// Declared physical output partitioning for this scan. /// /// Expressions are against the full table schema, before scan projection or @@ -288,7 +277,6 @@ pub struct FileScanConfigBuilder { file_compression_type: Option, batch_size: Option, expr_adapter_factory: Option>, - partitioned_by_file_group: bool, } impl FileScanConfigBuilder { @@ -315,7 +303,6 @@ impl FileScanConfigBuilder { constraints: None, batch_size: None, expr_adapter_factory: None, - partitioned_by_file_group: false, } } @@ -513,18 +500,6 @@ impl FileScanConfigBuilder { self } - /// Set whether file groups are organized by partition column values. - /// - /// When set to true, the output partitioning will be declared as Hash partitioning - /// on the partition columns. - pub fn with_partitioned_by_file_group( - mut self, - partitioned_by_file_group: bool, - ) -> Self { - self.partitioned_by_file_group = partitioned_by_file_group; - self - } - /// Build the final [`FileScanConfig`] with all the configured settings. /// /// This method takes ownership of the builder and returns the constructed `FileScanConfig`. @@ -546,7 +521,6 @@ impl FileScanConfigBuilder { file_compression_type, batch_size, expr_adapter_factory: expr_adapter, - partitioned_by_file_group, } = self; let constraints = constraints.unwrap_or_default(); @@ -571,7 +545,6 @@ impl FileScanConfigBuilder { batch_size, expr_adapter_factory: expr_adapter, statistics, - partitioned_by_file_group, output_partitioning, } } @@ -592,12 +565,15 @@ impl From for FileScanConfigBuilder { constraints: Some(config.constraints), batch_size: config.batch_size, expr_adapter_factory: config.expr_adapter_factory, - partitioned_by_file_group: config.partitioned_by_file_group, } } } -fn hash_partitioning_from_partition_fields( +/// Builds `Partitioning::Hash` over `partition_cols` (resolved to their indices in +/// `schema`) with `partition_count` partitions. Returns `None` when there are no +/// partition columns. Callers use this to declare the output partitioning of a scan +/// whose file groups are organized by partition column values. +pub fn hash_partitioning_from_partition_fields( schema: &Schema, partition_cols: &Fields, partition_count: usize, @@ -759,7 +735,7 @@ impl DataSource for FileScanConfig { ) -> Result>> { // When file groups define output partitioning, repartitioning files // would invalidate the partition-to-file-group mapping. - if self.output_partitioning.is_some() || self.partitioned_by_file_group { + if self.output_partitioning.is_some() { return Ok(None); } @@ -776,10 +752,8 @@ impl DataSource for FileScanConfig { /// Returns the output partitioning for this file scan. /// /// When `output_partitioning` is set, this returns the declared partitioning - /// after applying scan projection. When `partitioned_by_file_group` is true, - /// this returns `Partitioning::Hash` on the Hive partition columns, allowing - /// the optimizer to skip hash repartitioning for aggregates and joins on - /// those columns. + /// after applying scan projection, allowing the optimizer to skip hash + /// repartitioning for aggregates and joins on the partitioning columns. /// /// If projection or partition count validation fails, this returns /// `UnknownPartitioning`. @@ -795,15 +769,7 @@ impl DataSource for FileScanConfig { /// - Idea: Could allow byte-range splitting within partition-aware groups, /// preserving I/O parallelism while maintaining partition semantics. fn output_partitioning(&self) -> Partitioning { - let Some(output_partitioning) = self.output_partitioning.clone().or_else(|| { - self.partitioned_by_file_group.then(|| { - hash_partitioning_from_partition_fields( - self.file_source.table_schema().table_schema(), - self.table_partition_cols(), - self.file_groups.len(), - ) - })? - }) else { + let Some(output_partitioning) = self.output_partitioning.clone() else { return Partitioning::UnknownPartitioning(self.file_groups.len()); }; if output_partitioning.partition_count() != self.file_groups.len() { @@ -1127,10 +1093,7 @@ impl DataSource for FileScanConfig { /// when file order must be preserved or the file groups define the output /// partitioning needed for the rest of the plan fn create_sibling_state(&self) -> Option> { - if self.preserve_order - || self.output_partitioning.is_some() - || self.partitioned_by_file_group - { + if self.preserve_order || self.output_partitioning.is_some() { return None; } @@ -2541,7 +2504,7 @@ mod tests { vec![partition_col], ); - // partitioned_by_file_group defaults to false + // output_partitioning defaults to None let partitioning = config.output_partitioning(); assert!(matches!(partitioning, Partitioning::UnknownPartitioning(_))); } @@ -2601,13 +2564,12 @@ mod tests { #[test] fn test_output_partitioning_no_partition_columns() { let file_schema = aggr_test_schema(); - let mut config = config_for_projection( + let config = config_for_projection( Arc::clone(&file_schema), None, Statistics::new_unknown(&file_schema), vec![], // No partition columns ); - config.partitioned_by_file_group = true; let partitioning = config.output_partitioning(); assert!(matches!(partitioning, Partitioning::UnknownPartitioning(_))); @@ -2630,12 +2592,16 @@ mod tests { Statistics::new_unknown(&file_schema), single_partition_col, ); - config.partitioned_by_file_group = true; config.file_groups = vec![ FileGroup::new(vec![PartitionedFile::new("f1.parquet".to_string(), 1024)]), FileGroup::new(vec![PartitionedFile::new("f2.parquet".to_string(), 1024)]), FileGroup::new(vec![PartitionedFile::new("f3.parquet".to_string(), 1024)]), ]; + config.output_partitioning = hash_partitioning_from_partition_fields( + config.file_source.table_schema().table_schema(), + config.table_partition_cols(), + config.file_groups.len(), + ); let partitioning = config.output_partitioning(); match partitioning { @@ -2659,11 +2625,15 @@ mod tests { Statistics::new_unknown(&file_schema), multiple_partition_cols, ); - config.partitioned_by_file_group = true; config.file_groups = vec![ FileGroup::new(vec![PartitionedFile::new("f1.parquet".to_string(), 1024)]), FileGroup::new(vec![PartitionedFile::new("f2.parquet".to_string(), 1024)]), ]; + config.output_partitioning = hash_partitioning_from_partition_fields( + config.file_source.table_schema().table_schema(), + config.table_partition_cols(), + config.file_groups.len(), + ); let partitioning = config.output_partitioning(); match partitioning { diff --git a/datafusion/datasource/src/file_stream/mod.rs b/datafusion/datasource/src/file_stream/mod.rs index d976bf955dbb2..aa1078df8574b 100644 --- a/datafusion/datasource/src/file_stream/mod.rs +++ b/datafusion/datasource/src/file_stream/mod.rs @@ -1581,6 +1581,16 @@ mod tests { DataType::Int32, false, )]))); + // Declaring an output partitioning marks the scan as pre-grouped, which + // keeps each stream's files local (disables shared work stealing). + let output_partitioning = self.partitioned_by_file_group.then(|| { + datafusion_physical_expr::Partitioning::Hash( + vec![Arc::new( + datafusion_physical_expr::expressions::Column::new("i", 0), + )], + file_groups.len(), + ) + }); FileScanConfigBuilder::new( ObjectStoreUrl::parse("test:///").unwrap(), Arc::new(MockSource::new(table_schema)), @@ -1588,7 +1598,7 @@ mod tests { .with_file_groups(file_groups) .with_limit(self.limit) .with_preserve_order(self.preserve_order) - .with_partitioned_by_file_group(self.partitioned_by_file_group) + .with_output_partitioning(output_partitioning) .build() } } diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 53ff4a41d466e..8f894bfa404ea 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -586,18 +586,15 @@ pub fn parse_protobuf_file_scan_config( file_source }; - let mut config_builder = FileScanConfigBuilder::new(object_store_url, file_source) + let config = FileScanConfigBuilder::new(object_store_url, file_source) .with_file_groups(file_groups) .with_constraints(constraints) .with_statistics(statistics) .with_limit(proto.limit.as_ref().map(|sl| sl.limit as usize)) .with_output_ordering(output_ordering) .with_output_partitioning(output_partitioning) - .with_batch_size(proto.batch_size.map(|s| s as usize)); - if proto.partitioned_by_file_group.unwrap_or(false) { - config_builder = config_builder.with_partitioned_by_file_group(true); - } - let config = config_builder.build(); + .with_batch_size(proto.batch_size.map(|s| s as usize)) + .build(); Ok(config) } diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 4614c4f002169..e93bb9cc5fed2 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -562,7 +562,9 @@ pub fn serialize_file_scan_config( constraints: Some(conf.constraints.clone().into()), batch_size: conf.batch_size.map(|s| s as u64), projection_exprs, - partitioned_by_file_group: Some(conf.partitioned_by_file_group), + // Partition grouping is now encoded in `output_partitioning`; this legacy + // wire field is left unset (readers rely on `output_partitioning`). + partitioned_by_file_group: None, output_partitioning, }) } diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 8acb891d6a94d..7e8d87fd8c6bb 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -4219,24 +4219,6 @@ fn roundtrip_file_scan_config(scan_config: FileScanConfig) -> Result Result<()> { - let file_schema = - Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)])); - let file_source = Arc::new(ParquetSource::new(Arc::clone(&file_schema))); - let scan_config = - FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) - .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new( - "/path/to/file.parquet".to_string(), - 1024, - )])]) - .with_partitioned_by_file_group(true) - .build(); - - assert!(roundtrip_file_scan_config(scan_config)?.partitioned_by_file_group); - Ok(()) -} - #[test] fn roundtrip_parquet_exec_output_partitioning() -> Result<()> { let file_schema = From 30a0d3ccbcb798b64bdcde985ac1c3b8e829b766 Mon Sep 17 00:00:00 2001 From: Jiawei Zhao Date: Fri, 26 Jun 2026 10:13:29 +0800 Subject: [PATCH 2/2] test: show output_partitioning in EXPLAIN for partition-grouped scans After collapsing `partitioned_by_file_group` onto `output_partitioning`, the declared Hash partitioning is now stored on the scan and therefore rendered by `DataSourceExec`'s Display. Update the affected sqllogictest expected plans accordingly. Behavior is unchanged; only the EXPLAIN text gains an `output_partitioning=Hash(...)` entry on partition-grouped scans. Signed-off-by: Jiawei Zhao --- .../test_files/preserve_file_partitioning.slt | 16 ++++++++-------- .../repartition_subset_satisfaction.slt | 16 ++++++++-------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt b/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt index 412d606df903f..e2dd22cc82bba 100644 --- a/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt +++ b/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt @@ -258,7 +258,7 @@ logical_plan physical_plan 01)ProjectionExec: expr=[f_dkey@0 as f_dkey, count(Int64(1))@1 as count(*), sum(fact_table.value)@2 as sum(fact_table.value)] 02)--AggregateExec: mode=SinglePartitioned, gby=[f_dkey@1 as f_dkey], aggr=[count(Int64(1)), sum(fact_table.value)] -03)----DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], file_type=parquet +03)----DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_partitioning=Hash([f_dkey@1], 3), file_type=parquet # Verify results with optimization match results without optimization query TIR rowsort @@ -320,7 +320,7 @@ physical_plan 01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST] 02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, count(Int64(1))@1 as count(*), avg(fact_table_ordered.value)@2 as avg(fact_table_ordered.value)] 03)----AggregateExec: mode=SinglePartitioned, gby=[f_dkey@1 as f_dkey], aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted -04)------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], file_type=parquet +04)------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], output_partitioning=Hash([f_dkey@1], 3), file_type=parquet query TIR SELECT f_dkey, count(*), avg(value) FROM fact_table_ordered GROUP BY f_dkey ORDER BY f_dkey; @@ -418,7 +418,7 @@ physical_plan 06)----------FilterExec: service@2 = log 07)------------RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1 08)--------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension/data.parquet]]}, projection=[d_dkey, env, service], file_type=parquet, predicate=service@2 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)] -09)--------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ], dynamic_rg_pruning=eligible +09)--------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], output_partitioning=Hash([f_dkey@1], 3), file_type=parquet, predicate=DynamicFilter [ empty ], dynamic_rg_pruning=eligible query TTTIR rowsort SELECT f.f_dkey, MAX(d.env), MAX(d.service), count(*), sum(f.value) @@ -493,7 +493,7 @@ logical_plan physical_plan 01)ProjectionExec: expr=[f_dkey@2 as f_dkey, timestamp@0 as timestamp, value@1 as value, row_number() PARTITION BY [fact_table_ordered.f_dkey] ORDER BY [fact_table_ordered.timestamp ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as rn] 02)--BoundedWindowAggExec: wdw=[row_number() PARTITION BY [fact_table_ordered.f_dkey] ORDER BY [fact_table_ordered.timestamp ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() PARTITION BY [fact_table_ordered.f_dkey] ORDER BY [fact_table_ordered.timestamp ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] -03)----DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], file_type=parquet +03)----DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], output_partitioning=Hash([f_dkey@2], 3), file_type=parquet query TPRI rowsort SELECT f_dkey, timestamp, value, @@ -548,7 +548,7 @@ logical_plan physical_plan 01)ProjectionExec: expr=[f_dkey@0 as f_dkey, count(Int64(1))@1 as count(*), sum(high_cardinality_table.value)@2 as sum(high_cardinality_table.value)] 02)--AggregateExec: mode=SinglePartitioned, gby=[f_dkey@1 as f_dkey], aggr=[count(Int64(1)), sum(high_cardinality_table.value)] -03)----DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=A/data.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=D/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=B/data.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=E/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], file_type=parquet +03)----DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=A/data.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=D/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=B/data.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=E/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_partitioning=Hash([f_dkey@1], 3), file_type=parquet # Verify results with optimization match results without optimization query TIR rowsort @@ -685,8 +685,8 @@ physical_plan 02)--RepartitionExec: partitioning=Hash([f_dkey@0, env@1], 3), input_partitions=3 03)----AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey, env@2 as env], aggr=[sum(f.value)] 04)------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(d_dkey@1, f_dkey@1)], projection=[value@2, f_dkey@3, env@0] -05)--------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=C/data.parquet]]}, projection=[env, d_dkey], file_type=parquet -06)--------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], file_type=parquet +05)--------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=C/data.parquet]]}, projection=[env, d_dkey], output_partitioning=Hash([d_dkey@1], 3), file_type=parquet +06)--------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_partitioning=Hash([f_dkey@1], 3), file_type=parquet query TTR rowsort SELECT f.f_dkey, d.env, sum(f.value) @@ -722,7 +722,7 @@ logical_plan physical_plan 01)ProjectionExec: expr=[f_dkey@0 as f_dkey, timestamp@1 as timestamp, count(Int64(1))@2 as count(*), avg(fact_table.value)@3 as avg(fact_table.value)] 02)--AggregateExec: mode=SinglePartitioned, gby=[f_dkey@2 as f_dkey, timestamp@0 as timestamp], aggr=[count(Int64(1)), avg(fact_table.value)] -03)----DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], file_type=parquet +03)----DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], output_partitioning=Hash([f_dkey@2], 3), file_type=parquet query TPIR rowsort SELECT f_dkey, timestamp, diff --git a/datafusion/sqllogictest/test_files/repartition_subset_satisfaction.slt b/datafusion/sqllogictest/test_files/repartition_subset_satisfaction.slt index 043a62314cb5c..5d45c45fe8535 100644 --- a/datafusion/sqllogictest/test_files/repartition_subset_satisfaction.slt +++ b/datafusion/sqllogictest/test_files/repartition_subset_satisfaction.slt @@ -164,7 +164,7 @@ physical_plan 03)----AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)@1 as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)], aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted 04)------RepartitionExec: partitioning=Hash([f_dkey@0, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)@1], 3), input_partitions=3, preserve_order=true, sort_exprs=f_dkey@0 ASC NULLS LAST, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)@1 ASC NULLS LAST 05)--------AggregateExec: mode=Partial, gby=[f_dkey@2 as f_dkey, date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }, timestamp@0) as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)], aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted -06)----------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], file_type=parquet +06)----------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], output_partitioning=Hash([f_dkey@2], 3), file_type=parquet # Verify results without subset satisfaction query TPIR rowsort @@ -204,7 +204,7 @@ physical_plan 01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST, time_bin@1 ASC NULLS LAST] 02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)@1 as time_bin, count(Int64(1))@2 as count(*), avg(fact_table_ordered.value)@3 as avg(fact_table_ordered.value)] 03)----AggregateExec: mode=SinglePartitioned, gby=[f_dkey@2 as f_dkey, date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }, timestamp@0) as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)], aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted -04)------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], file_type=parquet +04)------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], output_partitioning=Hash([f_dkey@2], 3), file_type=parquet # Verify results match with subset satisfaction query TPIR rowsort @@ -251,7 +251,7 @@ physical_plan 02)--BoundedWindowAggExec: wdw=[row_number() PARTITION BY [fact_table_ordered.f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)] ORDER BY [fact_table_ordered.timestamp ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() PARTITION BY [fact_table_ordered.f_dkey, date_bin(IntervalMonthDayNano(\"IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }\"),fact_table_ordered.timestamp)] ORDER BY [fact_table_ordered.timestamp ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] 03)----SortExec: expr=[f_dkey@2 ASC NULLS LAST, date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }, timestamp@0) ASC NULLS LAST, timestamp@0 ASC NULLS LAST], preserve_partitioning=[true] 04)------RepartitionExec: partitioning=Hash([f_dkey@2, date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }, timestamp@0)], 3), input_partitions=3 -05)--------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], file_type=parquet +05)--------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], output_partitioning=Hash([f_dkey@2], 3), file_type=parquet # Verify results without subset satisfaction query TPRI rowsort @@ -292,7 +292,7 @@ logical_plan physical_plan 01)ProjectionExec: expr=[f_dkey@2 as f_dkey, timestamp@0 as timestamp, value@1 as value, row_number() PARTITION BY [fact_table_ordered.f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)] ORDER BY [fact_table_ordered.timestamp ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as rn] 02)--BoundedWindowAggExec: wdw=[row_number() PARTITION BY [fact_table_ordered.f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)] ORDER BY [fact_table_ordered.timestamp ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() PARTITION BY [fact_table_ordered.f_dkey, date_bin(IntervalMonthDayNano(\"IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }\"),fact_table_ordered.timestamp)] ORDER BY [fact_table_ordered.timestamp ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] -03)----DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], file_type=parquet +03)----DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], output_partitioning=Hash([f_dkey@2], 3), file_type=parquet # Verify results match with subset satisfaction query TPRI rowsort @@ -379,8 +379,8 @@ physical_plan 11)--------------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d_dkey@1, f_dkey@2)], projection=[f_dkey@4, env@0, timestamp@2, value@3] 12)----------------------CoalescePartitionsExec 13)------------------------FilterExec: service@1 = log, projection=[env@0, d_dkey@2] -14)--------------------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=A/data.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=D/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=C/data.parquet]]}, projection=[env, service, d_dkey], file_type=parquet, predicate=service@1 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)] -15)----------------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ], dynamic_rg_pruning=eligible +14)--------------------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=A/data.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=D/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=C/data.parquet]]}, projection=[env, service, d_dkey], output_partitioning=Hash([d_dkey@2], 3), file_type=parquet, predicate=service@1 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)] +15)----------------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], output_partitioning=Hash([f_dkey@2], 3), file_type=parquet, predicate=DynamicFilter [ empty ], dynamic_rg_pruning=eligible # Verify results without subset satisfaction query TPR rowsort @@ -474,8 +474,8 @@ physical_plan 09)----------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d_dkey@1, f_dkey@2)], projection=[f_dkey@4, env@0, timestamp@2, value@3] 10)------------------CoalescePartitionsExec 11)--------------------FilterExec: service@1 = log, projection=[env@0, d_dkey@2] -12)----------------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=A/data.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=D/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=C/data.parquet]]}, projection=[env, service, d_dkey], file_type=parquet, predicate=service@1 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)] -13)------------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ], dynamic_rg_pruning=eligible +12)----------------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=A/data.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=D/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=C/data.parquet]]}, projection=[env, service, d_dkey], output_partitioning=Hash([d_dkey@2], 3), file_type=parquet, predicate=service@1 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)] +13)------------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], output_partitioning=Hash([f_dkey@2], 3), file_type=parquet, predicate=DynamicFilter [ empty ], dynamic_rg_pruning=eligible # Verify results match with subset satisfaction query TPR rowsort