Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 176 additions & 18 deletions datafusion/datasource-parquet/src/bloom_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,18 @@ use parquet::data_type::Decimal;
/// Parquet row groups and data pages based on the query predicate.
#[derive(Debug, Clone, Default)]
pub(crate) struct BloomFilterStatistics {
/// Per-column Bloom filters
/// Key: predicate column name
/// Value:
/// * [`Sbbf`] (Bloom filter),
/// * Parquet physical [`Type`] needed to evaluate literals against the filter
column_sbbf: HashMap<String, (Sbbf, Type)>,
/// Per-column Bloom filters keyed by predicate column name.
column_sbbf: HashMap<String, ColumnBloomFilter>,
}

#[derive(Debug, Clone)]
struct ColumnBloomFilter {
/// [`Sbbf`] (Bloom filter).
sbbf: Sbbf,
/// Parquet physical [`Type`] needed to evaluate literals against the filter.
physical_type: Type,
/// Type length from the Parquet column descriptor.
type_length: i32,
}

impl BloomFilterStatistics {
Expand All @@ -56,15 +62,33 @@ impl BloomFilterStatistics {
}

/// Add a Bloom filter and type for the specified column
pub(crate) fn insert(&mut self, column: impl Into<String>, sbbf: Sbbf, ty: Type) {
self.column_sbbf.insert(column.into(), (sbbf, ty));
pub(crate) fn insert(
&mut self,
column: impl Into<String>,
sbbf: Sbbf,
ty: Type,
type_length: i32,
) {
self.column_sbbf.insert(
column.into(),
ColumnBloomFilter {
sbbf,
physical_type: ty,
type_length,
},
);
}

/// Helper function for checking if [`Sbbf`] filter contains [`ScalarValue`].
///
/// In case the type of scalar is not supported, returns `true`, assuming that the
/// value may be present.
fn check_scalar(sbbf: &Sbbf, value: &ScalarValue, parquet_type: &Type) -> bool {
fn check_scalar(
sbbf: &Sbbf,
value: &ScalarValue,
parquet_type: &Type,
type_length: i32,
) -> bool {
match value {
ScalarValue::Utf8(Some(v))
| ScalarValue::Utf8View(Some(v))
Expand Down Expand Up @@ -113,8 +137,14 @@ impl BloomFilterStatistics {
sbbf.check(&decimal)
}
Type::FIXED_LEN_BYTE_ARRAY => {
// keep with from_bytes_to_i128
let b = v.to_be_bytes().to_vec();
let Ok(type_length) = usize::try_from(type_length) else {
return true;
};
if type_length == 0 || type_length > 16 {
return true;
}
let b = v.to_be_bytes();
let b = b[(b.len() - type_length)..].to_vec();
// Use Decimal constructor after https://github.com/apache/arrow-rs/issues/5325
let decimal = Decimal::Bytes {
value: b.into(),
Expand All @@ -125,9 +155,12 @@ impl BloomFilterStatistics {
}
_ => true,
},
ScalarValue::Dictionary(_, inner) => {
BloomFilterStatistics::check_scalar(sbbf, inner, parquet_type)
}
ScalarValue::Dictionary(_, inner) => BloomFilterStatistics::check_scalar(
sbbf,
inner,
parquet_type,
type_length,
),
_ => true,
}
}
Expand Down Expand Up @@ -164,7 +197,7 @@ impl PruningStatistics for BloomFilterStatistics {
column: &Column,
values: &HashSet<ScalarValue>,
) -> Option<BooleanArray> {
let (sbbf, parquet_type) = self.column_sbbf.get(column.name.as_str())?;
let column_bloom_filter = self.column_sbbf.get(column.name.as_str())?;

// Bloom filters are probabilistic data structures that can return false
// positives (i.e. it might return true even if the value is not
Expand All @@ -173,7 +206,14 @@ impl PruningStatistics for BloomFilterStatistics {

let known_not_present = values
.iter()
.map(|value| BloomFilterStatistics::check_scalar(sbbf, value, parquet_type))
.map(|value| {
BloomFilterStatistics::check_scalar(
&column_bloom_filter.sbbf,
value,
&column_bloom_filter.physical_type,
column_bloom_filter.type_length,
)
})
// The row group doesn't contain any of the values if
// all the checks are false
.all(|v| !v);
Expand Down Expand Up @@ -201,15 +241,19 @@ mod tests {
use crate::test_util::ExpectedPruning;
use crate::{ParquetAccessPlan, ParquetFileMetrics, RowGroupAccessPlanFilter};

use arrow::array::Decimal128Array;
use arrow::datatypes::{DataType, Field, Schema};
use bytes::{BufMut, BytesMut};
use datafusion_common::Result;
use datafusion_expr::{Expr, col, lit};
use datafusion_physical_expr::planner::logical2physical;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_pruning::PruningPredicate;
use object_store::ObjectStoreExt;
use parquet::arrow::ArrowWriter;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::arrow::async_reader::ParquetObjectReader;
use parquet::file::properties::{EnabledStatistics, WriterProperties};

#[tokio::test]
async fn test_row_group_bloom_filter_pruning_predicate_simple_expr() {
Expand Down Expand Up @@ -375,6 +419,82 @@ mod tests {
.await
}

#[tokio::test]
async fn test_row_group_bloom_filter_pruning_predicate_decimal128() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice regression coverage for the fixed-width truncation path. It might be worth adding a negative decimal case as well, since Parquet fixed-len decimal bytes depend on two's-complement sign extension and truncation. For example, you could write row groups with negative values and assert that a predicate like decimal_col = -500 keeps only the matching row group.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a negative Decimal128 regression test using values from -100 to -600 and verifying decimal_col = -500 keeps only the matching row group.

for precision in [19, 20, 21, 28, 38] {
let scale = 2;
let data = parquet_decimal128_with_bloom_filter(
precision,
scale,
vec![100, 200, 300, 400, 500, 600],
);
let schema = Schema::new(vec![Field::new(
"decimal_col",
DataType::Decimal128(precision, scale),
true,
)]);
let expr = col("decimal_col").eq(Expr::Literal(
ScalarValue::Decimal128(Some(500), precision, scale),
None,
));
let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();

let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
&format!("decimal128-{precision}.parquet"),
data,
&pruning_predicate,
)
.await
.unwrap();

assert_eq!(
pruned_row_groups.access_plan().row_group_indexes(),
vec![2],
"precision {precision}"
);
}
}

#[tokio::test]
async fn test_row_group_bloom_filter_pruning_predicate_negative_decimal128() {
for precision in [19, 20, 21, 28, 38] {
let scale = 2;
let data = parquet_decimal128_with_bloom_filter(
precision,
scale,
vec![-100, -200, -300, -400, -500, -600],
);
let schema = Schema::new(vec![Field::new(
"decimal_col",
DataType::Decimal128(precision, scale),
true,
)]);
let expr = col("decimal_col").eq(Expr::Literal(
ScalarValue::Decimal128(Some(-500), precision, scale),
None,
));
let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();

let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
&format!("negative-decimal128-{precision}.parquet"),
data,
&pruning_predicate,
)
.await
.unwrap();

assert_eq!(
pruned_row_groups.access_plan().row_group_indexes(),
vec![2],
"precision {precision}"
);
}
}

struct BloomFilterTest {
file_name: String,
schema: Schema,
Expand Down Expand Up @@ -467,6 +587,37 @@ mod tests {
}
}

fn parquet_decimal128_with_bloom_filter(
precision: u8,
scale: i8,
values: Vec<i128>,
) -> bytes::Bytes {
let schema = Arc::new(Schema::new(vec![Field::new(
"decimal_col",
DataType::Decimal128(precision, scale),
true,
)]));
let array = Arc::new(
Decimal128Array::from(values)
.with_precision_and_scale(precision, scale)
.unwrap(),
) as ArrayRef;
let batch =
arrow::array::RecordBatch::try_new(schema.clone(), vec![array]).unwrap();
let props = WriterProperties::builder()
.set_max_row_group_row_count(Some(2))
.set_bloom_filter_enabled(true)
.set_statistics_enabled(EnabledStatistics::None)
.build();
let mut out = BytesMut::new().writer();
{
let mut writer = ArrowWriter::try_new(&mut out, schema, Some(props)).unwrap();
writer.write(&batch).unwrap();
writer.finish().unwrap();
}
out.into_inner().freeze()
}

/// Evaluates the pruning predicate on the specified row groups and returns the row groups that are left
async fn test_row_group_bloom_filter_pruning_predicate(
file_name: &str,
Expand Down Expand Up @@ -520,6 +671,7 @@ mod tests {
column_name.to_string(),
column_idx,
builder.parquet_schema().column(column_idx).physical_type(),
builder.parquet_schema().column(column_idx).type_length(),
))
})
.collect::<Vec<_>>();
Expand All @@ -532,7 +684,8 @@ mod tests {
for idx in pruned_row_groups.row_group_indexes() {
let mut bloom_filters =
BloomFilterStatistics::with_capacity(parquet_columns.len());
for (column_name, column_idx, physical_type) in &parquet_columns {
for (column_name, column_idx, physical_type, type_length) in &parquet_columns
{
let bf = match builder
.get_row_group_column_bloom_filter(idx, *column_idx)
.await
Expand All @@ -545,7 +698,12 @@ mod tests {
continue;
}
};
bloom_filters.insert(column_name.clone(), bf, *physical_type);
bloom_filters.insert(
column_name.clone(),
bf,
*physical_type,
*type_length,
);
}
row_group_bloom_filters[idx] = bloom_filters;
}
Expand Down
14 changes: 11 additions & 3 deletions datafusion/datasource-parquet/src/opener/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1170,7 +1170,7 @@ impl RowGroupsPrunedParquetOpen {
mem::replace(&mut prepared.async_file_reader, replacement_reader),
reader_metadata,
);
let parquet_columns: Vec<(String, usize, Type)> = predicate
let parquet_columns: Vec<(String, usize, Type, i32)> = predicate
.literal_columns()
.into_iter()
.filter_map(|column_name| {
Expand All @@ -1184,14 +1184,17 @@ impl RowGroupsPrunedParquetOpen {
column_name,
column_idx,
parquet_schema.column(column_idx).physical_type(),
parquet_schema.column(column_idx).type_length(),
))
})
.collect();

for idx in self.row_groups.row_group_indexes() {
let mut row_group_filters =
BloomFilterStatistics::with_capacity(parquet_columns.len());
for (column_name, column_idx, physical_type) in &parquet_columns {
for (column_name, column_idx, physical_type, type_length) in
&parquet_columns
{
let bf: Sbbf = match builder
.get_row_group_column_bloom_filter(idx, *column_idx)
.await
Expand All @@ -1204,7 +1207,12 @@ impl RowGroupsPrunedParquetOpen {
continue;
}
};
row_group_filters.insert(column_name, bf, *physical_type);
row_group_filters.insert(
column_name,
bf,
*physical_type,
*type_length,
);
}
row_group_bloom_filters[idx] = row_group_filters;
}
Expand Down
Loading