From 24017c28db1411baa6b34f10a163e39f7cd78274 Mon Sep 17 00:00:00 2001 From: dsgibbons Date: Wed, 21 Aug 2024 22:37:29 +0930 Subject: [PATCH 01/13] feat(parquet): coerce_types flag for date64 --- parquet-testing | 2 +- .../src/arrow/array_reader/primitive_array.rs | 120 ++++++++++++++++-- parquet/src/arrow/arrow_reader/mod.rs | 115 ++++++++++++++++- parquet/src/arrow/arrow_reader/statistics.rs | 17 +-- parquet/src/arrow/arrow_writer/mod.rs | 14 +- parquet/src/arrow/schema/mod.rs | 55 ++++---- parquet/src/arrow/schema/primitive.rs | 2 + parquet/src/file/properties.rs | 20 +++ parquet/src/schema/types.rs | 16 ++- 9 files changed, 304 insertions(+), 57 deletions(-) diff --git a/parquet-testing b/parquet-testing index 50af3d8ce206..cb7a9674142c 160000 --- a/parquet-testing +++ b/parquet-testing @@ -1 +1 @@ -Subproject commit 50af3d8ce206990d81014b1862e5ce7380dc3e08 +Subproject commit cb7a9674142c137367bf75a01b79c6e214a73199 diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs index 010e9c2eed3f..0640303d0330 100644 --- a/parquet/src/arrow/array_reader/primitive_array.rs +++ b/parquet/src/arrow/array_reader/primitive_array.rs @@ -208,14 +208,8 @@ where // As there is not always a 1:1 mapping between Arrow and Parquet, there // are datatypes which we must convert explicitly. // These are: - // - date64: we should cast int32 to date32, then date32 to date64. - // - decimal: cast in32 to decimal, int64 to decimal + // - decimal: cast int32 to decimal, int64 to decimal let array = match target_type { - ArrowType::Date64 => { - // this is cheap as it internally reinterprets the data - let a = arrow_cast::cast(&array, &ArrowType::Date32)?; - arrow_cast::cast(&a, target_type)? - } ArrowType::Decimal128(p, s) => { // Apply conversion to all elements regardless of null slots as the conversion // to `i128` is infallible. This improves performance by avoiding a branch in @@ -305,9 +299,9 @@ mod tests { use crate::util::test_common::rand_gen::make_pages; use crate::util::InMemoryPageIterator; use arrow::datatypes::ArrowPrimitiveType; - use arrow_array::{Array, PrimitiveArray}; + use arrow_array::{Array, Date32Array, Date64Array, PrimitiveArray}; - use arrow::datatypes::DataType::Decimal128; + use arrow::datatypes::DataType::{Date32, Date64, Decimal128}; use rand::distributions::uniform::SampleUniform; use std::collections::VecDeque; @@ -545,6 +539,14 @@ mod tests { arrow::datatypes::Int32Type, i32 ); + test_primitive_array_reader_one_type!( + crate::data_type::Int64Type, + PhysicalType::INT64, + "DATE", + arrow::datatypes::Date64Type, + arrow::datatypes::Int64Type, + i64 + ); test_primitive_array_reader_one_type!( crate::data_type::Int32Type, PhysicalType::INT32, @@ -783,4 +785,104 @@ mod tests { assert_ne!(array, &data_decimal_array) } } + + #[test] + fn test_primitive_array_reader_date32_type() { + // parquet `INT32` to date + let message_type = " + message test_schema { + REQUIRED INT32 date1 (DATE); + } + "; + let schema = parse_message_type(message_type) + .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t)))) + .unwrap(); + let column_desc = schema.column(0); + + // create the array reader + { + let mut data = Vec::new(); + let mut page_lists = Vec::new(); + make_column_chunks::( + column_desc.clone(), + Encoding::PLAIN, + 100, + -99999999, + 99999999, + &mut Vec::new(), + &mut Vec::new(), + &mut data, + &mut page_lists, + true, + 2, + ); + let page_iterator = InMemoryPageIterator::new(page_lists); + + let mut array_reader = + PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None) + .unwrap(); + + // read data from the reader + // the data type is date + let array = array_reader.next_batch(50).unwrap(); + assert_eq!(array.data_type(), &Date32); + let array = array.as_any().downcast_ref::().unwrap(); + let data_date_array = data[0..50] + .iter() + .copied() + .map(Some) + .collect::(); + assert_eq!(array, &data_date_array); + } + } + + #[test] + fn test_primitive_array_reader_date64_type() { + // parquet `INT64` to date + let message_type = " + message test_schema { + REQUIRED INT64 date1 (DATE); + } + "; + let schema = parse_message_type(message_type) + .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t)))) + .unwrap(); + let column_desc = schema.column(0); + + // create the array reader + { + let mut data = Vec::new(); + let mut page_lists = Vec::new(); + make_column_chunks::( + column_desc.clone(), + Encoding::PLAIN, + 100, + -999999999999999999, + 999999999999999999, + &mut Vec::new(), + &mut Vec::new(), + &mut data, + &mut page_lists, + true, + 2, + ); + let page_iterator = InMemoryPageIterator::new(page_lists); + + let mut array_reader = + PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None) + .unwrap(); + + // read data from the reader + // the data type is date + let array = array_reader.next_batch(50).unwrap(); + assert_eq!(array.data_type(), &Date64); + let array = array.as_any().downcast_ref::().unwrap(); + let data_date_array = data[0..50] + .iter() + .copied() + .map(Some) + .collect::(); + assert_eq!(array, &data_date_array); + } + } } diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index a109851f72b6..0c931b4defe8 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -932,8 +932,8 @@ mod tests { use arrow_array::builder::*; use arrow_array::cast::AsArray; use arrow_array::types::{ - Decimal128Type, Decimal256Type, DecimalType, Float16Type, Float32Type, Float64Type, - Time32MillisecondType, Time64MicrosecondType, + Date32Type, Date64Type, Decimal128Type, Decimal256Type, DecimalType, Float16Type, + Float32Type, Float64Type, Time32MillisecondType, Time64MicrosecondType, }; use arrow_array::*; use arrow_buffer::{i256, ArrowNativeType, Buffer, IntervalDayTime}; @@ -1272,6 +1272,117 @@ mod tests { Ok(()) } + #[test] + fn test_date32_roundtrip() -> Result<()> { + use arrow_array::Date32Array; + + let schema = Arc::new(Schema::new(vec![Field::new( + "date32", + ArrowDataType::Date32, + false, + )])); + + let mut buf = Vec::with_capacity(1024); + + let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?; + + let original = RecordBatch::try_new( + schema, + vec![Arc::new(Date32Array::from(vec![ + -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000, + ]))], + )?; + + writer.write(&original)?; + writer.close()?; + + let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?; + let ret = reader.next().unwrap()?; + assert_eq!(ret, original); + + // Ensure can be downcast to the correct type + ret.column(0).as_primitive::(); + + Ok(()) + } + + #[test] + fn test_date64_roundtrip() -> Result<()> { + use arrow_array::Date64Array; + + let schema = Arc::new(Schema::new(vec![ + Field::new("small-date64", ArrowDataType::Date64, false), + Field::new("big-date64", ArrowDataType::Date64, false), + Field::new("invalid-date64", ArrowDataType::Date64, false), + ])); + + let mut default_buf = Vec::with_capacity(1024); + let mut coerce_buf = Vec::with_capacity(1024); + + let coerce_props = WriterProperties::builder().set_coerce_types(true).build(); + + let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?; + let mut coerce_writer = + ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?; + + static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24; + + let original = RecordBatch::try_new( + schema, + vec![ + // small-date64 + Arc::new(Date64Array::from(vec![ + -1_000_000 * NUM_MILLISECONDS_IN_DAY, + -1_000 * NUM_MILLISECONDS_IN_DAY, + 0, + 1_000 * NUM_MILLISECONDS_IN_DAY, + 1_000_000 * NUM_MILLISECONDS_IN_DAY, + ])), + // big-date64 + Arc::new(Date64Array::from(vec![ + -10_000_000_000 * NUM_MILLISECONDS_IN_DAY, + -1_000_000_000 * NUM_MILLISECONDS_IN_DAY, + 0, + 1_000_000_000 * NUM_MILLISECONDS_IN_DAY, + 10_000_000_000 * NUM_MILLISECONDS_IN_DAY, + ])), + // invalid-date64 + Arc::new(Date64Array::from(vec![ + -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1, + -1_000 * NUM_MILLISECONDS_IN_DAY + 1, + 1, + 1_000 * NUM_MILLISECONDS_IN_DAY + 1, + 1_000_000 * NUM_MILLISECONDS_IN_DAY + 1, + ])), + ], + )?; + + default_writer.write(&original)?; + coerce_writer.write(&original)?; + + default_writer.close()?; + coerce_writer.close()?; + + let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?; + let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?; + + let default_ret = default_reader.next().unwrap()?; + let coerce_ret = coerce_reader.next().unwrap()?; + + // Roundtrip should be successful when default writer used + assert_eq!(default_ret, original); + + // Only small-date64 should roundtrip successfully when coerce_types writer is used + assert_eq!(coerce_ret.column(0), original.column(0)); + assert_ne!(coerce_ret.column(1), original.column(1)); + assert_ne!(coerce_ret.column(2), original.column(2)); + + // Ensure both can be downcast to the correct type + default_ret.column(0).as_primitive::(); + coerce_ret.column(0).as_primitive::(); + + Ok(()) + } struct RandFixedLenGen {} impl RandGen for RandFixedLenGen { diff --git a/parquet/src/arrow/arrow_reader/statistics.rs b/parquet/src/arrow/arrow_reader/statistics.rs index 8a7511be2afe..79254d1171ad 100644 --- a/parquet/src/arrow/arrow_reader/statistics.rs +++ b/parquet/src/arrow/arrow_reader/statistics.rs @@ -371,8 +371,7 @@ macro_rules! get_statistics { [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.copied()), ))), DataType::Date64 => Ok(Arc::new(Date64Array::from_iter( - [<$stat_type_prefix Int32StatsIterator>]::new($iterator) - .map(|x| x.map(|x| i64::from(*x) * 24 * 60 * 60 * 1000)), + [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()), ))), DataType::Timestamp(unit, timezone) =>{ let iter = [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()); @@ -941,19 +940,7 @@ macro_rules! get_data_page_statistics { }) }, DataType::Date32 => Ok(Arc::new(Date32Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten()))), - DataType::Date64 => Ok( - Arc::new( - Date64Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator) - .map(|x| { - x.into_iter() - .map(|x| { - x.and_then(|x| i64::try_from(x).ok()) - }) - .map(|x| x.map(|x| x * 24 * 60 * 60 * 1000)) - }).flatten() - ) - ) - ), + DataType::Date64 => Ok(Arc::new(Date64Array::from_iter([<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten()))), DataType::Decimal128(precision, scale) => Ok(Arc::new( Decimal128Array::from_iter([<$stat_type_prefix Decimal128DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision, *scale)?)), DataType::Decimal256(precision, scale) => Ok(Arc::new( diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 3ec7a3dfea36..13ea2601ff84 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -180,11 +180,11 @@ impl ArrowWriter { arrow_schema: SchemaRef, options: ArrowWriterOptions, ) -> Result { + let mut props = options.properties; let schema = match options.schema_root { - Some(s) => arrow_to_parquet_schema_with_root(&arrow_schema, &s)?, - None => arrow_to_parquet_schema(&arrow_schema)?, + Some(s) => arrow_to_parquet_schema_with_root(&arrow_schema, &s, props.coerce_types())?, + None => arrow_to_parquet_schema(&arrow_schema, props.coerce_types())?, }; - let mut props = options.properties; if !options.skip_arrow_metadata { // add serialized arrow schema add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props); @@ -549,8 +549,8 @@ impl ArrowColumnChunk { /// ])); /// /// // Compute the parquet schema -/// let parquet_schema = arrow_to_parquet_schema(schema.as_ref()).unwrap(); /// let props = Arc::new(WriterProperties::default()); +/// let parquet_schema = arrow_to_parquet_schema(schema.as_ref(), props.coerce_types()).unwrap(); /// /// // Create writers for each of the leaf columns /// let col_writers = get_column_writers(&parquet_schema, &props, &schema).unwrap(); @@ -858,6 +858,12 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result { match column.data_type() { + ArrowDataType::Date64 => { + let array = arrow_cast::cast(column, &ArrowDataType::Int64)?; + + let array = array.as_primitive::(); + write_primitive(typed, array.values(), levels) + } ArrowDataType::Int64 => { let array = column.as_primitive::(); write_primitive(typed, array.values(), levels) diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs index 3ed3bd24e0a8..97c9ea1d98b0 100644 --- a/parquet/src/arrow/schema/mod.rs +++ b/parquet/src/arrow/schema/mod.rs @@ -229,16 +229,16 @@ pub(crate) fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut /// /// The name of the root schema element defaults to `"arrow_schema"`, this can be /// overridden with [`arrow_to_parquet_schema_with_root`] -pub fn arrow_to_parquet_schema(schema: &Schema) -> Result { - arrow_to_parquet_schema_with_root(schema, "arrow_schema") +pub fn arrow_to_parquet_schema(schema: &Schema, coerce_types: bool) -> Result { + arrow_to_parquet_schema_with_root(schema, "arrow_schema", coerce_types) } /// Convert arrow schema to parquet schema specifying the name of the root schema element -pub fn arrow_to_parquet_schema_with_root(schema: &Schema, root: &str) -> Result { +pub fn arrow_to_parquet_schema_with_root(schema: &Schema, root: &str, coerce_types: bool) -> Result { let fields = schema .fields() .iter() - .map(|field| arrow_to_parquet_type(field).map(Arc::new)) + .map(|field| arrow_to_parquet_type(field, coerce_types).map(Arc::new)) .collect::>()?; let group = Type::group_type_builder(root).with_fields(fields).build()?; Ok(SchemaDescriptor::new(Arc::new(group))) @@ -298,7 +298,7 @@ pub fn decimal_length_from_precision(precision: u8) -> usize { } /// Convert an arrow field to a parquet `Type` -fn arrow_to_parquet_type(field: &Field) -> Result { +fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result { let name = field.name().as_str(); let repetition = if field.is_nullable() { Repetition::OPTIONAL @@ -415,12 +415,17 @@ fn arrow_to_parquet_type(field: &Field) -> Result { .with_repetition(repetition) .with_id(id) .build(), - // date64 is cast to date32 (#1666) - DataType::Date64 => Type::primitive_type_builder(name, PhysicalType::INT32) - .with_logical_type(Some(LogicalType::Date)) - .with_repetition(repetition) - .with_id(id) - .build(), + DataType::Date64 => { + let physical_type = match coerce_types { + true => PhysicalType::INT32, + false => PhysicalType::INT64, + }; + Type::primitive_type_builder(name, physical_type) + .with_logical_type(Some(LogicalType::Date)) + .with_repetition(repetition) + .with_id(id) + .build() + }, DataType::Time32(TimeUnit::Second) => { // Cannot represent seconds in LogicalType Type::primitive_type_builder(name, PhysicalType::INT32) @@ -518,7 +523,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result { Type::group_type_builder(name) .with_fields(vec![Arc::new( Type::group_type_builder("list") - .with_fields(vec![Arc::new(arrow_to_parquet_type(f)?)]) + .with_fields(vec![Arc::new(arrow_to_parquet_type(f, coerce_types)?)]) .with_repetition(Repetition::REPEATED) .build()?, )]) @@ -537,7 +542,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result { // recursively convert children to types/nodes let fields = fields .iter() - .map(|f| arrow_to_parquet_type(f).map(Arc::new)) + .map(|f| arrow_to_parquet_type(f, coerce_types).map(Arc::new)) .collect::>()?; Type::group_type_builder(name) .with_fields(fields) @@ -551,8 +556,8 @@ fn arrow_to_parquet_type(field: &Field) -> Result { .with_fields(vec![Arc::new( Type::group_type_builder(field.name()) .with_fields(vec![ - Arc::new(arrow_to_parquet_type(&struct_fields[0])?), - Arc::new(arrow_to_parquet_type(&struct_fields[1])?), + Arc::new(arrow_to_parquet_type(&struct_fields[0], coerce_types)?), + Arc::new(arrow_to_parquet_type(&struct_fields[1], coerce_types)?), ]) .with_repetition(Repetition::REPEATED) .build()?, @@ -571,7 +576,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result { DataType::Dictionary(_, ref value) => { // Dictionary encoding not handled at the schema level let dict_field = field.clone().with_data_type(value.as_ref().clone()); - arrow_to_parquet_type(&dict_field) + arrow_to_parquet_type(&dict_field, coerce_types) } DataType::RunEndEncoded(_, _) => Err(arrow_err!( "Converting RunEndEncodedType to parquet not supported", @@ -1325,7 +1330,8 @@ mod tests { OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16); OPTIONAL BINARY string (UTF8); REPEATED BOOLEAN bools; - OPTIONAL INT32 date (DATE); + OPTIONAL INT32 date32 (DATE); + OPTIONAL INT64 date64 (DATE); OPTIONAL INT32 time_milli (TIME_MILLIS); OPTIONAL INT64 time_micro (TIME_MICROS); OPTIONAL INT64 time_nano (TIME(NANOS,false)); @@ -1366,7 +1372,8 @@ mod tests { Field::new("bools", DataType::Boolean, false), false, ), - Field::new("date", DataType::Date32, true), + Field::new("date32", DataType::Date32, true), + Field::new("date64", DataType::Date64, true), Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true), Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true), Field::new("time_nano", DataType::Time64(TimeUnit::Nanosecond), true), @@ -1431,7 +1438,8 @@ mod tests { REQUIRED BOOLEAN element; } } - OPTIONAL INT32 date (DATE); + OPTIONAL INT32 date32 (DATE); + OPTIONAL INT64 date64 (DATE); OPTIONAL INT32 time_milli (TIME(MILLIS,false)); OPTIONAL INT32 time_milli_utc (TIME(MILLIS,true)); OPTIONAL INT64 time_micro (TIME_MICROS); @@ -1484,7 +1492,8 @@ mod tests { Field::new("element", DataType::Boolean, false), false, ), - Field::new("date", DataType::Date32, true), + Field::new("date32", DataType::Date32, true), + Field::new("date64", DataType::Date64, true), Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true), Field::new( "time_milli_utc", @@ -1557,7 +1566,7 @@ mod tests { Field::new("decimal256", DataType::Decimal256(39, 2), false), ]; let arrow_schema = Schema::new(arrow_fields); - let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema).unwrap(); + let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, false).unwrap(); assert_eq!( parquet_schema.columns().len(), @@ -1594,7 +1603,7 @@ mod tests { false, )]; let arrow_schema = Schema::new(arrow_fields); - let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema); + let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, true); assert!(converted_arrow_schema.is_err()); converted_arrow_schema.unwrap(); @@ -1868,7 +1877,7 @@ mod tests { // don't pass metadata so field ids are read from Parquet and not from serialized Arrow schema let arrow_schema = crate::arrow::parquet_to_arrow_schema(&schema_descriptor, None)?; - let parq_schema_descr = crate::arrow::arrow_to_parquet_schema(&arrow_schema)?; + let parq_schema_descr = crate::arrow::arrow_to_parquet_schema(&arrow_schema, true)?; let parq_fields = parq_schema_descr.root_schema().get_fields(); assert_eq!(parq_fields.len(), 2); assert_eq!(parq_fields[0].get_basic_info().id(), 1); diff --git a/parquet/src/arrow/schema/primitive.rs b/parquet/src/arrow/schema/primitive.rs index 17dd7862f3dc..9c373ee7b32f 100644 --- a/parquet/src/arrow/schema/primitive.rs +++ b/parquet/src/arrow/schema/primitive.rs @@ -201,6 +201,7 @@ fn from_int64(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result Ok(DataType::Int64), false => Ok(DataType::UInt64), }, + (Some(LogicalType::Date), _) => Ok(DataType::Date64), (Some(LogicalType::Time { unit, .. }), _) => match unit { ParquetTimeUnit::MILLIS(_) => { Err(arrow_err!("Cannot create INT64 from MILLIS time unit",)) @@ -228,6 +229,7 @@ fn from_int64(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result Ok(DataType::Int64), (None, ConvertedType::UINT_64) => Ok(DataType::UInt64), + (None, ConvertedType::DATE) => Ok(DataType::Date64), (None, ConvertedType::TIME_MICROS) => Ok(DataType::Time64(TimeUnit::Microsecond)), (None, ConvertedType::TIMESTAMP_MILLIS) => Ok(DataType::Timestamp( TimeUnit::Millisecond, diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index efcb63258f99..db8c5fa0cbd2 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -57,6 +57,8 @@ pub const DEFAULT_BLOOM_FILTER_FPP: f64 = 0.05; pub const DEFAULT_BLOOM_FILTER_NDV: u64 = 1_000_000_u64; /// Default values for [`WriterProperties::statistics_truncate_length`] pub const DEFAULT_STATISTICS_TRUNCATE_LENGTH: Option = None; +/// Default values for [`WriterProperties::coerce_types`] +pub const DEFAULT_COERCE_TYPES: bool = false; /// Parquet writer version. /// @@ -163,6 +165,7 @@ pub struct WriterProperties { sorting_columns: Option>, column_index_truncate_length: Option, statistics_truncate_length: Option, + coerce_types: bool, } impl Default for WriterProperties { @@ -281,6 +284,13 @@ impl WriterProperties { self.statistics_truncate_length } + /// Returns `coerce_types` boolean + /// + /// `true` if type coercion enabled. + pub fn coerce_types(&self) -> bool { + self.coerce_types + } + /// Returns encoding for a data page, when dictionary encoding is enabled. /// This is not configurable. #[inline] @@ -377,6 +387,7 @@ pub struct WriterPropertiesBuilder { sorting_columns: Option>, column_index_truncate_length: Option, statistics_truncate_length: Option, + coerce_types: bool, } impl WriterPropertiesBuilder { @@ -397,6 +408,7 @@ impl WriterPropertiesBuilder { sorting_columns: None, column_index_truncate_length: DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, statistics_truncate_length: DEFAULT_STATISTICS_TRUNCATE_LENGTH, + coerce_types: DEFAULT_COERCE_TYPES, } } @@ -417,6 +429,7 @@ impl WriterPropertiesBuilder { sorting_columns: self.sorting_columns, column_index_truncate_length: self.column_index_truncate_length, statistics_truncate_length: self.statistics_truncate_length, + coerce_types: self.coerce_types, } } @@ -767,6 +780,13 @@ impl WriterPropertiesBuilder { self.statistics_truncate_length = max_length; self } + + /// Sets flag to enable/disable type coercion. + /// Takes precedence over globally defined settings. + pub fn set_coerce_types(mut self, coerce_types: bool) -> Self { + self.coerce_types = coerce_types; + self + } } /// Controls the level of statistics to be computed by the writer and stored in diff --git a/parquet/src/schema/types.rs b/parquet/src/schema/types.rs index 39d2fa28c627..d869bdda18ad 100644 --- a/parquet/src/schema/types.rs +++ b/parquet/src/schema/types.rs @@ -400,7 +400,6 @@ impl<'a> PrimitiveTypeBuilder<'a> { } } } - match self.converted_type { ConvertedType::NONE => {} ConvertedType::UTF8 | ConvertedType::BSON | ConvertedType::JSON => { @@ -415,8 +414,18 @@ impl<'a> PrimitiveTypeBuilder<'a> { ConvertedType::DECIMAL => { self.check_decimal_precision_scale()?; } - ConvertedType::DATE - | ConvertedType::TIME_MILLIS + ConvertedType::DATE => { + if !(self.physical_type == PhysicalType::INT32 + || self.physical_type == PhysicalType::INT64) + { + return Err(general_err!( + "{} cannot annotate field '{}' because it is not a INT32 or a INT64 field", + self.converted_type, + self.name + )); + } + } + ConvertedType::TIME_MILLIS | ConvertedType::UINT_8 | ConvertedType::UINT_16 | ConvertedType::UINT_32 @@ -452,6 +461,7 @@ impl<'a> PrimitiveTypeBuilder<'a> { )); } } + ConvertedType::ENUM => { if self.physical_type != PhysicalType::BYTE_ARRAY { return Err(general_err!( From 6fd9146dc6e204ca284dc23fcd7d9ed3e246b4ca Mon Sep 17 00:00:00 2001 From: dsgibbons Date: Fri, 27 Sep 2024 15:36:14 +0930 Subject: [PATCH 02/13] fix: use ARROW:schema instead of LogicalType to embed Date64 type --- .../src/arrow/array_reader/primitive_array.rs | 68 +++---------------- parquet/src/arrow/schema/mod.rs | 25 ++++--- parquet/src/arrow/schema/primitive.rs | 2 - parquet/src/schema/types.rs | 14 +--- 4 files changed, 22 insertions(+), 87 deletions(-) diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs index 0640303d0330..df3c6ffd1de2 100644 --- a/parquet/src/arrow/array_reader/primitive_array.rs +++ b/parquet/src/arrow/array_reader/primitive_array.rs @@ -208,8 +208,14 @@ where // As there is not always a 1:1 mapping between Arrow and Parquet, there // are datatypes which we must convert explicitly. // These are: + // - date64: cast int32 to date32, then date32 to date64. // - decimal: cast int32 to decimal, int64 to decimal let array = match target_type { + ArrowType::Date64 if *(array.data_type()) == ArrowType::Int32 => { + // this is cheap as it internally reinterprets the data + let a = arrow_cast::cast(&array, &ArrowType::Date32)?; + arrow_cast::cast(&a, target_type)? + } ArrowType::Decimal128(p, s) => { // Apply conversion to all elements regardless of null slots as the conversion // to `i128` is infallible. This improves performance by avoiding a branch in @@ -299,9 +305,9 @@ mod tests { use crate::util::test_common::rand_gen::make_pages; use crate::util::InMemoryPageIterator; use arrow::datatypes::ArrowPrimitiveType; - use arrow_array::{Array, Date32Array, Date64Array, PrimitiveArray}; + use arrow_array::{Array, Date32Array, PrimitiveArray}; - use arrow::datatypes::DataType::{Date32, Date64, Decimal128}; + use arrow::datatypes::DataType::{Date32, Decimal128}; use rand::distributions::uniform::SampleUniform; use std::collections::VecDeque; @@ -539,14 +545,6 @@ mod tests { arrow::datatypes::Int32Type, i32 ); - test_primitive_array_reader_one_type!( - crate::data_type::Int64Type, - PhysicalType::INT64, - "DATE", - arrow::datatypes::Date64Type, - arrow::datatypes::Int64Type, - i64 - ); test_primitive_array_reader_one_type!( crate::data_type::Int32Type, PhysicalType::INT32, @@ -835,54 +833,4 @@ mod tests { assert_eq!(array, &data_date_array); } } - - #[test] - fn test_primitive_array_reader_date64_type() { - // parquet `INT64` to date - let message_type = " - message test_schema { - REQUIRED INT64 date1 (DATE); - } - "; - let schema = parse_message_type(message_type) - .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t)))) - .unwrap(); - let column_desc = schema.column(0); - - // create the array reader - { - let mut data = Vec::new(); - let mut page_lists = Vec::new(); - make_column_chunks::( - column_desc.clone(), - Encoding::PLAIN, - 100, - -999999999999999999, - 999999999999999999, - &mut Vec::new(), - &mut Vec::new(), - &mut data, - &mut page_lists, - true, - 2, - ); - let page_iterator = InMemoryPageIterator::new(page_lists); - - let mut array_reader = - PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc, None) - .unwrap(); - - // read data from the reader - // the data type is date - let array = array_reader.next_batch(50).unwrap(); - assert_eq!(array.data_type(), &Date64); - let array = array.as_any().downcast_ref::().unwrap(); - let data_date_array = data[0..50] - .iter() - .copied() - .map(Some) - .collect::(); - assert_eq!(array, &data_date_array); - } - } } diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs index 97c9ea1d98b0..63eac9bb31d4 100644 --- a/parquet/src/arrow/schema/mod.rs +++ b/parquet/src/arrow/schema/mod.rs @@ -416,15 +416,18 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result { .with_id(id) .build(), DataType::Date64 => { - let physical_type = match coerce_types { - true => PhysicalType::INT32, - false => PhysicalType::INT64, - }; - Type::primitive_type_builder(name, physical_type) - .with_logical_type(Some(LogicalType::Date)) - .with_repetition(repetition) - .with_id(id) - .build() + if coerce_types { + Type::primitive_type_builder(name, PhysicalType::INT32) + .with_logical_type(Some(LogicalType::Date)) + .with_repetition(repetition) + .with_id(id) + .build() + } else { + Type::primitive_type_builder(name, PhysicalType::INT64) + .with_repetition(repetition) + .with_id(id) + .build() + } }, DataType::Time32(TimeUnit::Second) => { // Cannot represent seconds in LogicalType @@ -1331,7 +1334,6 @@ mod tests { OPTIONAL BINARY string (UTF8); REPEATED BOOLEAN bools; OPTIONAL INT32 date32 (DATE); - OPTIONAL INT64 date64 (DATE); OPTIONAL INT32 time_milli (TIME_MILLIS); OPTIONAL INT64 time_micro (TIME_MICROS); OPTIONAL INT64 time_nano (TIME(NANOS,false)); @@ -1373,7 +1375,6 @@ mod tests { false, ), Field::new("date32", DataType::Date32, true), - Field::new("date64", DataType::Date64, true), Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true), Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true), Field::new("time_nano", DataType::Time64(TimeUnit::Nanosecond), true), @@ -1439,7 +1440,6 @@ mod tests { } } OPTIONAL INT32 date32 (DATE); - OPTIONAL INT64 date64 (DATE); OPTIONAL INT32 time_milli (TIME(MILLIS,false)); OPTIONAL INT32 time_milli_utc (TIME(MILLIS,true)); OPTIONAL INT64 time_micro (TIME_MICROS); @@ -1493,7 +1493,6 @@ mod tests { false, ), Field::new("date32", DataType::Date32, true), - Field::new("date64", DataType::Date64, true), Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true), Field::new( "time_milli_utc", diff --git a/parquet/src/arrow/schema/primitive.rs b/parquet/src/arrow/schema/primitive.rs index 9c373ee7b32f..17dd7862f3dc 100644 --- a/parquet/src/arrow/schema/primitive.rs +++ b/parquet/src/arrow/schema/primitive.rs @@ -201,7 +201,6 @@ fn from_int64(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result Ok(DataType::Int64), false => Ok(DataType::UInt64), }, - (Some(LogicalType::Date), _) => Ok(DataType::Date64), (Some(LogicalType::Time { unit, .. }), _) => match unit { ParquetTimeUnit::MILLIS(_) => { Err(arrow_err!("Cannot create INT64 from MILLIS time unit",)) @@ -229,7 +228,6 @@ fn from_int64(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result Ok(DataType::Int64), (None, ConvertedType::UINT_64) => Ok(DataType::UInt64), - (None, ConvertedType::DATE) => Ok(DataType::Date64), (None, ConvertedType::TIME_MICROS) => Ok(DataType::Time64(TimeUnit::Microsecond)), (None, ConvertedType::TIMESTAMP_MILLIS) => Ok(DataType::Timestamp( TimeUnit::Millisecond, diff --git a/parquet/src/schema/types.rs b/parquet/src/schema/types.rs index d869bdda18ad..1c9b622c10fa 100644 --- a/parquet/src/schema/types.rs +++ b/parquet/src/schema/types.rs @@ -414,18 +414,8 @@ impl<'a> PrimitiveTypeBuilder<'a> { ConvertedType::DECIMAL => { self.check_decimal_precision_scale()?; } - ConvertedType::DATE => { - if !(self.physical_type == PhysicalType::INT32 - || self.physical_type == PhysicalType::INT64) - { - return Err(general_err!( - "{} cannot annotate field '{}' because it is not a INT32 or a INT64 field", - self.converted_type, - self.name - )); - } - } - ConvertedType::TIME_MILLIS + ConvertedType::DATE + | ConvertedType::TIME_MILLIS | ConvertedType::UINT_8 | ConvertedType::UINT_16 | ConvertedType::UINT_32 From 9a0f516f65007c20de510638cc18961470559b18 Mon Sep 17 00:00:00 2001 From: dsgibbons Date: Fri, 27 Sep 2024 15:52:46 +0930 Subject: [PATCH 03/13] chore: lint --- parquet/src/arrow/array_reader/primitive_array.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs index df3c6ffd1de2..a952e00e12ef 100644 --- a/parquet/src/arrow/array_reader/primitive_array.rs +++ b/parquet/src/arrow/array_reader/primitive_array.rs @@ -783,7 +783,7 @@ mod tests { assert_ne!(array, &data_decimal_array) } } - + #[test] fn test_primitive_array_reader_date32_type() { // parquet `INT32` to date From 3bd8af8294f1bb64fff9871391b7d5b5138dda26 Mon Sep 17 00:00:00 2001 From: dsgibbons Date: Fri, 27 Sep 2024 15:57:01 +0930 Subject: [PATCH 04/13] chore: lint --- parquet/src/arrow/schema/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs index 63eac9bb31d4..c6d6acce41af 100644 --- a/parquet/src/arrow/schema/mod.rs +++ b/parquet/src/arrow/schema/mod.rs @@ -428,7 +428,7 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result { .with_id(id) .build() } - }, + } DataType::Time32(TimeUnit::Second) => { // Cannot represent seconds in LogicalType Type::primitive_type_builder(name, PhysicalType::INT32) From e0fb77c8a5bae382ef4b56630bb6efb247b5050a Mon Sep 17 00:00:00 2001 From: dsgibbons Date: Fri, 27 Sep 2024 16:00:09 +0930 Subject: [PATCH 05/13] chore: lint --- parquet/src/arrow/schema/mod.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs index c6d6acce41af..fa1ad3162eee 100644 --- a/parquet/src/arrow/schema/mod.rs +++ b/parquet/src/arrow/schema/mod.rs @@ -234,7 +234,11 @@ pub fn arrow_to_parquet_schema(schema: &Schema, coerce_types: bool) -> Result Result { +pub fn arrow_to_parquet_schema_with_root( + schema: &Schema, + root: &str, + coerce_types: bool, +) -> Result { let fields = schema .fields() .iter() From 169ba01b1a878e2c983609ba9cf0570ae748cb7c Mon Sep 17 00:00:00 2001 From: dsgibbons Date: Sun, 6 Oct 2024 12:59:29 +1030 Subject: [PATCH 06/13] chore: add physical_type to StatisticsConverter to account for coerce_types --- parquet/src/arrow/arrow_reader/statistics.rs | 29 +++++++++++++------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/statistics.rs b/parquet/src/arrow/arrow_reader/statistics.rs index 79254d1171ad..c72ddf17d3f6 100644 --- a/parquet/src/arrow/arrow_reader/statistics.rs +++ b/parquet/src/arrow/arrow_reader/statistics.rs @@ -21,6 +21,7 @@ /// `arrow-rs/parquet/tests/arrow_reader/statistics.rs`. use crate::arrow::buffer::bit_util::sign_extend_be; use crate::arrow::parquet_column; +use crate::basic::Type as PhysicalType; use crate::data_type::{ByteArray, FixedLenByteArray}; use crate::errors::{ParquetError, Result}; use crate::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData}; @@ -318,7 +319,7 @@ make_decimal_stats_iterator!( /// data_type: The data type of the statistics (e.g. `DataType::Int32`) /// iterator: The iterator of [`ParquetStatistics`] to extract the statistics from. macro_rules! get_statistics { - ($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => { + ($stat_type_prefix: ident, $data_type: ident, $iterator: ident, $physical_type: ident) => { paste! { match $data_type { DataType::Boolean => Ok(Arc::new(BooleanArray::from_iter( @@ -370,9 +371,11 @@ macro_rules! get_statistics { DataType::Date32 => Ok(Arc::new(Date32Array::from_iter( [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.copied()), ))), - DataType::Date64 => Ok(Arc::new(Date64Array::from_iter( - [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()), - ))), + DataType::Date64 if $physical_type == Some(PhysicalType::INT32) => Ok(Arc::new(Date64Array::from_iter( + [<$stat_type_prefix Int32StatsIterator>]::new($iterator) + .map(|x| x.map(|x| i64::from(*x) * 24 * 60 * 60 * 1000))))), + DataType::Date64 if $physical_type == Some(PhysicalType::INT64) => Ok(Arc::new(Date64Array::from_iter( + [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()),))), DataType::Timestamp(unit, timezone) =>{ let iter = [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()); Ok(match unit { @@ -486,7 +489,7 @@ macro_rules! get_statistics { Ok(Arc::new(arr)) }, DataType::Dictionary(_, value_type) => { - [<$stat_type_prefix:lower _ statistics>](value_type, $iterator) + [<$stat_type_prefix:lower _ statistics>](value_type, $iterator, $physical_type) }, DataType::Utf8View => { let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator); @@ -523,6 +526,7 @@ macro_rules! get_statistics { DataType::Map(_,_) | DataType::Duration(_) | DataType::Interval(_) | + DataType::Date64 | // required to cover $physical_type match guard DataType::Null | DataType::List(_) | DataType::ListView(_) | @@ -1054,8 +1058,9 @@ macro_rules! get_data_page_statistics { fn min_statistics<'a, I: Iterator>>( data_type: &DataType, iterator: I, + physical_type: Option, ) -> Result { - get_statistics!(Min, data_type, iterator) + get_statistics!(Min, data_type, iterator, physical_type) } /// Extracts the max statistics from an iterator of [`ParquetStatistics`] to an [`ArrayRef`] @@ -1064,8 +1069,9 @@ fn min_statistics<'a, I: Iterator>>( fn max_statistics<'a, I: Iterator>>( data_type: &DataType, iterator: I, + physical_type: Option, ) -> Result { - get_statistics!(Max, data_type, iterator) + get_statistics!(Max, data_type, iterator, physical_type) } /// Extracts the min statistics from an iterator @@ -1164,6 +1170,8 @@ pub struct StatisticsConverter<'a> { arrow_field: &'a Field, /// treat missing null_counts as 0 nulls missing_null_counts_as_zero: bool, + /// The physical type of the matched column in the Parquet schema + physical_type: Option, } impl<'a> StatisticsConverter<'a> { @@ -1291,6 +1299,7 @@ impl<'a> StatisticsConverter<'a> { parquet_column_index: parquet_index, arrow_field, missing_null_counts_as_zero: true, + physical_type: parquet_index.map(|idx| parquet_schema.column(idx).physical_type()), }) } @@ -1333,7 +1342,7 @@ impl<'a> StatisticsConverter<'a> { /// // get the minimum value for the column "foo" in the parquet file /// let min_values: ArrayRef = converter /// .row_group_mins(metadata.row_groups().iter()) - /// .unwrap(); + /// .unwrap(); /// // if "foo" is a Float64 value, the returned array will contain Float64 values /// assert_eq!(min_values, Arc::new(Float64Array::from(vec![Some(1.0), Some(2.0)])) as _); /// ``` @@ -1350,7 +1359,7 @@ impl<'a> StatisticsConverter<'a> { let iter = metadatas .into_iter() .map(|x| x.column(parquet_index).statistics()); - min_statistics(data_type, iter) + min_statistics(data_type, iter, self.physical_type) } /// Extract the maximum values from row group statistics in [`RowGroupMetaData`] @@ -1369,7 +1378,7 @@ impl<'a> StatisticsConverter<'a> { let iter = metadatas .into_iter() .map(|x| x.column(parquet_index).statistics()); - max_statistics(data_type, iter) + max_statistics(data_type, iter, self.physical_type) } /// Extract the null counts from row group statistics in [`RowGroupMetaData`] From 74384b3b0452a817f9d7728074a11b3256e1f4b9 Mon Sep 17 00:00:00 2001 From: dsgibbons Date: Sun, 6 Oct 2024 13:15:04 +1030 Subject: [PATCH 07/13] chore: blank line changes --- parquet/src/schema/types.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/schema/types.rs b/parquet/src/schema/types.rs index 1c9b622c10fa..39d2fa28c627 100644 --- a/parquet/src/schema/types.rs +++ b/parquet/src/schema/types.rs @@ -400,6 +400,7 @@ impl<'a> PrimitiveTypeBuilder<'a> { } } } + match self.converted_type { ConvertedType::NONE => {} ConvertedType::UTF8 | ConvertedType::BSON | ConvertedType::JSON => { @@ -451,7 +452,6 @@ impl<'a> PrimitiveTypeBuilder<'a> { )); } } - ConvertedType::ENUM => { if self.physical_type != PhysicalType::BYTE_ARRAY { return Err(general_err!( From 74f00f6f6c5244af717f7dcd9e3f3729b0a868b0 Mon Sep 17 00:00:00 2001 From: dsgibbons Date: Sun, 6 Oct 2024 13:20:07 +1030 Subject: [PATCH 08/13] chore: revert minor test changes --- parquet/src/arrow/schema/mod.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs index fa1ad3162eee..5db85129c9f3 100644 --- a/parquet/src/arrow/schema/mod.rs +++ b/parquet/src/arrow/schema/mod.rs @@ -1337,7 +1337,7 @@ mod tests { OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16); OPTIONAL BINARY string (UTF8); REPEATED BOOLEAN bools; - OPTIONAL INT32 date32 (DATE); + OPTIONAL INT32 date (DATE); OPTIONAL INT32 time_milli (TIME_MILLIS); OPTIONAL INT64 time_micro (TIME_MICROS); OPTIONAL INT64 time_nano (TIME(NANOS,false)); @@ -1378,7 +1378,7 @@ mod tests { Field::new("bools", DataType::Boolean, false), false, ), - Field::new("date32", DataType::Date32, true), + Field::new("date", DataType::Date32, true), Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true), Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true), Field::new("time_nano", DataType::Time64(TimeUnit::Nanosecond), true), @@ -1443,7 +1443,7 @@ mod tests { REQUIRED BOOLEAN element; } } - OPTIONAL INT32 date32 (DATE); + OPTIONAL INT32 date (DATE); OPTIONAL INT32 time_milli (TIME(MILLIS,false)); OPTIONAL INT32 time_milli_utc (TIME(MILLIS,true)); OPTIONAL INT64 time_micro (TIME_MICROS); @@ -1496,7 +1496,7 @@ mod tests { Field::new("element", DataType::Boolean, false), false, ), - Field::new("date32", DataType::Date32, true), + Field::new("date", DataType::Date32, true), Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true), Field::new( "time_milli_utc", From 0fb1c9dd702e258c7b59dac55556b85a29fa7676 Mon Sep 17 00:00:00 2001 From: dsgibbons Date: Sun, 6 Oct 2024 13:21:23 +1030 Subject: [PATCH 09/13] chore: update to latest parquet-testing --- parquet-testing | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet-testing b/parquet-testing index cb7a9674142c..50af3d8ce206 160000 --- a/parquet-testing +++ b/parquet-testing @@ -1 +1 @@ -Subproject commit cb7a9674142c137367bf75a01b79c6e214a73199 +Subproject commit 50af3d8ce206990d81014b1862e5ce7380dc3e08 From 10bdfd2b049e073ecee47dc18ef4b14889363e8f Mon Sep 17 00:00:00 2001 From: dsgibbons Date: Sun, 6 Oct 2024 13:28:50 +1030 Subject: [PATCH 10/13] chore: add physical_type fix for get_data_page_statistics macro --- parquet/src/arrow/arrow_reader/statistics.rs | 40 +++++++++++++++----- 1 file changed, 31 insertions(+), 9 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/statistics.rs b/parquet/src/arrow/arrow_reader/statistics.rs index c72ddf17d3f6..09f8ec7cc274 100644 --- a/parquet/src/arrow/arrow_reader/statistics.rs +++ b/parquet/src/arrow/arrow_reader/statistics.rs @@ -793,7 +793,7 @@ get_decimal_page_stats_iterator!( ); macro_rules! get_data_page_statistics { - ($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => { + ($stat_type_prefix: ident, $data_type: ident, $iterator: ident, $physical_type: ident) => { paste! { match $data_type { DataType::Boolean => { @@ -932,7 +932,7 @@ macro_rules! get_data_page_statistics { Ok(Arc::new(builder.finish())) }, DataType::Dictionary(_, value_type) => { - [<$stat_type_prefix:lower _ page_statistics>](value_type, $iterator) + [<$stat_type_prefix:lower _ page_statistics>](value_type, $iterator, $physical_type) }, DataType::Timestamp(unit, timezone) => { let iter = [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten(); @@ -944,7 +944,20 @@ macro_rules! get_data_page_statistics { }) }, DataType::Date32 => Ok(Arc::new(Date32Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten()))), - DataType::Date64 => Ok(Arc::new(Date64Array::from_iter([<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten()))), + DataType::Date64 if $physical_type == Some(PhysicalType::INT32)=> Ok( + Arc::new( + Date64Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator) + .map(|x| { + x.into_iter() + .map(|x| { + x.and_then(|x| i64::try_from(x).ok()) + }) + .map(|x| x.map(|x| x * 24 * 60 * 60 * 1000)) + }).flatten() + ) + ) + ), + DataType::Date64 if $physical_type == Some(PhysicalType::INT64) => Ok(Arc::new(Date64Array::from_iter([<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten()))), DataType::Decimal128(precision, scale) => Ok(Arc::new( Decimal128Array::from_iter([<$stat_type_prefix Decimal128DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision, *scale)?)), DataType::Decimal256(precision, scale) => Ok(Arc::new( @@ -1031,6 +1044,7 @@ macro_rules! get_data_page_statistics { } Ok(Arc::new(builder.finish())) }, + DataType::Date64 | // required to cover $physical_type match guard DataType::Null | DataType::Duration(_) | DataType::Interval(_) | @@ -1076,20 +1090,28 @@ fn max_statistics<'a, I: Iterator>>( /// Extracts the min statistics from an iterator /// of parquet page [`Index`]'es to an [`ArrayRef`] -pub(crate) fn min_page_statistics<'a, I>(data_type: &DataType, iterator: I) -> Result +pub(crate) fn min_page_statistics<'a, I>( + data_type: &DataType, + iterator: I, + physical_type: Option, +) -> Result where I: Iterator, { - get_data_page_statistics!(Min, data_type, iterator) + get_data_page_statistics!(Min, data_type, iterator, physical_type) } /// Extracts the max statistics from an iterator /// of parquet page [`Index`]'es to an [`ArrayRef`] -pub(crate) fn max_page_statistics<'a, I>(data_type: &DataType, iterator: I) -> Result +pub(crate) fn max_page_statistics<'a, I>( + data_type: &DataType, + iterator: I, + physical_type: Option, +) -> Result where I: Iterator, { - get_data_page_statistics!(Max, data_type, iterator) + get_data_page_statistics!(Max, data_type, iterator, physical_type) } /// Extracts the null count statistics from an iterator @@ -1486,7 +1508,7 @@ impl<'a> StatisticsConverter<'a> { (*num_data_pages, column_page_index_per_row_group_per_column) }); - min_page_statistics(data_type, iter) + min_page_statistics(data_type, iter, self.physical_type) } /// Extract the maximum values from Data Page statistics. @@ -1517,7 +1539,7 @@ impl<'a> StatisticsConverter<'a> { (*num_data_pages, column_page_index_per_row_group_per_column) }); - max_page_statistics(data_type, iter) + max_page_statistics(data_type, iter, self.physical_type) } /// Returns a [`UInt64Array`] with null counts for each data page. From b311be23aebd4c9c5637110830d758ce2bcf556d Mon Sep 17 00:00:00 2001 From: dsgibbons Date: Sun, 6 Oct 2024 13:48:32 +1030 Subject: [PATCH 11/13] docs: add docs for coerce_types --- parquet/src/file/properties.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index db8c5fa0cbd2..5bd978cd151a 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -286,7 +286,13 @@ impl WriterProperties { /// Returns `coerce_types` boolean /// - /// `true` if type coercion enabled. + /// Some Arrow types do not have a corresponding Parquet logical type. + /// Affected Arrow data types include `Date64`, `Timestamp` and `Interval`. + /// Writers have the option to coerce these into native Parquet types. Type + /// coercion allows for meaningful representations that do not require + /// downstream readers to consider the embedded Arrow schema. However, type + /// coercion also prevents the data from being round-tripped. This method + /// returns `true` if type coercion enabled. pub fn coerce_types(&self) -> bool { self.coerce_types } From f2b9b466223c7a57d88924be9d0cb084056e1af4 Mon Sep 17 00:00:00 2001 From: dsgibbons Date: Sun, 6 Oct 2024 14:21:21 +1030 Subject: [PATCH 12/13] chore: cargo fmt --all --- parquet/src/file/properties.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index 5bd978cd151a..d2e5cd9bce8e 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -287,9 +287,9 @@ impl WriterProperties { /// Returns `coerce_types` boolean /// /// Some Arrow types do not have a corresponding Parquet logical type. - /// Affected Arrow data types include `Date64`, `Timestamp` and `Interval`. + /// Affected Arrow data types include `Date64`, `Timestamp` and `Interval`. /// Writers have the option to coerce these into native Parquet types. Type - /// coercion allows for meaningful representations that do not require + /// coercion allows for meaningful representations that do not require /// downstream readers to consider the embedded Arrow schema. However, type /// coercion also prevents the data from being round-tripped. This method /// returns `true` if type coercion enabled. From e1f27759e7920062a0196b50cf3daa78f80a712b Mon Sep 17 00:00:00 2001 From: dsgibbons Date: Tue, 8 Oct 2024 20:03:55 +1030 Subject: [PATCH 13/13] docs: coerce_types lossless round trip Co-authored-by: Ed Seidl --- parquet/src/file/properties.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index d2e5cd9bce8e..9a2da09c47fb 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -291,7 +291,7 @@ impl WriterProperties { /// Writers have the option to coerce these into native Parquet types. Type /// coercion allows for meaningful representations that do not require /// downstream readers to consider the embedded Arrow schema. However, type - /// coercion also prevents the data from being round-tripped. This method + /// coercion also prevents the data from being losslessly round-tripped. This method /// returns `true` if type coercion enabled. pub fn coerce_types(&self) -> bool { self.coerce_types