Skip to content

Commit 03c1a8d

Browse files
authored
use ProjectionExprs::project_statistics in FileScanConfig (#19094)
- Fixes #14936 - Fixes #19075
1 parent 1844c88 commit 03c1a8d

File tree

12 files changed

+272
-220
lines changed

12 files changed

+272
-220
lines changed

datafusion/common/src/stats.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,31 @@ impl Statistics {
317317
}
318318
}
319319

320+
/// Calculates `total_byte_size` based on the schema and `num_rows`.
321+
/// If any of the columns has a non-primitive width, `total_byte_size` is set to inexact.
322+
pub fn calculate_total_byte_size(&mut self, schema: &Schema) {
323+
let mut row_size = Some(0);
324+
for field in schema.fields() {
325+
match field.data_type().primitive_width() {
326+
Some(width) => {
327+
row_size = row_size.map(|s| s + width);
328+
}
329+
None => {
330+
row_size = None;
331+
break;
332+
}
333+
}
334+
}
335+
match row_size {
336+
None => {
337+
self.total_byte_size = self.total_byte_size.to_inexact();
338+
}
339+
Some(size) => {
340+
self.total_byte_size = self.num_rows.multiply(&Precision::Exact(size));
341+
}
342+
}
343+
}
344+
320345
/// Returns an unbounded `ColumnStatistics` for each field in the schema.
321346
pub fn unknown_column(schema: &Schema) -> Vec<ColumnStatistics> {
322347
schema

datafusion/core/src/datasource/file_format/parquet.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -724,7 +724,7 @@ mod tests {
724724
// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
725725
assert_eq!(
726726
exec.partition_statistics(None)?.total_byte_size,
727-
Precision::Exact(671)
727+
Precision::Absent,
728728
);
729729

730730
Ok(())
@@ -770,10 +770,9 @@ mod tests {
770770
exec.partition_statistics(None)?.num_rows,
771771
Precision::Exact(8)
772772
);
773-
// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
774773
assert_eq!(
775774
exec.partition_statistics(None)?.total_byte_size,
776-
Precision::Exact(671)
775+
Precision::Absent,
777776
);
778777
let batches = collect(exec, task_ctx).await?;
779778
assert_eq!(1, batches.len());

datafusion/core/src/datasource/listing/table.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ mod tests {
257257
);
258258
assert_eq!(
259259
exec.partition_statistics(None)?.total_byte_size,
260-
Precision::Exact(671)
260+
Precision::Absent,
261261
);
262262

263263
Ok(())
@@ -1397,7 +1397,7 @@ mod tests {
13971397
// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
13981398
assert_eq!(
13991399
exec_enabled.partition_statistics(None)?.total_byte_size,
1400-
Precision::Exact(671)
1400+
Precision::Absent,
14011401
);
14021402

14031403
Ok(())

datafusion/core/tests/parquet/file_statistics.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -128,8 +128,9 @@ async fn load_table_stats_with_session_level_cache() {
128128
);
129129
assert_eq!(
130130
exec1.partition_statistics(None).unwrap().total_byte_size,
131-
// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
132-
Precision::Exact(671),
131+
// Byte size is absent because we cannot estimate the output size
132+
// of the Arrow data since there are variable length columns.
133+
Precision::Absent,
133134
);
134135
assert_eq!(get_static_cache_size(&state1), 1);
135136

@@ -143,8 +144,8 @@ async fn load_table_stats_with_session_level_cache() {
143144
);
144145
assert_eq!(
145146
exec2.partition_statistics(None).unwrap().total_byte_size,
146-
// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
147-
Precision::Exact(671),
147+
// Absent because the data contains variable length columns
148+
Precision::Absent,
148149
);
149150
assert_eq!(get_static_cache_size(&state2), 1);
150151

@@ -158,8 +159,8 @@ async fn load_table_stats_with_session_level_cache() {
158159
);
159160
assert_eq!(
160161
exec3.partition_statistics(None).unwrap().total_byte_size,
161-
// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
162-
Precision::Exact(671),
162+
// Absent because the data contains variable length columns
163+
Precision::Absent,
163164
);
164165
// List same file no increase
165166
assert_eq!(get_static_cache_size(&state1), 1);

datafusion/core/tests/physical_optimizer/partition_statistics.rs

Lines changed: 27 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ mod test {
6767
/// - Each partition has an "id" column (INT) with the following values:
6868
/// - First partition: [3, 4]
6969
/// - Second partition: [1, 2]
70-
/// - Each row is 110 bytes in size
70+
/// - Each partition has 16 bytes total (Int32 id: 4 bytes × 2 rows + Date32 date: 4 bytes × 2 rows)
7171
///
7272
/// @param create_table_sql Optional parameter to set the create table SQL
7373
/// @param target_partition Optional parameter to set the target partitions
@@ -215,9 +215,9 @@ mod test {
215215
.map(|idx| scan.partition_statistics(Some(idx)))
216216
.collect::<Result<Vec<_>>>()?;
217217
let expected_statistic_partition_1 =
218-
create_partition_statistics(2, 110, 3, 4, true);
218+
create_partition_statistics(2, 16, 3, 4, true);
219219
let expected_statistic_partition_2 =
220-
create_partition_statistics(2, 110, 1, 2, true);
220+
create_partition_statistics(2, 16, 1, 2, true);
221221
// Check the statistics of each partition
222222
assert_eq!(statistics.len(), 2);
223223
assert_eq!(statistics[0], expected_statistic_partition_1);
@@ -277,8 +277,7 @@ mod test {
277277
let statistics = (0..sort_exec.output_partitioning().partition_count())
278278
.map(|idx| sort_exec.partition_statistics(Some(idx)))
279279
.collect::<Result<Vec<_>>>()?;
280-
let expected_statistic_partition =
281-
create_partition_statistics(4, 220, 1, 4, true);
280+
let expected_statistic_partition = create_partition_statistics(4, 32, 1, 4, true);
282281
assert_eq!(statistics.len(), 1);
283282
assert_eq!(statistics[0], expected_statistic_partition);
284283
// Check the statistics_by_partition with real results
@@ -292,9 +291,9 @@ mod test {
292291
SortExec::new(ordering.into(), scan_2).with_preserve_partitioning(true),
293292
);
294293
let expected_statistic_partition_1 =
295-
create_partition_statistics(2, 110, 3, 4, true);
294+
create_partition_statistics(2, 16, 3, 4, true);
296295
let expected_statistic_partition_2 =
297-
create_partition_statistics(2, 110, 1, 2, true);
296+
create_partition_statistics(2, 16, 1, 2, true);
298297
let statistics = (0..sort_exec.output_partitioning().partition_count())
299298
.map(|idx| sort_exec.partition_statistics(Some(idx)))
300299
.collect::<Result<Vec<_>>>()?;
@@ -366,9 +365,9 @@ mod test {
366365
// Check that we have 4 partitions (2 from each scan)
367366
assert_eq!(statistics.len(), 4);
368367
let expected_statistic_partition_1 =
369-
create_partition_statistics(2, 110, 3, 4, true);
368+
create_partition_statistics(2, 16, 3, 4, true);
370369
let expected_statistic_partition_2 =
371-
create_partition_statistics(2, 110, 1, 2, true);
370+
create_partition_statistics(2, 16, 1, 2, true);
372371
// Verify first partition (from first scan)
373372
assert_eq!(statistics[0], expected_statistic_partition_1);
374373
// Verify second partition (from first scan)
@@ -418,7 +417,7 @@ mod test {
418417

419418
let expected_stats = Statistics {
420419
num_rows: Precision::Inexact(4),
421-
total_byte_size: Precision::Inexact(220),
420+
total_byte_size: Precision::Inexact(32),
422421
column_statistics: vec![
423422
ColumnStatistics::new_unknown(),
424423
ColumnStatistics::new_unknown(),
@@ -462,7 +461,7 @@ mod test {
462461
// Check that we have 2 partitions
463462
assert_eq!(statistics.len(), 2);
464463
let mut expected_statistic_partition_1 =
465-
create_partition_statistics(8, 48400, 1, 4, true);
464+
create_partition_statistics(8, 512, 1, 4, true);
466465
expected_statistic_partition_1
467466
.column_statistics
468467
.push(ColumnStatistics {
@@ -473,7 +472,7 @@ mod test {
473472
distinct_count: Precision::Absent,
474473
});
475474
let mut expected_statistic_partition_2 =
476-
create_partition_statistics(8, 48400, 1, 4, true);
475+
create_partition_statistics(8, 512, 1, 4, true);
477476
expected_statistic_partition_2
478477
.column_statistics
479478
.push(ColumnStatistics {
@@ -501,9 +500,9 @@ mod test {
501500
let coalesce_batches: Arc<dyn ExecutionPlan> =
502501
Arc::new(CoalesceBatchesExec::new(scan, 2));
503502
let expected_statistic_partition_1 =
504-
create_partition_statistics(2, 110, 3, 4, true);
503+
create_partition_statistics(2, 16, 3, 4, true);
505504
let expected_statistic_partition_2 =
506-
create_partition_statistics(2, 110, 1, 2, true);
505+
create_partition_statistics(2, 16, 1, 2, true);
507506
let statistics = (0..coalesce_batches.output_partitioning().partition_count())
508507
.map(|idx| coalesce_batches.partition_statistics(Some(idx)))
509508
.collect::<Result<Vec<_>>>()?;
@@ -525,8 +524,7 @@ mod test {
525524
let scan = create_scan_exec_with_statistics(None, Some(2)).await;
526525
let coalesce_partitions: Arc<dyn ExecutionPlan> =
527526
Arc::new(CoalescePartitionsExec::new(scan));
528-
let expected_statistic_partition =
529-
create_partition_statistics(4, 220, 1, 4, true);
527+
let expected_statistic_partition = create_partition_statistics(4, 32, 1, 4, true);
530528
let statistics = (0..coalesce_partitions.output_partitioning().partition_count())
531529
.map(|idx| coalesce_partitions.partition_statistics(Some(idx)))
532530
.collect::<Result<Vec<_>>>()?;
@@ -575,8 +573,7 @@ mod test {
575573
.map(|idx| global_limit.partition_statistics(Some(idx)))
576574
.collect::<Result<Vec<_>>>()?;
577575
assert_eq!(statistics.len(), 1);
578-
let expected_statistic_partition =
579-
create_partition_statistics(2, 110, 3, 4, true);
576+
let expected_statistic_partition = create_partition_statistics(2, 16, 3, 4, true);
580577
assert_eq!(statistics[0], expected_statistic_partition);
581578
Ok(())
582579
}
@@ -627,7 +624,11 @@ mod test {
627624

628625
let expected_p0_statistics = Statistics {
629626
num_rows: Precision::Inexact(2),
630-
total_byte_size: Precision::Inexact(110),
627+
// Each row produces 8 bytes of data:
628+
// - id column: Int32 (4 bytes) × 2 rows = 8 bytes
629+
// - id + 1 column: Int32 (4 bytes) × 2 rows = 8 bytes
630+
// AggregateExec cannot yet derive byte sizes for the COUNT(c) column
631+
total_byte_size: Precision::Inexact(16),
631632
column_statistics: vec![
632633
ColumnStatistics {
633634
null_count: Precision::Absent,
@@ -645,7 +646,11 @@ mod test {
645646

646647
let expected_p1_statistics = Statistics {
647648
num_rows: Precision::Inexact(2),
648-
total_byte_size: Precision::Inexact(110),
649+
// Each row produces 8 bytes of data:
650+
// - id column: Int32 (4 bytes) × 2 rows = 8 bytes
651+
// - id + 1 column: Int32 (4 bytes) × 2 rows = 8 bytes
652+
// AggregateExec cannot yet derive byte sizes for the COUNT(c) column
653+
total_byte_size: Precision::Inexact(16),
649654
column_statistics: vec![
650655
ColumnStatistics {
651656
null_count: Precision::Absent,
@@ -851,7 +856,7 @@ mod test {
851856

852857
let expected_stats = Statistics {
853858
num_rows: Precision::Inexact(1),
854-
total_byte_size: Precision::Inexact(73),
859+
total_byte_size: Precision::Inexact(10),
855860
column_statistics: vec![
856861
ColumnStatistics::new_unknown(),
857862
ColumnStatistics::new_unknown(),
@@ -955,7 +960,7 @@ mod test {
955960

956961
let expected_stats = Statistics {
957962
num_rows: Precision::Inexact(2),
958-
total_byte_size: Precision::Inexact(110),
963+
total_byte_size: Precision::Inexact(16),
959964
column_statistics: vec![
960965
ColumnStatistics::new_unknown(),
961966
ColumnStatistics::new_unknown(),

datafusion/datasource-parquet/src/metadata.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -247,10 +247,8 @@ impl<'a> DFParquetMetadata<'a> {
247247
let mut statistics = Statistics::new_unknown(table_schema);
248248
let mut has_statistics = false;
249249
let mut num_rows = 0_usize;
250-
let mut total_byte_size = 0_usize;
251250
for row_group_meta in row_groups_metadata {
252251
num_rows += row_group_meta.num_rows() as usize;
253-
total_byte_size += row_group_meta.total_byte_size() as usize;
254252

255253
if !has_statistics {
256254
has_statistics = row_group_meta
@@ -260,7 +258,7 @@ impl<'a> DFParquetMetadata<'a> {
260258
}
261259
}
262260
statistics.num_rows = Precision::Exact(num_rows);
263-
statistics.total_byte_size = Precision::Exact(total_byte_size);
261+
statistics.calculate_total_byte_size(table_schema);
264262

265263
let file_metadata = metadata.file_metadata();
266264
let mut file_schema = parquet_to_arrow_schema(

0 commit comments

Comments
 (0)