diff --git a/datafusion/src/physical_optimizer/pruning.rs b/datafusion/src/physical_optimizer/pruning.rs index 24334d7983d5e..72279f49f57d3 100644 --- a/datafusion/src/physical_optimizer/pruning.rs +++ b/datafusion/src/physical_optimizer/pruning.rs @@ -37,6 +37,7 @@ use arrow::{ record_batch::RecordBatch, }; +use crate::prelude::lit; use crate::{ error::{DataFusionError, Result}, execution::context::ExecutionContextState, @@ -75,6 +76,12 @@ pub trait PruningStatistics { /// return the number of containers (e.g. row groups) being /// pruned with these statistics fn num_containers(&self) -> usize; + + /// return the number of null values for the named column as an + /// `Option`. + /// + /// Note: the returned array must contain `num_containers()` rows. + fn null_counts(&self, column: &Column) -> Option; } /// Evaluates filter expressions on statistics in order to @@ -200,7 +207,7 @@ impl PruningPredicate { struct RequiredStatColumns { /// The statistics required to evaluate this predicate: /// * The unqualified column in the input schema - /// * Statistics type (e.g. Min or Max) + /// * Statistics type (e.g. Min or Max or Null_Count) /// * The field the statistics value should be placed in for /// pruning predicate evaluation columns: Vec<(Column, StatisticsType, Field)>, @@ -281,6 +288,22 @@ impl RequiredStatColumns { ) -> Result { self.stat_column_expr(column, column_expr, field, StatisticsType::Max, "max") } + + /// rewrite col --> col_null_count + fn null_count_column_expr( + &mut self, + column: &Column, + column_expr: &Expr, + field: &Field, + ) -> Result { + self.stat_column_expr( + column, + column_expr, + field, + StatisticsType::NullCount, + "null_count", + ) + } } impl From> for RequiredStatColumns { @@ -329,6 +352,7 @@ fn build_statistics_record_batch( let array = match statistics_type { StatisticsType::Min => statistics.min_values(column), StatisticsType::Max => statistics.max_values(column), + StatisticsType::NullCount => statistics.null_counts(column), }; let array = array.unwrap_or_else(|| new_null_array(data_type, num_containers)); @@ -582,6 +606,32 @@ fn build_single_column_expr( } } +/// Given an expression reference to `expr`, if `expr` is a column expression, +/// returns a pruning expression in terms of IsNull that will evaluate to true +/// if the column may contain null, and false if definitely does not +/// contain null. +fn build_is_null_column_expr( + expr: &Expr, + schema: &Schema, + required_columns: &mut RequiredStatColumns, +) -> Option { + match expr { + Expr::Column(ref col) => { + let field = schema.field_with_name(&col.name).ok()?; + + let null_count_field = &Field::new(field.name(), DataType::UInt64, false); + required_columns + .null_count_column_expr(col, expr, null_count_field) + .map(|null_count_column_expr| { + // IsNull(column) => null_count > 0 + null_count_column_expr.gt(lit::(0)) + }) + .ok() + } + _ => None, + } +} + /// Translate logical filter expression into pruning predicate /// expression that will evaluate to FALSE if it can be determined no /// rows between the min/max values could pass the predicates. @@ -602,6 +652,11 @@ fn build_predicate_expression( // predicate expression can only be a binary expression let (left, op, right) = match expr { Expr::BinaryExpr { left, op, right } => (left, *op, right), + Expr::IsNull(expr) => { + let expr = build_is_null_column_expr(expr, schema, required_columns) + .unwrap_or(unhandled); + return Ok(expr); + } Expr::Column(col) => { let expr = build_single_column_expr(col, schema, required_columns, false) .unwrap_or(unhandled); @@ -702,6 +757,7 @@ fn build_statistics_expr(expr_builder: &mut PruningExpressionBuilder) -> Result< enum StatisticsType { Min, Max, + NullCount, } #[cfg(test)] @@ -812,6 +868,10 @@ mod tests { .map(|container_stats| container_stats.len()) .unwrap_or(0) } + + fn null_counts(&self, _column: &Column) -> Option { + None + } } /// Returns the specified min/max container values @@ -833,6 +893,10 @@ mod tests { fn num_containers(&self) -> usize { self.num_containers } + + fn null_counts(&self, _column: &Column) -> Option { + None + } } #[test] diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs index 73f0b8ddb6395..780ad68d2582b 100644 --- a/datafusion/src/physical_plan/file_format/parquet.rs +++ b/datafusion/src/physical_plan/file_format/parquet.rs @@ -342,6 +342,31 @@ macro_rules! get_min_max_values { }} } +// Extract the null count value on the ParquetStatistics +macro_rules! get_null_count_values { + ($self:expr, $column:expr) => {{ + let column_index = + if let Some((v, _)) = $self.parquet_schema.column_with_name(&$column.name) { + v + } else { + // Named column was not present + return None; + }; + + let scalar_values: Vec = $self + .row_group_metadata + .iter() + .flat_map(|meta| meta.column(column_index).statistics()) + .map(|stats| { + ScalarValue::UInt64(Some(stats.null_count().try_into().unwrap())) + }) + .collect(); + + // ignore errors converting to arrays (e.g. different types) + ScalarValue::iter_to_array(scalar_values).ok() + }}; +} + impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> { fn min_values(&self, column: &Column) -> Option { get_min_max_values!(self, column, min, min_bytes) @@ -354,6 +379,10 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> { fn num_containers(&self) -> usize { self.row_group_metadata.len() } + + fn null_counts(&self, column: &Column) -> Option { + get_null_count_values!(self, column) + } } fn build_row_group_predicate( @@ -713,21 +742,7 @@ mod tests { Ok(()) } - #[test] - fn row_group_pruning_predicate_null_expr() -> Result<()> { - use crate::logical_plan::{col, lit}; - // test row group predicate with an unknown (Null) expr - // - // int > 1 and bool = NULL => c1_max > 1 and null - let expr = col("c1") - .gt(lit(15)) - .and(col("c2").eq(lit(ScalarValue::Boolean(None)))); - let schema = Arc::new(Schema::new(vec![ - Field::new("c1", DataType::Int32, false), - Field::new("c2", DataType::Boolean, false), - ])); - let pruning_predicate = PruningPredicate::try_new(&expr, schema)?; - + fn gen_row_group_meta_data_for_pruning_predicate() -> Vec { let schema_descr = get_test_schema_descr(vec![ ("c1", PhysicalType::INT32), ("c2", PhysicalType::BOOLEAN), @@ -743,10 +758,56 @@ mod tests { &schema_descr, vec![ ParquetStatistics::int32(Some(11), Some(20), None, 0, false), - ParquetStatistics::boolean(Some(false), Some(true), None, 0, false), + ParquetStatistics::boolean(Some(false), Some(true), None, 1, false), ], ); - let row_group_metadata = vec![rgm1, rgm2]; + vec![rgm1, rgm2] + } + + #[test] + fn row_group_pruning_predicate_null_expr() -> Result<()> { + use crate::logical_plan::{col, lit}; + // int > 1 and IsNull(bool) => c1_max > 1 and bool_null_count > 0 + let expr = col("c1").gt(lit(15)).and(col("c2").is_null()); + let schema = Arc::new(Schema::new(vec![ + Field::new("c1", DataType::Int32, false), + Field::new("c2", DataType::Boolean, false), + ])); + let pruning_predicate = PruningPredicate::try_new(&expr, schema)?; + let row_group_metadata = gen_row_group_meta_data_for_pruning_predicate(); + + let row_group_predicate = build_row_group_predicate( + &pruning_predicate, + parquet_file_metrics(), + &row_group_metadata, + ); + let row_group_filter = row_group_metadata + .iter() + .enumerate() + .map(|(i, g)| row_group_predicate(g, i)) + .collect::>(); + // First row group was filtered out because it contains no null value on "c2". + assert_eq!(row_group_filter, vec![false, true]); + + Ok(()) + } + + #[test] + fn row_group_pruning_predicate_eq_null_expr() -> Result<()> { + use crate::logical_plan::{col, lit}; + // test row group predicate with an unknown (Null) expr + // + // int > 1 and bool = NULL => c1_max > 1 and null + let expr = col("c1") + .gt(lit(15)) + .and(col("c2").eq(lit(ScalarValue::Boolean(None)))); + let schema = Arc::new(Schema::new(vec![ + Field::new("c1", DataType::Int32, false), + Field::new("c2", DataType::Boolean, false), + ])); + let pruning_predicate = PruningPredicate::try_new(&expr, schema)?; + let row_group_metadata = gen_row_group_meta_data_for_pruning_predicate(); + let row_group_predicate = build_row_group_predicate( &pruning_predicate, parquet_file_metrics(), @@ -759,7 +820,6 @@ mod tests { .collect::>(); // no row group is filtered out because the predicate expression can't be evaluated // when a null array is generated for a statistics column, - // because the null values propagate to the end result, making the predicate result undefined assert_eq!(row_group_filter, vec![true, true]); Ok(())