From d33aae6ffc00e22da4b803ec69c4ffc0e9b7a7af Mon Sep 17 00:00:00 2001 From: linfeng Date: Tue, 16 Jun 2026 22:54:34 +0800 Subject: [PATCH 1/2] fix --- .../datasource-parquet/src/bloom_filter.rs | 128 ++++++++++++++++-- .../datasource-parquet/src/opener/mod.rs | 14 +- 2 files changed, 126 insertions(+), 16 deletions(-) diff --git a/datafusion/datasource-parquet/src/bloom_filter.rs b/datafusion/datasource-parquet/src/bloom_filter.rs index 9388aba4385f2..3d4a17067a6df 100644 --- a/datafusion/datasource-parquet/src/bloom_filter.rs +++ b/datafusion/datasource-parquet/src/bloom_filter.rs @@ -39,7 +39,8 @@ pub(crate) struct BloomFilterStatistics { /// Value: /// * [`Sbbf`] (Bloom filter), /// * Parquet physical [`Type`] needed to evaluate literals against the filter - column_sbbf: HashMap, + /// * Type length from the Parquet column descriptor + column_sbbf: HashMap, } impl BloomFilterStatistics { @@ -56,15 +57,27 @@ impl BloomFilterStatistics { } /// Add a Bloom filter and type for the specified column - pub(crate) fn insert(&mut self, column: impl Into, sbbf: Sbbf, ty: Type) { - self.column_sbbf.insert(column.into(), (sbbf, ty)); + pub(crate) fn insert( + &mut self, + column: impl Into, + sbbf: Sbbf, + ty: Type, + type_length: i32, + ) { + self.column_sbbf + .insert(column.into(), (sbbf, ty, type_length)); } /// Helper function for checking if [`Sbbf`] filter contains [`ScalarValue`]. /// /// In case the type of scalar is not supported, returns `true`, assuming that the /// value may be present. - fn check_scalar(sbbf: &Sbbf, value: &ScalarValue, parquet_type: &Type) -> bool { + fn check_scalar( + sbbf: &Sbbf, + value: &ScalarValue, + parquet_type: &Type, + type_length: i32, + ) -> bool { match value { ScalarValue::Utf8(Some(v)) | ScalarValue::Utf8View(Some(v)) @@ -113,8 +126,14 @@ impl BloomFilterStatistics { sbbf.check(&decimal) } Type::FIXED_LEN_BYTE_ARRAY => { - // keep with from_bytes_to_i128 - let b = v.to_be_bytes().to_vec(); + let Ok(type_length) = usize::try_from(type_length) else { + return true; + }; + if type_length == 0 || type_length > 16 { + return true; + } + let b = v.to_be_bytes(); + let b = b[(b.len() - type_length)..].to_vec(); // Use Decimal constructor after https://github.com/apache/arrow-rs/issues/5325 let decimal = Decimal::Bytes { value: b.into(), @@ -125,9 +144,12 @@ impl BloomFilterStatistics { } _ => true, }, - ScalarValue::Dictionary(_, inner) => { - BloomFilterStatistics::check_scalar(sbbf, inner, parquet_type) - } + ScalarValue::Dictionary(_, inner) => BloomFilterStatistics::check_scalar( + sbbf, + inner, + parquet_type, + type_length, + ), _ => true, } } @@ -164,7 +186,8 @@ impl PruningStatistics for BloomFilterStatistics { column: &Column, values: &HashSet, ) -> Option { - let (sbbf, parquet_type) = self.column_sbbf.get(column.name.as_str())?; + let (sbbf, parquet_type, type_length) = + self.column_sbbf.get(column.name.as_str())?; // Bloom filters are probabilistic data structures that can return false // positives (i.e. it might return true even if the value is not @@ -173,7 +196,14 @@ impl PruningStatistics for BloomFilterStatistics { let known_not_present = values .iter() - .map(|value| BloomFilterStatistics::check_scalar(sbbf, value, parquet_type)) + .map(|value| { + BloomFilterStatistics::check_scalar( + sbbf, + value, + parquet_type, + *type_length, + ) + }) // The row group doesn't contain any of the values if // all the checks are false .all(|v| !v); @@ -201,15 +231,19 @@ mod tests { use crate::test_util::ExpectedPruning; use crate::{ParquetAccessPlan, ParquetFileMetrics, RowGroupAccessPlanFilter}; + use arrow::array::Decimal128Array; use arrow::datatypes::{DataType, Field, Schema}; + use bytes::{BufMut, BytesMut}; use datafusion_common::Result; use datafusion_expr::{Expr, col, lit}; use datafusion_physical_expr::planner::logical2physical; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion_pruning::PruningPredicate; use object_store::ObjectStoreExt; + use parquet::arrow::ArrowWriter; use parquet::arrow::ParquetRecordBatchStreamBuilder; use parquet::arrow::async_reader::ParquetObjectReader; + use parquet::file::properties::{EnabledStatistics, WriterProperties}; #[tokio::test] async fn test_row_group_bloom_filter_pruning_predicate_simple_expr() { @@ -375,6 +409,40 @@ mod tests { .await } + #[tokio::test] + async fn test_row_group_bloom_filter_pruning_predicate_decimal128() { + for precision in [19, 20, 21, 28, 38] { + let scale = 2; + let data = parquet_decimal128_with_bloom_filter(precision, scale); + let schema = Schema::new(vec![Field::new( + "decimal_col", + DataType::Decimal128(precision, scale), + true, + )]); + let expr = col("decimal_col").eq(Expr::Literal( + ScalarValue::Decimal128(Some(500), precision, scale), + None, + )); + let expr = logical2physical(&expr, &schema); + let pruning_predicate = + PruningPredicate::try_new(expr, Arc::new(schema)).unwrap(); + + let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate( + &format!("decimal128-{precision}.parquet"), + data, + &pruning_predicate, + ) + .await + .unwrap(); + + assert_eq!( + pruned_row_groups.access_plan().row_group_indexes(), + vec![2], + "precision {precision}" + ); + } + } + struct BloomFilterTest { file_name: String, schema: Schema, @@ -467,6 +535,33 @@ mod tests { } } + fn parquet_decimal128_with_bloom_filter(precision: u8, scale: i8) -> bytes::Bytes { + let schema = Arc::new(Schema::new(vec![Field::new( + "decimal_col", + DataType::Decimal128(precision, scale), + true, + )])); + let array = Arc::new( + Decimal128Array::from(vec![100, 200, 300, 400, 500, 600]) + .with_precision_and_scale(precision, scale) + .unwrap(), + ) as ArrayRef; + let batch = + arrow::array::RecordBatch::try_new(schema.clone(), vec![array]).unwrap(); + let props = WriterProperties::builder() + .set_max_row_group_row_count(Some(2)) + .set_bloom_filter_enabled(true) + .set_statistics_enabled(EnabledStatistics::None) + .build(); + let mut out = BytesMut::new().writer(); + { + let mut writer = ArrowWriter::try_new(&mut out, schema, Some(props)).unwrap(); + writer.write(&batch).unwrap(); + writer.finish().unwrap(); + } + out.into_inner().freeze() + } + /// Evaluates the pruning predicate on the specified row groups and returns the row groups that are left async fn test_row_group_bloom_filter_pruning_predicate( file_name: &str, @@ -520,6 +615,7 @@ mod tests { column_name.to_string(), column_idx, builder.parquet_schema().column(column_idx).physical_type(), + builder.parquet_schema().column(column_idx).type_length(), )) }) .collect::>(); @@ -532,7 +628,8 @@ mod tests { for idx in pruned_row_groups.row_group_indexes() { let mut bloom_filters = BloomFilterStatistics::with_capacity(parquet_columns.len()); - for (column_name, column_idx, physical_type) in &parquet_columns { + for (column_name, column_idx, physical_type, type_length) in &parquet_columns + { let bf = match builder .get_row_group_column_bloom_filter(idx, *column_idx) .await @@ -545,7 +642,12 @@ mod tests { continue; } }; - bloom_filters.insert(column_name.clone(), bf, *physical_type); + bloom_filters.insert( + column_name.clone(), + bf, + *physical_type, + *type_length, + ); } row_group_bloom_filters[idx] = bloom_filters; } diff --git a/datafusion/datasource-parquet/src/opener/mod.rs b/datafusion/datasource-parquet/src/opener/mod.rs index 5b517663f9c03..09b60f9d4df4c 100644 --- a/datafusion/datasource-parquet/src/opener/mod.rs +++ b/datafusion/datasource-parquet/src/opener/mod.rs @@ -1170,7 +1170,7 @@ impl RowGroupsPrunedParquetOpen { mem::replace(&mut prepared.async_file_reader, replacement_reader), reader_metadata, ); - let parquet_columns: Vec<(String, usize, Type)> = predicate + let parquet_columns: Vec<(String, usize, Type, i32)> = predicate .literal_columns() .into_iter() .filter_map(|column_name| { @@ -1184,6 +1184,7 @@ impl RowGroupsPrunedParquetOpen { column_name, column_idx, parquet_schema.column(column_idx).physical_type(), + parquet_schema.column(column_idx).type_length(), )) }) .collect(); @@ -1191,7 +1192,9 @@ impl RowGroupsPrunedParquetOpen { for idx in self.row_groups.row_group_indexes() { let mut row_group_filters = BloomFilterStatistics::with_capacity(parquet_columns.len()); - for (column_name, column_idx, physical_type) in &parquet_columns { + for (column_name, column_idx, physical_type, type_length) in + &parquet_columns + { let bf: Sbbf = match builder .get_row_group_column_bloom_filter(idx, *column_idx) .await @@ -1204,7 +1207,12 @@ impl RowGroupsPrunedParquetOpen { continue; } }; - row_group_filters.insert(column_name, bf, *physical_type); + row_group_filters.insert( + column_name, + bf, + *physical_type, + *type_length, + ); } row_group_bloom_filters[idx] = row_group_filters; } From caf34028af7fb5459eebe3c5d90ee749f44f1fbf Mon Sep 17 00:00:00 2001 From: linfeng Date: Fri, 19 Jun 2026 20:04:39 +0800 Subject: [PATCH 2/2] address bloom filter decimal review suggestions --- .../datasource-parquet/src/bloom_filter.rs | 90 +++++++++++++++---- 1 file changed, 73 insertions(+), 17 deletions(-) diff --git a/datafusion/datasource-parquet/src/bloom_filter.rs b/datafusion/datasource-parquet/src/bloom_filter.rs index 3d4a17067a6df..24cb5f3146ce3 100644 --- a/datafusion/datasource-parquet/src/bloom_filter.rs +++ b/datafusion/datasource-parquet/src/bloom_filter.rs @@ -34,13 +34,18 @@ use parquet::data_type::Decimal; /// Parquet row groups and data pages based on the query predicate. #[derive(Debug, Clone, Default)] pub(crate) struct BloomFilterStatistics { - /// Per-column Bloom filters - /// Key: predicate column name - /// Value: - /// * [`Sbbf`] (Bloom filter), - /// * Parquet physical [`Type`] needed to evaluate literals against the filter - /// * Type length from the Parquet column descriptor - column_sbbf: HashMap, + /// Per-column Bloom filters keyed by predicate column name. + column_sbbf: HashMap, +} + +#[derive(Debug, Clone)] +struct ColumnBloomFilter { + /// [`Sbbf`] (Bloom filter). + sbbf: Sbbf, + /// Parquet physical [`Type`] needed to evaluate literals against the filter. + physical_type: Type, + /// Type length from the Parquet column descriptor. + type_length: i32, } impl BloomFilterStatistics { @@ -64,8 +69,14 @@ impl BloomFilterStatistics { ty: Type, type_length: i32, ) { - self.column_sbbf - .insert(column.into(), (sbbf, ty, type_length)); + self.column_sbbf.insert( + column.into(), + ColumnBloomFilter { + sbbf, + physical_type: ty, + type_length, + }, + ); } /// Helper function for checking if [`Sbbf`] filter contains [`ScalarValue`]. @@ -186,8 +197,7 @@ impl PruningStatistics for BloomFilterStatistics { column: &Column, values: &HashSet, ) -> Option { - let (sbbf, parquet_type, type_length) = - self.column_sbbf.get(column.name.as_str())?; + let column_bloom_filter = self.column_sbbf.get(column.name.as_str())?; // Bloom filters are probabilistic data structures that can return false // positives (i.e. it might return true even if the value is not @@ -198,10 +208,10 @@ impl PruningStatistics for BloomFilterStatistics { .iter() .map(|value| { BloomFilterStatistics::check_scalar( - sbbf, + &column_bloom_filter.sbbf, value, - parquet_type, - *type_length, + &column_bloom_filter.physical_type, + column_bloom_filter.type_length, ) }) // The row group doesn't contain any of the values if @@ -413,7 +423,11 @@ mod tests { async fn test_row_group_bloom_filter_pruning_predicate_decimal128() { for precision in [19, 20, 21, 28, 38] { let scale = 2; - let data = parquet_decimal128_with_bloom_filter(precision, scale); + let data = parquet_decimal128_with_bloom_filter( + precision, + scale, + vec![100, 200, 300, 400, 500, 600], + ); let schema = Schema::new(vec![Field::new( "decimal_col", DataType::Decimal128(precision, scale), @@ -443,6 +457,44 @@ mod tests { } } + #[tokio::test] + async fn test_row_group_bloom_filter_pruning_predicate_negative_decimal128() { + for precision in [19, 20, 21, 28, 38] { + let scale = 2; + let data = parquet_decimal128_with_bloom_filter( + precision, + scale, + vec![-100, -200, -300, -400, -500, -600], + ); + let schema = Schema::new(vec![Field::new( + "decimal_col", + DataType::Decimal128(precision, scale), + true, + )]); + let expr = col("decimal_col").eq(Expr::Literal( + ScalarValue::Decimal128(Some(-500), precision, scale), + None, + )); + let expr = logical2physical(&expr, &schema); + let pruning_predicate = + PruningPredicate::try_new(expr, Arc::new(schema)).unwrap(); + + let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate( + &format!("negative-decimal128-{precision}.parquet"), + data, + &pruning_predicate, + ) + .await + .unwrap(); + + assert_eq!( + pruned_row_groups.access_plan().row_group_indexes(), + vec![2], + "precision {precision}" + ); + } + } + struct BloomFilterTest { file_name: String, schema: Schema, @@ -535,14 +587,18 @@ mod tests { } } - fn parquet_decimal128_with_bloom_filter(precision: u8, scale: i8) -> bytes::Bytes { + fn parquet_decimal128_with_bloom_filter( + precision: u8, + scale: i8, + values: Vec, + ) -> bytes::Bytes { let schema = Arc::new(Schema::new(vec![Field::new( "decimal_col", DataType::Decimal128(precision, scale), true, )])); let array = Arc::new( - Decimal128Array::from(vec![100, 200, 300, 400, 500, 600]) + Decimal128Array::from(values) .with_precision_and_scale(precision, scale) .unwrap(), ) as ArrayRef;