Support parquet page filtering on min_max for decimal128 columns
Signed-off-by: yangjiang <yangjiang@ebay.com>
Ted-Jiang committed Nov 17, 2022
commit fc8475439bbb5613dc09648e491e0d7ec7d09283
41 changes: 40 additions & 1 deletion datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -17,7 +17,7 @@

//! Execution plan for reading Parquet files

use arrow::datatypes::SchemaRef;
use arrow::datatypes::{DataType, SchemaRef};
use fmt::Debug;
use std::any::Any;
use std::fmt;
@@ -55,8 +55,10 @@ use object_store::{ObjectMeta, ObjectStore};
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::basic::{ConvertedType, LogicalType};
use parquet::errors::ParquetError;
use parquet::file::{metadata::ParquetMetaData, properties::WriterProperties};
use parquet::schema::types::ColumnDescriptor;

mod metrics;
mod page_filter;
@@ -654,6 +656,43 @@ pub async fn plan_to_parquet(
}
}

// TODO: consolidate code with arrow-rs
// Convert a big-endian byte array to an i128.
// Copied from arrow-rs.
pub(crate) fn from_bytes_to_i128(b: &[u8]) -> i128 {
[Review comment — Ted-Jiang (Member, Author): "Move the common func here."]

assert!(b.len() <= 16, "Decimal128Array supports only up to size 16");
let first_bit = b[0] & 128u8 == 128u8;
let mut result = if first_bit { [255u8; 16] } else { [0u8; 16] };
for (i, v) in b.iter().enumerate() {
result[i + (16 - b.len())] = *v;
}
// The byte array comes from the parquet file and must be big-endian;
// the endianness is defined by the parquet format, see
// https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66
i128::from_be_bytes(result)
}
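
A quick illustrative check (a hypothetical snippet, not part of this diff): the high bit of the first byte determines the sign extension.

// Sketch: a big-endian input shorter than 16 bytes is sign-extended
// before being reinterpreted as an i128.
assert_eq!(from_bytes_to_i128(&[0x00, 0x02]), 2);
assert_eq!(from_bytes_to_i128(&[0xFF, 0xFE]), -2); // leading 1 bit => negative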

// Convert a parquet column schema to an arrow data type, considering only
// the decimal case.
pub(crate) fn parquet_to_arrow_decimal_type(
parquet_column: &ColumnDescriptor,
) -> Option<DataType> {
let type_ptr = parquet_column.self_type_ptr();
match type_ptr.get_basic_info().logical_type() {
Some(LogicalType::Decimal { scale, precision }) => {
Some(DataType::Decimal128(precision as u8, scale as u8))
}
_ => match type_ptr.get_basic_info().converted_type() {
ConvertedType::DECIMAL => Some(DataType::Decimal128(
type_ptr.get_precision() as u8,
type_ptr.get_scale() as u8,
)),
_ => None,
},
}
}
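
A minimal sketch of the mapping (a hypothetical test using parquet's schema parser; the schema string and the DECIMAL(38,0) column are assumptions, not part of this diff):

use std::sync::Arc;
use parquet::schema::{parser::parse_message_type, types::SchemaDescriptor};

// A DECIMAL-annotated column maps to Decimal128 regardless of whether its
// physical type is INT32, INT64, or FIXED_LEN_BYTE_ARRAY.
let message = "message test { required fixed_len_byte_array(16) price (DECIMAL(38,0)); }";
let schema = SchemaDescriptor::new(Arc::new(parse_message_type(message).unwrap()));
assert_eq!(
    parquet_to_arrow_decimal_type(&schema.column(0)),
    Some(DataType::Decimal128(38, 0))
);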

#[cfg(test)]
mod tests {
// See also `parquet_exec` integration test
@@ -18,12 +18,15 @@
//! Contains code to filter entire pages

use arrow::array::{
BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, StringArray,
BooleanArray, Decimal128Array, Float32Array, Float64Array, Int32Array, Int64Array,
StringArray,
};
use arrow::datatypes::DataType;
use arrow::{array::ArrayRef, datatypes::SchemaRef, error::ArrowError};
use datafusion_common::{Column, DataFusionError, Result};
use datafusion_optimizer::utils::split_conjunction;
use log::{debug, error, trace};
use parquet::schema::types::ColumnDescriptor;
use parquet::{
arrow::arrow_reader::{RowSelection, RowSelector},
errors::ParquetError,
@@ -37,6 +40,9 @@ use std::collections::VecDeque;
use std::sync::Arc;

use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
use crate::physical_plan::file_format::parquet::{
from_bytes_to_i128, parquet_to_arrow_decimal_type,
};

use super::metrics::ParquetFileMetrics;

@@ -134,6 +140,7 @@ pub(crate) fn build_page_filter(
&predicate,
rg_offset_indexes.get(col_id),
rg_page_indexes.get(col_id),
groups[*r].column(col_id).column_descr(),
file_metrics,
)
.map_err(|e| {
@@ -307,15 +314,18 @@ fn prune_pages_in_one_row_group(
predicate: &PruningPredicate,
col_offset_indexes: Option<&Vec<PageLocation>>,
col_page_indexes: Option<&Index>,
col_desc: &ColumnDescriptor,
metrics: &ParquetFileMetrics,
) -> Result<Vec<RowSelector>> {
let num_rows = group.num_rows() as usize;
if let (Some(col_offset_indexes), Some(col_page_indexes)) =
(col_offset_indexes, col_page_indexes)
{
let target_type = parquet_to_arrow_decimal_type(col_desc);
let pruning_stats = PagesPruningStatistics {
col_page_indexes,
col_offset_indexes,
target_type: &target_type,
};

match predicate.prune(&pruning_stats) {
@@ -384,6 +394,9 @@ fn create_row_count_in_each_page(
struct PagesPruningStatistics<'a> {
col_page_indexes: &'a Index,
col_offset_indexes: &'a Vec<PageLocation>,
// `target_type` is the logical type from the schema: e.g. DECIMAL is a logical
// type whose physical type in the parquet file may be `INT32`, `INT64`, or
// `FIXED_LEN_BYTE_ARRAY`
[Review comment — Contributor: 👍]
target_type: &'a Option<DataType>,
}

// Extract the min or max value from the page index by calling `$func`
@@ -392,16 +405,50 @@ macro_rules! get_min_max_values_for_page_index {
match $self.col_page_indexes {
Index::NONE => None,
Index::INT32(index) => {
let vec = &index.indexes;
Some(Arc::new(Int32Array::from_iter(
vec.iter().map(|x| x.$func().cloned()),
)))
match $self.target_type {
// int32 to decimal with the precision and scale
Some(DataType::Decimal128(precision, scale)) => {
let vec = &index.indexes;
if let Ok(arr) = Decimal128Array::from_iter_values(
vec.iter().map(|x| *x.$func().unwrap() as i128),
)
.with_precision_and_scale(*precision, *scale)
{
return Some(Arc::new(arr));
} else {
return None;
}
}
_ => {
let vec = &index.indexes;
Some(Arc::new(Int32Array::from_iter(
vec.iter().map(|x| x.$func().cloned()),
)))
}
}
}
Index::INT64(index) => {
let vec = &index.indexes;
Some(Arc::new(Int64Array::from_iter(
vec.iter().map(|x| x.$func().cloned()),
)))
match $self.target_type {
// int64 to decimal with the precision and scale
Some(DataType::Decimal128(precision, scale)) => {
let vec = &index.indexes;
if let Ok(arr) = Decimal128Array::from_iter_values(
vec.iter().map(|x| *x.$func().unwrap() as i128),
)
.with_precision_and_scale(*precision, *scale)
{
return Some(Arc::new(arr));
} else {
return None;
}
}
_ => {
let vec = &index.indexes;
Some(Arc::new(Int64Array::from_iter(
vec.iter().map(|x| x.$func().cloned()),
)))
}
}
}
Index::FLOAT(index) => {
let vec = &index.indexes;
@@ -430,10 +477,28 @@
.collect();
Some(Arc::new(array))
}
Index::INT96(_) | Index::FIXED_LEN_BYTE_ARRAY(_) => {
Index::INT96(_) => {
// TODO: support this type
None
}
Index::FIXED_LEN_BYTE_ARRAY(index) => {
match $self.target_type {
// fixed_len_byte_array to decimal with the precision and scale
Some(DataType::Decimal128(precision, scale)) => {
let vec = &index.indexes;
if let Ok(array) = Decimal128Array::from_iter_values(
vec.iter().map(|x| from_bytes_to_i128(x.$func().unwrap())),
)
.with_precision_and_scale(*precision, *scale)
{
return Some(Arc::new(array));
} else {
return None;
}
}
_ => None,
}
}
}
}};
}
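
A minimal sketch of the widening the INT32/INT64 arms perform (hypothetical values; assumes arrow's Decimal128Array API):

use arrow::array::Decimal128Array;

// Page-index mins/maxes stored as i32 widen losslessly to i128, then the
// schema's precision and scale are attached; if that fails, the macro
// returns None and no pages are pruned for this column.
let mins = vec![1_i32, 9217, 18433];
let arr = Decimal128Array::from_iter_values(mins.iter().map(|v| *v as i128))
    .with_precision_and_scale(38, 0)
    .expect("38 digits can represent any i32");
assert_eq!(arr.value(2), 18433);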
@@ -23,16 +23,17 @@ use datafusion_common::Column;
use datafusion_common::ScalarValue;
use log::debug;

use parquet::{
file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics},
schema::types::ColumnDescriptor,
use parquet::file::{
metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics,
};

use crate::physical_plan::file_format::parquet::{
from_bytes_to_i128, parquet_to_arrow_decimal_type,
};
use crate::{
datasource::listing::FileRange,
physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
};
use parquet::basic::{ConvertedType, LogicalType};

use super::ParquetFileMetrics;

@@ -85,23 +86,6 @@ struct RowGroupPruningStatistics<'a> {
parquet_schema: &'a Schema,
}

// TODO: consolidate code with arrow-rs
// Convert the bytes array to i128.
// The endian of the input bytes array must be big-endian.
// Copy from the arrow-rs
fn from_bytes_to_i128(b: &[u8]) -> i128 {
assert!(b.len() <= 16, "Decimal128Array supports only up to size 16");
let first_bit = b[0] & 128u8 == 128u8;
let mut result = if first_bit { [255u8; 16] } else { [0u8; 16] };
for (i, v) in b.iter().enumerate() {
result[i + (16 - b.len())] = *v;
}
// The bytes array are from parquet file and must be the big-endian.
// The endian is defined by parquet format, and the reference document
// https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66
i128::from_be_bytes(result)
}

/// Extract the min/max statistics from a `ParquetStatistics` object
macro_rules! get_statistic {
($column_statistics:expr, $func:ident, $bytes_func:ident, $target_arrow_type:expr) => {{
@@ -217,24 +201,6 @@ macro_rules! get_null_count_values {
}};
}

// Convert parquet column schema to arrow data type, and just consider the
// decimal data type.
fn parquet_to_arrow_decimal_type(parquet_column: &ColumnDescriptor) -> Option<DataType> {
let type_ptr = parquet_column.self_type_ptr();
match type_ptr.get_basic_info().logical_type() {
Some(LogicalType::Decimal { scale, precision }) => {
Some(DataType::Decimal128(precision as u8, scale as u8))
}
_ => match type_ptr.get_basic_info().converted_type() {
ConvertedType::DECIMAL => Some(DataType::Decimal128(
type_ptr.get_precision() as u8,
type_ptr.get_scale() as u8,
)),
_ => None,
},
}
}

impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
fn min_values(&self, column: &Column) -> Option<ArrayRef> {
get_min_max_values!(self, column, min, min_bytes)
28 changes: 28 additions & 0 deletions datafusion/core/tests/parquet/filter_pushdown.rs
@@ -296,6 +296,34 @@ async fn single_file_small_data_pages() {
.with_expected_rows(9745)
.run()
.await;

// decimal_price TV=53819 RL=0 DL=0
// ----------------------------------------------------------------------------
// row group 0:
// column index for column decimal_price:
// Boundary order: UNORDERED
// null count min max
// page-0 0 1 9216
// page-1 0 9217 18432
// page-2 0 18433 27648
// page-3 0 27649 36864
// page-4 0 36865 46080
// page-5 0 46081 53819
//
// offset index for column decimal_price:
// offset compressed size first row index
// page-0 5581636 147517 0
// page-1 5729153 147517 9216
TestCase::new(&test_parquet_file)
.with_name("selective_on_decimal")
// predicate is chosen carefully to prune pages 1, 2, 3, 4, and 5:
// decimal_price <= 9200 can only match page 0 (min 1, max 9216), and since
// decimal_price is the incrementing row count, exactly 9200 rows pass
.with_filter(col("decimal_price").lt_eq(lit(9200)))
.with_pushdown_expected(PushdownExpected::Some)
.with_page_index_filtering_expected(PageIndexFilteringExpected::Some)
.with_expected_rows(9200)
.run()
.await;
}

/// Expected pushdown behavior
13 changes: 11 additions & 2 deletions test-utils/src/data_gen.rs
@@ -19,8 +19,8 @@ use std::ops::Range;
use std::sync::Arc;

use arrow::array::{
Int32Builder, StringBuilder, StringDictionaryBuilder, TimestampNanosecondBuilder,
UInt16Builder,
Decimal128Builder, Int32Builder, StringBuilder, StringDictionaryBuilder,
TimestampNanosecondBuilder, UInt16Builder,
};
use arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef, TimeUnit};
use arrow::record_batch::RecordBatch;
@@ -43,6 +43,7 @@ struct BatchBuilder {
request_bytes: Int32Builder,
response_bytes: Int32Builder,
response_status: UInt16Builder,
prices_status: Decimal128Builder,

/// optional number of rows produced
row_limit: Option<usize>,
@@ -73,6 +74,7 @@ impl BatchBuilder {
Field::new("request_bytes", DataType::Int32, true),
Field::new("response_bytes", DataType::Int32, true),
Field::new("response_status", DataType::UInt16, false),
Field::new("decimal_price", DataType::Decimal128(38, 0), false),
]))
}

@@ -146,6 +148,7 @@
.append_option(rng.gen_bool(0.9).then(|| rng.gen()));
self.response_status
.append_value(status[rng.gen_range(0..status.len())]);
self.prices_status.append_value(self.row_count as i128);
[Review comment — Contributor: "the incrementing price makes sense for range testing"]

}

fn finish(mut self, schema: SchemaRef) -> RecordBatch {
@@ -166,6 +169,12 @@ impl BatchBuilder {
Arc::new(self.request_bytes.finish()),
Arc::new(self.response_bytes.finish()),
Arc::new(self.response_status.finish()),
Arc::new(
self.prices_status
.finish()
.with_precision_and_scale(38, 0)
.unwrap(),
),
],
)
.unwrap()
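
A minimal sketch of the builder flow above (a hypothetical standalone snippet; assumes the arrow-rs API of this era, where `Decimal128Builder::new` takes no arguments and precision/scale are attached to the finished array):

use arrow::array::Decimal128Builder;

// Values are appended as raw i128; the precision and scale are only
// validated and attached when the builder is finished into an array.
let mut builder = Decimal128Builder::new();
builder.append_value(9200_i128);
let array = builder.finish().with_precision_and_scale(38, 0).unwrap();
assert_eq!(array.value(0), 9200);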