Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
save
  • Loading branch information
xudong963 committed Apr 25, 2025
commit 2b4a14feb773e1efbc126461a22b8cdbdebd25b3
5 changes: 5 additions & 0 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,11 @@ impl ExecutionPlan for ArrowExec {
fn statistics(&self) -> Result<Statistics> {
self.inner.statistics()
}

fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
self.inner.statistics_by_partition()
}

fn fetch(&self) -> Option<usize> {
self.inner.fetch()
}
Expand Down
4 changes: 4 additions & 0 deletions datafusion/datasource-avro/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ impl ExecutionPlan for AvroExec {
self.inner.statistics()
}

fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
self.inner.statistics_by_partition()
}

fn metrics(&self) -> Option<MetricsSet> {
self.inner.metrics()
}
Expand Down
4 changes: 4 additions & 0 deletions datafusion/datasource-csv/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,10 @@ impl ExecutionPlan for CsvExec {
self.inner.statistics()
}

fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
self.inner.statistics_by_partition()
}

fn metrics(&self) -> Option<MetricsSet> {
self.inner.metrics()
}
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-plan/src/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ impl ExecutionPlan for CoalesceBatchesExec {
Statistics::with_fetch(self.input.statistics()?, self.schema(), self.fetch, 0, 1)
}

fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
Ok(vec![self.statistics()?])
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seeing as CoalesceBatchesExec doesn't change partitioning of its input I think this is the incorrect number of partitions? It should be repeated N times, once for each partition.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good find. I was misled by the coalesce : )


fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
Some(Arc::new(CoalesceBatchesExec {
input: Arc::clone(&self.input),
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-plan/src/coalesce_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ impl ExecutionPlan for CoalescePartitionsExec {
Statistics::with_fetch(self.input.statistics()?, self.schema(), self.fetch, 0, 1)
}

fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
Ok(vec![self.statistics()?])
}

fn supports_limit_pushdown(&self) -> bool {
true
}
Expand Down
32 changes: 26 additions & 6 deletions datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,11 @@ impl FilterExec {

/// Calculates `Statistics` for `FilterExec`, by applying selectivity (either default, or estimated) to input statistics.
fn statistics_helper(
input: &Arc<dyn ExecutionPlan>,
schema: SchemaRef,
input_stats: Statistics,
predicate: &Arc<dyn PhysicalExpr>,
default_selectivity: u8,
) -> Result<Statistics> {
let input_stats = input.statistics()?;
let schema = input.schema();
if !check_support(predicate, &schema) {
let selectivity = default_selectivity as f64 / 100.0;
let mut stats = input_stats.to_inexact();
Expand All @@ -193,7 +192,7 @@ impl FilterExec {
let num_rows = input_stats.num_rows;
let total_byte_size = input_stats.total_byte_size;
let input_analysis_ctx = AnalysisContext::try_from_statistics(
&input.schema(),
&schema,
&input_stats.column_statistics,
)?;

Expand Down Expand Up @@ -260,7 +259,12 @@ impl FilterExec {
) -> Result<PlanProperties> {
// Combine the equal predicates with the input equivalence properties
// to construct the equivalence properties:
let stats = Self::statistics_helper(input, predicate, default_selectivity)?;
let stats = Self::statistics_helper(
input.schema(),
input.statistics()?,
predicate,
default_selectivity,
)?;
let mut eq_properties = input.equivalence_properties().clone();
let (equal_pairs, _) = collect_columns_from_predicate(predicate);
for (lhs, rhs) in equal_pairs {
Expand Down Expand Up @@ -401,13 +405,29 @@ impl ExecutionPlan for FilterExec {
/// predicate's selectivity value can be determined for the incoming data.
fn statistics(&self) -> Result<Statistics> {
let stats = Self::statistics_helper(
&self.input,
self.schema(),
self.input().statistics()?,
self.predicate(),
self.default_selectivity,
)?;
Ok(stats.project(self.projection.as_ref()))
}

fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
let input_stats = self.input.statistics_by_partition()?;
let mut stats = Vec::with_capacity(input_stats.len());
for input_stat in input_stats {
let stat = Self::statistics_helper(
self.schema(),
input_stat,
self.predicate(),
self.default_selectivity,
)?;
stats.push(stat.project(self.projection.as_ref()));
}
Ok(stats)
}

fn cardinality_effect(&self) -> CardinalityEffect {
CardinalityEffect::LowerEqual
}
Expand Down
25 changes: 25 additions & 0 deletions datafusion/physical-plan/src/joins/cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,31 @@ impl ExecutionPlan for CrossJoinExec {
))
}

fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think I saw a test for this code. Maybe I missed it

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I missed it lol

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added in 910454a

todo!()
/*
let left_stats = self.left.statistics_by_partition()?;
// Summarize the `left_stats`
let statistics =
compute_summary_statistics(file_group.iter(), &file_schema, |file| {
file.statistics.as_ref().map(|stats| stats.as_ref())
});
let right_stats = self.right.statistics_by_partition()?;

if left_stats.is_empty() || right_stats.is_empty() {
return Ok(vec![]);
}

let mut stats = Vec::new();
for left in left_stats.iter() {
for right in right_stats.iter() {
stats.push(stats_cartesian_product(left.clone(), right.clone()));
}
}
Ok(stats)
*/
}

/// Tries to swap the projection with its input [`CrossJoinExec`]. If it can be done,
/// it returns the new swapped version having the [`CrossJoinExec`] as the top plan.
/// Otherwise, it returns None.
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -883,8 +883,8 @@ impl ExecutionPlan for HashJoinExec {
// There are some special cases though, for example:
// - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)`
let stats = estimate_join_statistics(
Arc::clone(&self.left),
Arc::clone(&self.right),
self.left.statistics()?,
self.right.statistics()?,
self.on.clone(),
&self.join_type,
&self.join_schema,
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/joins/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -568,8 +568,8 @@ impl ExecutionPlan for NestedLoopJoinExec {

fn statistics(&self) -> Result<Statistics> {
estimate_join_statistics(
Arc::clone(&self.left),
Arc::clone(&self.right),
self.left.statistics()?,
self.right.statistics()?,
vec![],
&self.join_type,
&self.join_schema,
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/joins/sort_merge_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,8 +518,8 @@ impl ExecutionPlan for SortMergeJoinExec {
// There are some special cases though, for example:
// - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)`
estimate_join_statistics(
Arc::clone(&self.left),
Arc::clone(&self.right),
self.left.statistics()?,
self.right.statistics()?,
self.on.clone(),
&self.join_type,
&self.schema,
Expand Down
7 changes: 2 additions & 5 deletions datafusion/physical-plan/src/joins/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,15 +403,12 @@ struct PartialJoinStatistics {

/// Estimate the statistics for the given join's output.
pub(crate) fn estimate_join_statistics(
left: Arc<dyn ExecutionPlan>,
right: Arc<dyn ExecutionPlan>,
left_stats: Statistics,
right_stats: Statistics,
on: JoinOn,
join_type: &JoinType,
schema: &Schema,
) -> Result<Statistics> {
let left_stats = left.statistics()?;
let right_stats = right.statistics()?;

let join_stats = estimate_join_cardinality(join_type, left_stats, right_stats, &on);
let (num_rows, column_statistics) = match join_stats {
Some(stats) => (Precision::Inexact(stats.num_rows), stats.column_statistics),
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-plan/src/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,10 @@ impl ExecutionPlan for GlobalLimitExec {
)
}

fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
Ok(vec![self.statistics()?])
}

fn fetch(&self) -> Option<usize> {
self.fetch
}
Expand Down
13 changes: 12 additions & 1 deletion datafusion/physical-plan/src/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1301,7 +1301,18 @@ impl ExecutionPlan for SortExec {
}

fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
todo!()
let input_stats = self.input.statistics_by_partition()?;
let mut stats = Vec::with_capacity(input_stats.len());
for stat in input_stats {
stats.push(Statistics::with_fetch(
stat,
self.schema(),
self.fetch,
0,
1,
)?);
}
Ok(stats)
}

fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
Expand Down
40 changes: 27 additions & 13 deletions datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,23 @@ impl BoundedWindowAggExec {
.unwrap_or_else(Vec::new)
}
}

fn statistics_helper(&self, statistics: Statistics) -> Result<Statistics> {
let win_cols = self.window_expr.len();
let input_cols = self.input.schema().fields().len();
// TODO stats: some windowing function will maintain invariants such as min, max...
let mut column_statistics = Vec::with_capacity(win_cols + input_cols);
// copy stats of the input to the beginning of the schema.
column_statistics.extend(statistics.column_statistics);
for _ in 0..win_cols {
column_statistics.push(ColumnStatistics::new_unknown())
}
Ok(Statistics {
num_rows: statistics.num_rows,
column_statistics,
total_byte_size: Precision::Absent,
})
}
}

impl DisplayAs for BoundedWindowAggExec {
Expand Down Expand Up @@ -344,20 +361,17 @@ impl ExecutionPlan for BoundedWindowAggExec {

fn statistics(&self) -> Result<Statistics> {
let input_stat = self.input.statistics()?;
let win_cols = self.window_expr.len();
let input_cols = self.input.schema().fields().len();
// TODO stats: some windowing function will maintain invariants such as min, max...
let mut column_statistics = Vec::with_capacity(win_cols + input_cols);
// copy stats of the input to the beginning of the schema.
column_statistics.extend(input_stat.column_statistics);
for _ in 0..win_cols {
column_statistics.push(ColumnStatistics::new_unknown())
self.statistics_helper(input_stat)
}

fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
let input_stats = self.input.statistics_by_partition()?;
let mut output_stats = Vec::with_capacity(input_stats.len());
for stat in input_stats {
let output_stat = self.statistics_helper(stat.clone())?;
output_stats.push(output_stat);
}
Ok(Statistics {
num_rows: input_stat.num_rows,
column_statistics,
total_byte_size: Precision::Absent,
})
Ok(output_stats)
}
}

Expand Down