Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion-examples/examples/expr_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ fn range_analysis_demo() -> Result<()> {
// In this case, we can see that, as expected, `analyze` has figured out
// that in this case, `date` must be in the range `['2020-09-01', '2020-10-01']`
let expected_range = Interval::try_new(september_1, october_1)?;
assert_eq!(analysis_result.boundaries[0].interval, expected_range);
assert_eq!(analysis_result.boundaries[0].interval, Some(expected_range));

Ok(())
}
Expand Down
185 changes: 131 additions & 54 deletions datafusion/physical-expr/src/analysis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,12 @@ impl AnalysisContext {
#[derive(Clone, Debug, PartialEq)]
pub struct ExprBoundaries {
pub column: Column,
/// Minimum and maximum values this expression can have.
pub interval: Interval,
/// Minimum and maximum values this expression can have. A `None` value
/// indicates that evaluating the given column results in an empty set.
/// For example, if the column `a` has values in the range [10, 20],
/// and there is a filter asserting that `a > 50`, then the resulting interval
/// range of `a` will be `None`.
pub interval: Option<Interval>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the api-change label due to making interval an Option.

/// Maximum number of distinct values this expression can produce, if known.
pub distinct_count: Precision<usize>,
}
Expand Down Expand Up @@ -118,7 +122,7 @@ impl ExprBoundaries {
let column = Column::new(field.name(), col_index);
Ok(ExprBoundaries {
column,
interval,
interval: Some(interval),
distinct_count: col_stats.distinct_count,
})
}
Expand All @@ -133,7 +137,7 @@ impl ExprBoundaries {
.map(|(i, field)| {
Ok(Self {
column: Column::new(field.name(), i),
interval: Interval::make_unbounded(field.data_type())?,
interval: Some(Interval::make_unbounded(field.data_type())?),
distinct_count: Precision::Absent,
})
})
Expand Down Expand Up @@ -161,40 +165,71 @@ pub fn analyze(
context: AnalysisContext,
schema: &Schema,
) -> Result<AnalysisContext> {
let target_boundaries = context.boundaries;

let mut graph = ExprIntervalGraph::try_new(Arc::clone(expr), schema)?;

let columns = collect_columns(expr)
.into_iter()
.map(|c| Arc::new(c) as _)
.collect::<Vec<_>>();

let target_expr_and_indices = graph.gather_node_indices(columns.as_slice());

let mut target_indices_and_boundaries = target_expr_and_indices
let initial_boundaries = &context.boundaries;
if initial_boundaries
.iter()
.filter_map(|(expr, i)| {
target_boundaries.iter().find_map(|bound| {
expr.as_any()
.downcast_ref::<Column>()
.filter(|expr_column| bound.column.eq(*expr_column))
.map(|_| (*i, bound.interval.clone()))
})
})
.collect::<Vec<_>>();

match graph
.update_ranges(&mut target_indices_and_boundaries, Interval::CERTAINLY_TRUE)?
.all(|bound| bound.interval.is_none())
{
PropagationResult::Success => {
shrink_boundaries(graph, target_boundaries, target_expr_and_indices)
if initial_boundaries
.iter()
.any(|bound| bound.distinct_count != Precision::Exact(0))
{
return internal_err!(
"ExprBoundaries has a non-zero distinct count although it represents an empty table"
);
}
if context.selectivity != Some(0.0) {
return internal_err!(
"AnalysisContext has a non-zero selectivity although it represents an empty table"
);
}
PropagationResult::Infeasible => {
Ok(AnalysisContext::new(target_boundaries).with_selectivity(0.0))
Ok(context)
} else if initial_boundaries
.iter()
.any(|bound| bound.interval.is_none())
{
internal_err!(
"AnalysisContext is an inconsistent state. Some columns represent empty table while others don't"
)
} else {
let mut target_boundaries = context.boundaries;
let mut graph = ExprIntervalGraph::try_new(Arc::clone(expr), schema)?;
let columns = collect_columns(expr)
.into_iter()
.map(|c| Arc::new(c) as _)
.collect::<Vec<_>>();

let mut target_indices_and_boundaries = vec![];
let target_expr_and_indices = graph.gather_node_indices(columns.as_slice());

for (expr, index) in &target_expr_and_indices {
if let Some(column) = expr.as_any().downcast_ref::<Column>() {
if let Some(bound) =
target_boundaries.iter().find(|b| b.column == *column)
{
// Now, it's safe to unwrap
target_indices_and_boundaries
.push((*index, bound.interval.as_ref().unwrap().clone()));
}
}
}
PropagationResult::CannotPropagate => {
Ok(AnalysisContext::new(target_boundaries).with_selectivity(1.0))

match graph
.update_ranges(&mut target_indices_and_boundaries, Interval::CERTAINLY_TRUE)?
{
PropagationResult::Success => {
shrink_boundaries(graph, target_boundaries, target_expr_and_indices)
}
PropagationResult::Infeasible => {
// If the propagation result is infeasible, set intervals to None
target_boundaries
.iter_mut()
.for_each(|bound| bound.interval = None);
Ok(AnalysisContext::new(target_boundaries).with_selectivity(0.0))
}
PropagationResult::CannotPropagate => {
Ok(AnalysisContext::new(target_boundaries).with_selectivity(1.0))
}
}
}
}
Expand All @@ -215,12 +250,12 @@ fn shrink_boundaries(
.iter_mut()
.find(|bound| bound.column.eq(column))
{
bound.interval = graph.get_interval(*i);
bound.interval = Some(graph.get_interval(*i));
};
}
});

let selectivity = calculate_selectivity(&target_boundaries, &initial_boundaries);
let selectivity = calculate_selectivity(&target_boundaries, &initial_boundaries)?;

if !(0.0..=1.0).contains(&selectivity) {
return internal_err!("Selectivity is out of limit: {}", selectivity);
Expand All @@ -235,16 +270,31 @@ fn shrink_boundaries(
fn calculate_selectivity(
target_boundaries: &[ExprBoundaries],
initial_boundaries: &[ExprBoundaries],
) -> f64 {
) -> Result<f64> {
// Since the intervals are assumed uniform and the values
// are not correlated, we need to multiply the selectivities
// of multiple columns to get the overall selectivity.
initial_boundaries
.iter()
.zip(target_boundaries.iter())
.fold(1.0, |acc, (initial, target)| {
acc * cardinality_ratio(&initial.interval, &target.interval)
})
if target_boundaries.len() != initial_boundaries.len() {
return Err(internal_datafusion_err!(
"The number of columns in the initial and target boundaries should be the same"
));
}
let mut acc: f64 = 1.0;
for (initial, target) in initial_boundaries.iter().zip(target_boundaries) {
match (initial.interval.as_ref(), target.interval.as_ref()) {
(Some(initial), Some(target)) => {
acc *= cardinality_ratio(initial, target);
}
(None, Some(_)) => {
return internal_err!(
"Initial boundary cannot be None while having a Some() target boundary"
);
}
_ => return Ok(0.0),
}
}

Ok(acc)
}

#[cfg(test)]
Expand Down Expand Up @@ -313,16 +363,6 @@ mod tests {
Some(16),
Some(19),
),
// (a > 10 AND a < 20) AND (a > 20 AND a < 30)
(
col("a")
.gt(lit(10))
.and(col("a").lt(lit(20)))
.and(col("a").gt(lit(20)))
.and(col("a").lt(lit(30))),
None,
None,
),
];
for (expr, lower, upper) in test_cases {
let boundaries = ExprBoundaries::try_new_unbounded(&schema).unwrap();
Expand All @@ -335,7 +375,9 @@ mod tests {
df_schema.as_ref(),
)
.unwrap();
let actual = &analysis_result.boundaries[0].interval;
let Some(actual) = &analysis_result.boundaries[0].interval else {
panic!("The analysis result should contain non-empty intervals for all columns");
};
let expected = Interval::make(lower, upper).unwrap();
assert_eq!(
&expected, actual,
Expand All @@ -344,6 +386,41 @@ mod tests {
}
}

#[test]
fn test_analyze_empty_set_boundary_exprs() {
let schema = Arc::new(Schema::new(vec![make_field("a", DataType::Int32)]));

let test_cases: Vec<Expr> = vec![
// a > 10 AND a < 10
col("a").gt(lit(10)).and(col("a").lt(lit(10))),
// a > 5 AND (a < 20 OR a > 20)
// a > 10 AND a < 20
// (a > 10 AND a < 20) AND (a > 20 AND a < 30)
col("a")
.gt(lit(10))
.and(col("a").lt(lit(20)))
.and(col("a").gt(lit(20)))
.and(col("a").lt(lit(30))),
];

for expr in test_cases {
let boundaries = ExprBoundaries::try_new_unbounded(&schema).unwrap();
let df_schema = DFSchema::try_from(Arc::clone(&schema)).unwrap();
let physical_expr =
create_physical_expr(&expr, &df_schema, &ExecutionProps::new()).unwrap();
let analysis_result = analyze(
&physical_expr,
AnalysisContext::new(boundaries),
df_schema.as_ref(),
)
.unwrap();

for boundary in analysis_result.boundaries {
assert!(boundary.interval.is_none());
}
}
}

#[test]
fn test_analyze_invalid_boundary_exprs() {
let schema = Arc::new(Schema::new(vec![make_field("a", DataType::Int32)]));
Expand Down
25 changes: 18 additions & 7 deletions datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use arrow::record_batch::RecordBatch;
use datafusion_common::cast::as_boolean_array;
use datafusion_common::stats::Precision;
use datafusion_common::{
internal_err, plan_err, project_schema, DataFusionError, Result,
internal_err, plan_err, project_schema, DataFusionError, Result, ScalarValue,
};
use datafusion_execution::TaskContext;
use datafusion_expr::Operator;
Expand Down Expand Up @@ -421,6 +421,15 @@ fn collect_new_statistics(
..
},
)| {
let Some(interval) = interval else {
// If the interval is `None`, we can say that there are no rows:
return ColumnStatistics {
null_count: Precision::Exact(0),
max_value: Precision::Exact(ScalarValue::Null),
min_value: Precision::Exact(ScalarValue::Null),
distinct_count: Precision::Exact(0),
};
};
let (lower, upper) = interval.into_bounds();
let (min_value, max_value) = if lower.eq(&upper) {
(Precision::Exact(lower), Precision::Exact(upper))
Expand Down Expand Up @@ -1042,14 +1051,16 @@ mod tests {
statistics.column_statistics,
vec![
ColumnStatistics {
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
..Default::default()
min_value: Precision::Exact(ScalarValue::Null),
max_value: Precision::Exact(ScalarValue::Null),
distinct_count: Precision::Exact(0),
null_count: Precision::Exact(0),
},
ColumnStatistics {
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
..Default::default()
min_value: Precision::Exact(ScalarValue::Null),
max_value: Precision::Exact(ScalarValue::Null),
distinct_count: Precision::Exact(0),
null_count: Precision::Exact(0),
},
]
);
Expand Down
Loading