diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 449ce22693d35..1c746a4e98405 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -459,6 +459,14 @@ config_namespace! {
         /// BLOB instead.
         pub binary_as_string: bool, default = false

+        /// (reading) If set, parquet reader will read columns of
+        /// physical type int96 as originating from a different resolution
+        /// than nanosecond. This is useful for reading data from systems like Spark
+        /// which store microsecond-resolution timestamps in an int96, allowing
+        /// them to represent a larger date range than 64-bit timestamps with
+        /// nanosecond resolution. Valid values are: ns, us, ms, and s.
+        pub coerce_int96: Option<String>, transform = str::to_lowercase, default = None
+
         // The following options affect writing to parquet files
         // and map to parquet::file::properties::WriterProperties
diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs
index 939cb5e1a3578..3e33466edf505 100644
--- a/datafusion/common/src/file_options/parquet_writer.rs
+++ b/datafusion/common/src/file_options/parquet_writer.rs
@@ -239,6 +239,7 @@ impl ParquetOptions {
             bloom_filter_on_read: _, // reads not used for writer props
             schema_force_view_types: _,
             binary_as_string: _, // not used for writer props
+            coerce_int96: _, // not used for writer props
             skip_arrow_metadata: _,
         } = self;

@@ -516,6 +517,7 @@ mod tests {
             schema_force_view_types: defaults.schema_force_view_types,
             binary_as_string: defaults.binary_as_string,
             skip_arrow_metadata: defaults.skip_arrow_metadata,
+            coerce_int96: None,
         }
     }

@@ -622,6 +624,7 @@ mod tests {
                 schema_force_view_types: global_options_defaults.schema_force_view_types,
                 binary_as_string: global_options_defaults.binary_as_string,
                 skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
+                coerce_int96: None,
             },
             column_specific_options,
             key_value_metadata,
diff --git a/datafusion/core/src/datasource/file_format/avro.rs b/datafusion/core/src/datasource/file_format/avro.rs
index a9516aad9e22d..3428d08a6ae52 100644
--- a/datafusion/core/src/datasource/file_format/avro.rs
+++ b/datafusion/core/src/datasource/file_format/avro.rs
@@ -382,6 +382,15 @@ mod tests {
         let testdata = test_util::arrow_test_data();
         let store_root = format!("{testdata}/avro");
         let format = AvroFormat {};
-        scan_format(state, &format, &store_root, file_name, projection, limit).await
+        scan_format(
+            state,
+            &format,
+            None,
+            &store_root,
+            file_name,
+            projection,
+            limit,
+        )
+        .await
     }
 }
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 9fa4c00e6af25..323bc28057d43 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -250,6 +250,7 @@ mod tests {
         let exec = scan_format(
             &state,
             &format,
+            None,
             root,
             "aggregate_test_100_with_nulls.csv",
             projection,
@@ -300,6 +301,7 @@ mod tests {
         let exec = scan_format(
             &state,
             &format,
+            None,
             root,
             "aggregate_test_100_with_nulls.csv",
             projection,
@@ -582,7 +584,7 @@ mod tests {
     ) -> Result<Arc<DataSourceExec>> {
         let root = format!("{}/csv", arrow_test_data());
         let format = CsvFormat::default().with_has_header(has_header);
-        scan_format(state, &format, &root, file_name, projection, limit).await
+        scan_format(state, &format, None, &root, file_name, projection, limit).await
     }

     #[tokio::test]
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index d533dcf7646da..a70a0f51d3307 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -149,7 +149,7 @@ mod tests {
     ) -> Result<Arc<DataSourceExec>> {
         let filename = "tests/data/2.json";
         let format = JsonFormat::default();
-        scan_format(state, &format, ".", filename, projection, limit).await
+        scan_format(state, &format, None, ".", filename, projection, limit).await
     }

     #[tokio::test]
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index ad8c0bdb5680e..3a098301f14e3 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -36,19 +36,20 @@ pub use datafusion_datasource::write;

 #[cfg(test)]
 pub(crate) mod test_util {
-    use std::sync::Arc;
-
+    use arrow_schema::SchemaRef;
     use datafusion_catalog::Session;
     use datafusion_common::Result;
     use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
     use datafusion_datasource::{file_format::FileFormat, PartitionedFile};
     use datafusion_execution::object_store::ObjectStoreUrl;
+    use std::sync::Arc;

     use crate::test::object_store::local_unpartitioned_file;

     pub async fn scan_format(
         state: &dyn Session,
         format: &dyn FileFormat,
+        schema: Option<SchemaRef>,
         store_root: &str,
         file_name: &str,
         projection: Option<Vec<usize>>,
@@ -57,9 +58,13 @@ pub(crate) mod test_util {
         let store = Arc::new(object_store::local::LocalFileSystem::new()) as _;
         let meta = local_unpartitioned_file(format!("{store_root}/{file_name}"));

-        let file_schema = format
-            .infer_schema(state, &store, std::slice::from_ref(&meta))
-            .await?;
+        let file_schema = if let Some(file_schema) = schema {
+            file_schema
+        } else {
+            format
+                .infer_schema(state, &store, std::slice::from_ref(&meta))
+                .await?
+        };

         let statistics = format
             .infer_stats(state, &store, file_schema.clone(), &meta)
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 76009ccd80b05..7b8b99273f4ea 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -1075,7 +1075,10 @@ mod tests {
             .map(|factory| factory.create(state, &Default::default()).unwrap())
             .unwrap_or(Arc::new(ParquetFormat::new()));

-        scan_format(state, &*format, &testdata, file_name, projection, limit).await
+        scan_format(
+            state, &*format, None, &testdata, file_name, projection, limit,
+        )
+        .await
     }

     /// Test that 0-byte files don't break while reading
diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs
index 5986460cb539a..e9bb8b0db3682 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -38,7 +38,7 @@ mod tests {
     use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
     use crate::test::object_store::local_unpartitioned_file;
     use arrow::array::{
-        ArrayRef, Date64Array, Int32Array, Int64Array, Int8Array, StringArray,
+        ArrayRef, AsArray, Date64Array, Int32Array, Int64Array, Int8Array, StringArray,
         StructArray,
     };
     use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaBuilder};
@@ -1109,6 +1109,7 @@ mod tests {
         let parquet_exec = scan_format(
             &state,
             &ParquetFormat::default(),
+            None,
             &testdata,
             filename,
             Some(vec![0, 1, 2]),
@@ -1141,6 +1142,97 @@ mod tests {
         Ok(())
     }

+    #[tokio::test]
+    async fn parquet_exec_with_int96_from_spark() -> Result<()> {
+        // arrow-rs relies on the chrono library to convert between timestamps and strings, so
+        // instead compare as Int64. The underlying type should be a PrimitiveArray of Int64
+        // anyway, so this should be a zero-copy non-modifying cast at the SchemaAdapter.
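+        //
+        // For context: a Parquet INT96 packs nanoseconds-of-day into its first eight
+        // bytes and a Julian day number into its last four, so it can express dates far
+        // outside the i64 nanosecond range. Reading such a value back as nanoseconds
+        // wraps on overflow: year 9999 is 253402225200000000 us, and multiplying by
+        // 1000 overflows i64 to -4852191831933722624, the third expected value in the
+        // "ns" cases below.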
+
+        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
+        let testdata = datafusion_common::test_util::parquet_test_data();
+        let filename = "int96_from_spark.parquet";
+        let session_ctx = SessionContext::new();
+        let state = session_ctx.state();
+        let task_ctx = state.task_ctx();
+
+        let time_units_and_expected = vec![
+            (
+                None, // Same as "ns" time_unit
+                Arc::new(Int64Array::from(vec![
+                    Some(1704141296123456000), // Reads as nanosecond fine (note 3 extra 0s)
+                    Some(1704070800000000000), // Reads as nanosecond fine (note 3 extra 0s)
+                    Some(-4852191831933722624), // Cannot be represented with nanos timestamp (year 9999)
+                    Some(1735599600000000000), // Reads as nanosecond fine (note 3 extra 0s)
+                    None,
+                    Some(-4864435138808946688), // Cannot be represented with nanos timestamp (year 290000)
+                ])),
+            ),
+            (
+                Some("ns".to_string()),
+                Arc::new(Int64Array::from(vec![
+                    Some(1704141296123456000),
+                    Some(1704070800000000000),
+                    Some(-4852191831933722624),
+                    Some(1735599600000000000),
+                    None,
+                    Some(-4864435138808946688),
+                ])),
+            ),
+            (
+                Some("us".to_string()),
+                Arc::new(Int64Array::from(vec![
+                    Some(1704141296123456),
+                    Some(1704070800000000),
+                    Some(253402225200000000),
+                    Some(1735599600000000),
+                    None,
+                    Some(9089380393200000000),
+                ])),
+            ),
+        ];
+
+        for (time_unit, expected) in time_units_and_expected {
+            let parquet_exec = scan_format(
+                &state,
+                &ParquetFormat::default().with_coerce_int96(time_unit.clone()),
+                Some(schema.clone()),
+                &testdata,
+                filename,
+                Some(vec![0]),
+                None,
+            )
+            .await
+            .unwrap();
+            assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
+
+            let mut results = parquet_exec.execute(0, task_ctx.clone())?;
+            let batch = results.next().await.unwrap()?;
+
+            assert_eq!(6, batch.num_rows());
+            assert_eq!(1, batch.num_columns());
+
+            let column = batch.column(0);
+            assert_eq!(column.len(), expected.len());
+
+            column
+                .as_primitive::<arrow::datatypes::Int64Type>()
+                .iter()
+                .zip(expected.iter())
+                .for_each(|(lhs, rhs)| {
+                    assert_eq!(lhs, rhs);
+                });
+        }
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn parquet_exec_with_range() -> Result<()> {
         fn file_range(meta: &ObjectMeta, start: i64, end: i64) -> PartitionedFile {
diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs
index 7617d4d70ceed..2ef4f236f2787 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -24,9 +24,18 @@ use std::ops::Range;
 use std::sync::Arc;

 use arrow::array::RecordBatch;
+use arrow::datatypes::{Fields, Schema, SchemaRef, TimeUnit};
+use datafusion_datasource::file_compression_type::FileCompressionType;
+use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
+use datafusion_datasource::write::{create_writer, get_writer_schema, SharedBuffer};
+
+use datafusion_datasource::file_format::{
+    FileFormat, FileFormatFactory, FilePushdownSupport,
+};
+use datafusion_datasource::write::demux::DemuxedStreamReceiver;
+
 use arrow::compute::sum;
 use arrow::datatypes::{DataType, Field, FieldRef};
-use arrow::datatypes::{Fields, Schema, SchemaRef};
 use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions};
 use datafusion_common::parsers::CompressionTypeVariant;
 use datafusion_common::stats::Precision;
@@ -38,15 +47,8 @@ use datafusion_common::{HashMap, Statistics};
 use datafusion_common_runtime::{JoinSet, SpawnedTask};
 use datafusion_datasource::display::FileGroupDisplay;
 use datafusion_datasource::file::FileSource;
-use datafusion_datasource::file_compression_type::FileCompressionType;
-use datafusion_datasource::file_format::{
-    FileFormat, FileFormatFactory, FilePushdownSupport,
-};
 use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
-use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
 use datafusion_datasource::sink::{DataSink, DataSinkExec};
-use datafusion_datasource::write::demux::DemuxedStreamReceiver;
-use datafusion_datasource::write::{create_writer, get_writer_schema, SharedBuffer};
 use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_expr::dml::InsertOp;
@@ -76,11 +78,13 @@ use parquet::arrow::arrow_writer::{
 };
 use parquet::arrow::async_reader::MetadataFetch;
 use parquet::arrow::{parquet_to_arrow_schema, ArrowSchemaConverter, AsyncArrowWriter};
+use parquet::basic::Type;
 use parquet::errors::ParquetError;
 use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData};
 use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
 use parquet::file::writer::SerializedFileWriter;
 use parquet::format::FileMetaData;
+use parquet::schema::types::SchemaDescriptor;
 use tokio::io::{AsyncWrite, AsyncWriteExt};
 use tokio::sync::mpsc::{self, Receiver, Sender};

@@ -268,6 +272,17 @@ impl ParquetFormat {
         self.options.global.binary_as_string = binary_as_string;
         self
     }
+
+    /// Returns the `coerce_int96` time unit, if one is set
+    pub fn coerce_int96(&self) -> Option<String> {
+        self.options.global.coerce_int96.clone()
+    }
+
+    /// If set, coerce INT96 timestamps to the given time unit (e.g. "us") when reading
+    pub fn with_coerce_int96(mut self, time_unit: Option<String>) -> Self {
+        self.options.global.coerce_int96 = time_unit;
+        self
+    }
 }

 /// Clears all metadata (Schema level and field level) on an iterator
@@ -569,6 +584,49 @@ pub fn apply_file_schema_type_coercions(
     ))
 }

+/// Coerces the file schema's Timestamps to the provided TimeUnit if the Parquet schema
+/// contains INT96 columns. Returns `None` if no INT96 column is present.
+pub fn coerce_int96_to_resolution(
+    parquet_schema: &SchemaDescriptor,
+    file_schema: &Schema,
+    time_unit: &TimeUnit,
+) -> Option<Schema> {
+    // Record the physical type of every leaf column and whether any is INT96.
+    let mut transform = false;
+    let parquet_fields: HashMap<_, _> = parquet_schema
+        .columns()
+        .iter()
+        .map(|f| {
+            let dt = f.physical_type();
+            if dt.eq(&Type::INT96) {
+                transform = true;
+            }
+            (f.name(), dt)
+        })
+        .collect();

+    if !transform {
+        return None;
+    }
+
+    // Rewrite only the fields backed by INT96 to a timestamp of the requested unit.
+    let transformed_fields: Vec<FieldRef> = file_schema
+        .fields
+        .iter()
+        .map(|field| match parquet_fields.get(field.name().as_str()) {
+            Some(Type::INT96) => {
+                field_with_new_type(field, DataType::Timestamp(*time_unit, None))
+            }
+            _ => Arc::clone(field),
+        })
+        .collect();
+
+    Some(Schema::new_with_metadata(
+        transformed_fields,
+        file_schema.metadata.clone(),
+    ))
+}
+
 /// Coerces the file schema if the table schema uses a view type.
 #[deprecated(
     since = "47.0.0",
diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index 708a8035a4f7b..cfe8213f86e4b 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -22,14 +22,15 @@ use std::sync::Arc;

 use crate::page_filter::PagePruningAccessPlanFilter;
 use crate::row_group_filter::RowGroupAccessPlanFilter;
 use crate::{
-    apply_file_schema_type_coercions, row_filter, should_enable_page_index,
-    ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory,
+    apply_file_schema_type_coercions, coerce_int96_to_resolution, row_filter,
+    should_enable_page_index, ParquetAccessPlan, ParquetFileMetrics,
+    ParquetFileReaderFactory,
 };
 use datafusion_datasource::file_meta::FileMeta;
 use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
 use datafusion_datasource::schema_adapter::SchemaAdapterFactory;

-use arrow::datatypes::SchemaRef;
+use arrow::datatypes::{SchemaRef, TimeUnit};
 use arrow::error::ArrowError;
 use datafusion_common::{exec_err, Result};
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
@@ -79,6 +80,8 @@ pub(super) struct ParquetOpener {
     pub schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
     /// Should row group pruning be applied
     pub enable_row_group_stats_pruning: bool,
+    /// Coerce INT96 timestamps to a specific TimeUnit
+    pub coerce_int96: Option<TimeUnit>,
 }

 impl FileOpener for ParquetOpener {
@@ -111,6 +114,7 @@ impl FileOpener for ParquetOpener {
         let table_schema = Arc::clone(&self.table_schema);
         let reorder_predicates = self.reorder_filters;
         let pushdown_filters = self.pushdown_filters;
+        let coerce_int96 = self.coerce_int96;
         let enable_bloom_filter = self.enable_bloom_filter;
         let enable_row_group_stats_pruning = self.enable_row_group_stats_pruning;
         let limit = self.limit;
@@ -157,6 +161,22 @@ impl FileOpener for ParquetOpener {
             )?;
         }

+        // `TimeUnit` is `Copy`, so bind the value directly instead of `is_some()`/`unwrap()`.
+        if let Some(time_unit) = coerce_int96 {
+            if let Some(merged) = coerce_int96_to_resolution(
+                reader_metadata.parquet_schema(),
+                &physical_file_schema,
+                &time_unit,
+            ) {
+                physical_file_schema = Arc::new(merged);
+                options = options.with_schema(Arc::clone(&physical_file_schema));
+                reader_metadata = ArrowReaderMetadata::try_new(
+                    Arc::clone(reader_metadata.metadata()),
+                    options.clone(),
+                )?;
+            }
+        }
+
         // Build predicates for this specific file
         let (pruning_predicate, page_pruning_predicate) = build_pruning_predicates(
             &predicate,
diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs
index a5629e43636a9..6236525fcb9f4 100644
--- a/datafusion/datasource-parquet/src/source.rs
+++ b/datafusion/datasource-parquet/src/source.rs
@@ -32,9 +32,9 @@ use datafusion_datasource::schema_adapter::{
     DefaultSchemaAdapterFactory, SchemaAdapterFactory,
 };

-use arrow::datatypes::{Schema, SchemaRef};
+use arrow::datatypes::{Schema, SchemaRef, TimeUnit};
 use datafusion_common::config::TableParquetOptions;
-use datafusion_common::Statistics;
+use datafusion_common::{DataFusionError, Statistics};
 use datafusion_datasource::file::FileSource;
 use datafusion_datasource::file_scan_config::FileScanConfig;
 use datafusion_physical_expr_common::physical_expr::fmt_sql;
@@ -438,6 +438,22 @@ impl ParquetSource {
     }
 }

+/// Parses the `coerce_int96` config setting (a string) into an arrow [`TimeUnit`]
+fn parse_coerce_int96_string(str_setting: &str) -> datafusion_common::Result<TimeUnit> {
+    let str_setting_lower: &str = &str_setting.to_lowercase();
+
+    match str_setting_lower {
+        "ns" => Ok(TimeUnit::Nanosecond),
+        "us" => Ok(TimeUnit::Microsecond),
+        "ms" => Ok(TimeUnit::Millisecond),
+        "s" => Ok(TimeUnit::Second),
+        _ => Err(DataFusionError::Configuration(format!(
+            "Unknown or unsupported parquet coerce_int96: \
+            {str_setting}. Valid values are: ns, us, ms, and s."
+        ))),
+    }
+}
+
 impl FileSource for ParquetSource {
     fn create_file_opener(
         &self,
@@ -458,6 +474,14 @@ impl FileSource for ParquetSource {
             Arc::new(DefaultParquetFileReaderFactory::new(object_store)) as _
         });

+        let coerce_int96 = self
+            .table_parquet_options
+            .global
+            .coerce_int96
+            .as_ref()
+            // Note: an invalid value panics here; `create_file_opener` cannot return an error.
+            .map(|time_unit| parse_coerce_int96_string(time_unit.as_str()).unwrap());
+
         Arc::new(ParquetOpener {
             partition_index: partition,
             projection: Arc::from(projection),
@@ -476,6 +500,7 @@ impl FileSource for ParquetSource {
             enable_bloom_filter: self.bloom_filter_on_read(),
             enable_row_group_stats_pruning: self.table_parquet_options.global.pruning,
             schema_adapter_factory,
+            coerce_int96,
         })
     }
diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto
index bbeea5e1ec237..82f1e91d9c9b4 100644
--- a/datafusion/proto-common/proto/datafusion_common.proto
+++ b/datafusion/proto-common/proto/datafusion_common.proto
@@ -545,6 +545,10 @@ message ParquetOptions {
   uint64 max_row_group_size = 15;
   string created_by = 16;
+
+  oneof coerce_int96_opt {
+    string coerce_int96 = 32;
+  }
 }

 enum JoinSide {
diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs
index da43a97899565..bd969db316872 100644
--- a/datafusion/proto-common/src/from_proto/mod.rs
+++ b/datafusion/proto-common/src/from_proto/mod.rs
@@ -984,6 +984,9 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
             maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as usize,
             schema_force_view_types: value.schema_force_view_types,
             binary_as_string: value.binary_as_string,
+            coerce_int96: value.coerce_int96_opt.clone().map(|opt| match opt {
+                protobuf::parquet_options::CoerceInt96Opt::CoerceInt96(v) => Some(v),
+            }).unwrap_or(None),
             skip_arrow_metadata: value.skip_arrow_metadata,
         })
     }
diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs
index b0241fd47a26f..b44b05e9ca296 100644
--- a/datafusion/proto-common/src/generated/pbjson.rs
+++ b/datafusion/proto-common/src/generated/pbjson.rs
@@ -4981,6 +4981,9 @@ impl serde::Serialize for ParquetOptions {
         if self.bloom_filter_ndv_opt.is_some() {
             len += 1;
         }
+        if self.coerce_int96_opt.is_some() {
+            len += 1;
+        }
         let mut struct_ser = serializer.serialize_struct("datafusion_common.ParquetOptions", len)?;
         if self.enable_page_index {
             struct_ser.serialize_field("enablePageIndex", &self.enable_page_index)?;
@@ -5136,6 +5139,13 @@ impl serde::Serialize for ParquetOptions {
                 }
             }
         }
+        if let Some(v) = self.coerce_int96_opt.as_ref() {
+            match v {
+                parquet_options::CoerceInt96Opt::CoerceInt96(v) => {
+                    struct_ser.serialize_field("coerceInt96", v)?;
+                }
+            }
+        }
         struct_ser.end()
     }
 }
@@ -5203,6 +5213,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
             "bloomFilterFpp",
             "bloom_filter_ndv",
             "bloomFilterNdv",
+            "coerce_int96",
+            "coerceInt96",
         ];

         #[allow(clippy::enum_variant_names)]
@@ -5237,6 +5249,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
             Encoding,
             BloomFilterFpp,
             BloomFilterNdv,
+            CoerceInt96,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
             fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
@@ -5288,6 +5301,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
                             "encoding" => Ok(GeneratedField::Encoding),
                             "bloomFilterFpp" | "bloom_filter_fpp" => Ok(GeneratedField::BloomFilterFpp),
                             "bloomFilterNdv" | "bloom_filter_ndv" => Ok(GeneratedField::BloomFilterNdv),
+                            "coerceInt96" | "coerce_int96" => Ok(GeneratedField::CoerceInt96),
                             _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
                         }
                     }
@@ -5337,6 +5351,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
                 let mut encoding_opt__ = None;
                 let mut bloom_filter_fpp_opt__ = None;
                 let mut bloom_filter_ndv_opt__ = None;
+                let mut coerce_int96_opt__ = None;
                 while let Some(k) = map_.next_key()? {
                     match k {
                         GeneratedField::EnablePageIndex => {
@@ -5533,6 +5548,12 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
                             }
                            bloom_filter_ndv_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| parquet_options::BloomFilterNdvOpt::BloomFilterNdv(x.0));
                         }
+                        GeneratedField::CoerceInt96 => {
+                            if coerce_int96_opt__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("coerceInt96"));
+                            }
+                            coerce_int96_opt__ = map_.next_value::<::std::option::Option<_>>()?.map(parquet_options::CoerceInt96Opt::CoerceInt96);
+                        }
                     }
                 }
                 Ok(ParquetOptions {
@@ -5566,6 +5587,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
                     encoding_opt: encoding_opt__,
                     bloom_filter_fpp_opt: bloom_filter_fpp_opt__,
                     bloom_filter_ndv_opt: bloom_filter_ndv_opt__,
+                    coerce_int96_opt: coerce_int96_opt__,
                 })
             }
         }
diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs
index b6e9bc1379832..e029327d481d1 100644
--- a/datafusion/proto-common/src/generated/prost.rs
+++ b/datafusion/proto-common/src/generated/prost.rs
@@ -804,6 +804,8 @@ pub struct ParquetOptions {
     pub bloom_filter_fpp_opt: ::core::option::Option<parquet_options::BloomFilterFppOpt>,
     #[prost(oneof = "parquet_options::BloomFilterNdvOpt", tags = "22")]
     pub bloom_filter_ndv_opt: ::core::option::Option<parquet_options::BloomFilterNdvOpt>,
+    #[prost(oneof = "parquet_options::CoerceInt96Opt", tags = "32")]
+    pub coerce_int96_opt: ::core::option::Option<parquet_options::CoerceInt96Opt>,
 }
 /// Nested message and enum types in `ParquetOptions`.
 pub mod parquet_options {
@@ -857,6 +859,11 @@ pub mod parquet_options {
         #[prost(uint64, tag = "22")]
         BloomFilterNdv(u64),
     }
+    #[derive(Clone, PartialEq, ::prost::Oneof)]
+    pub enum CoerceInt96Opt {
+        #[prost(string, tag = "32")]
+        CoerceInt96(::prost::alloc::string::String),
+    }
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct Precision {
diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs
index decd0cf630388..28927cad03b4c 100644
--- a/datafusion/proto-common/src/to_proto/mod.rs
+++ b/datafusion/proto-common/src/to_proto/mod.rs
@@ -836,6 +836,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions {
             schema_force_view_types: value.schema_force_view_types,
             binary_as_string: value.binary_as_string,
             skip_arrow_metadata: value.skip_arrow_metadata,
+            coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96),
         })
     }
 }
diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs
index b6e9bc1379832..e029327d481d1 100644
--- a/datafusion/proto/src/generated/datafusion_proto_common.rs
+++ b/datafusion/proto/src/generated/datafusion_proto_common.rs
@@ -804,6 +804,8 @@ pub struct ParquetOptions {
     pub bloom_filter_fpp_opt: ::core::option::Option<parquet_options::BloomFilterFppOpt>,
     #[prost(oneof = "parquet_options::BloomFilterNdvOpt", tags = "22")]
     pub bloom_filter_ndv_opt: ::core::option::Option<parquet_options::BloomFilterNdvOpt>,
+    #[prost(oneof = "parquet_options::CoerceInt96Opt", tags = "32")]
+    pub coerce_int96_opt: ::core::option::Option<parquet_options::CoerceInt96Opt>,
 }
 /// Nested message and enum types in `ParquetOptions`.
 pub mod parquet_options {
@@ -857,6 +859,11 @@ pub mod parquet_options {
         #[prost(uint64, tag = "22")]
         BloomFilterNdv(u64),
     }
+    #[derive(Clone, PartialEq, ::prost::Oneof)]
+    pub enum CoerceInt96Opt {
+        #[prost(string, tag = "32")]
+        CoerceInt96(::prost::alloc::string::String),
+    }
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct Precision {
diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs
index e22738973284e..5c33277dc9f74 100644
--- a/datafusion/proto/src/logical_plan/file_formats.rs
+++ b/datafusion/proto/src/logical_plan/file_formats.rs
@@ -415,6 +415,7 @@ impl TableParquetOptionsProto {
                 schema_force_view_types: global_options.global.schema_force_view_types,
                 binary_as_string: global_options.global.binary_as_string,
                 skip_arrow_metadata: global_options.global.skip_arrow_metadata,
+                coerce_int96_opt: global_options.global.coerce_int96.map(parquet_options::CoerceInt96Opt::CoerceInt96),
             }),
             column_specific_options: column_specific_options.into_iter().map(|(column_name, options)| {
                 ParquetColumnSpecificOptions {
@@ -511,6 +512,9 @@ impl From<&ParquetOptionsProto> for ParquetOptions {
             schema_force_view_types: proto.schema_force_view_types,
             binary_as_string: proto.binary_as_string,
             skip_arrow_metadata: proto.skip_arrow_metadata,
+            coerce_int96: proto.coerce_int96_opt.as_ref().map(|opt| match opt {
+                parquet_options::CoerceInt96Opt::CoerceInt96(coerce_int96) => coerce_int96.clone(),
+            }),
         }
     }
 }
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt
index 496f24abf6ed7..efbafe369467a 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -197,6 +197,7 @@ datafusion.execution.parquet.bloom_filter_fpp NULL
 datafusion.execution.parquet.bloom_filter_ndv NULL
 datafusion.execution.parquet.bloom_filter_on_read true
 datafusion.execution.parquet.bloom_filter_on_write false
+datafusion.execution.parquet.coerce_int96 NULL
 datafusion.execution.parquet.column_index_truncate_length 64
 datafusion.execution.parquet.compression zstd(3)
 datafusion.execution.parquet.created_by datafusion
@@ -296,6 +297,7 @@ datafusion.execution.parquet.bloom_filter_fpp NULL (writing) Sets bloom filter f
 datafusion.execution.parquet.bloom_filter_ndv NULL (writing) Sets bloom filter number of distinct values. If NULL, uses default parquet writer setting
 datafusion.execution.parquet.bloom_filter_on_read true (writing) Use any available bloom filters when reading parquet files
 datafusion.execution.parquet.bloom_filter_on_write false (writing) Write bloom filters for all columns when creating parquet files
+datafusion.execution.parquet.coerce_int96 NULL (reading) If set, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which store microsecond-resolution timestamps in an int96, allowing them to represent a larger date range than 64-bit timestamps with nanosecond resolution. Valid values are: ns, us, ms, and s.
 datafusion.execution.parquet.column_index_truncate_length 64 (writing) Sets column index truncate length
 datafusion.execution.parquet.compression zstd(3) (writing) Sets default parquet compression codec. Valid values are: uncompressed, snappy, gzip(level), lzo, brotli(level), lz4, zstd(level), and lz4_raw. These values are not case sensitive. If NULL, uses default parquet writer setting Note that this default setting is not the same as the default parquet writer setting.
 datafusion.execution.parquet.created_by datafusion (writing) Sets "created by" property
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 68e21183938b1..a90da66e4b0b7 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -58,6 +58,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
 | datafusion.execution.parquet.reorder_filters | false | (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query |
 | datafusion.execution.parquet.schema_force_view_types | true | (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. |
 | datafusion.execution.parquet.binary_as_string | false | (reading) If true, parquet reader will read columns of `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`. Parquet files generated by some legacy writers do not correctly set the UTF8 flag for strings, causing string columns to be loaded as BLOB instead. |
+| datafusion.execution.parquet.coerce_int96 | NULL | (reading) If set, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which store microsecond-resolution timestamps in an int96, allowing them to represent a larger date range than 64-bit timestamps with nanosecond resolution. Valid values are: ns, us, ms, and s. |
 | datafusion.execution.parquet.data_pagesize_limit | 1048576 | (writing) Sets best effort maximum size of data page in bytes |
 | datafusion.execution.parquet.write_batch_size | 1024 | (writing) Sets write_batch_size in bytes |
 | datafusion.execution.parquet.writer_version | 1.0 | (writing) Sets parquet writer version valid values are "1.0" and "2.0" |
diff --git a/parquet-testing b/parquet-testing
index f4d7ed772a62a..6e851ddd768d6 160000
--- a/parquet-testing
+++ b/parquet-testing
@@ -1 +1 @@
-Subproject commit f4d7ed772a62a95111db50fbcad2460833e8c882
+Subproject commit 6e851ddd768d6af741c7b15dc594874399fc3cff
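For reviewers, a minimal end-to-end sketch of the new knob (not part of the diff). It assumes only what the patch adds, namely the `datafusion.execution.parquet.coerce_int96` config key and the `int96_from_spark.parquet` fixture from the parquet-testing bump; everything else is the existing DataFusion API.

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Ask the reader to interpret INT96 timestamps as microseconds (Spark's
    // write resolution) instead of the default nanoseconds, avoiding i64
    // overflow for far-future dates such as year 9999.
    let config = SessionConfig::new()
        .set_str("datafusion.execution.parquet.coerce_int96", "us");
    let ctx = SessionContext::new_with_config(config);

    let df = ctx
        .read_parquet("int96_from_spark.parquet", ParquetReadOptions::default())
        .await?;
    df.show().await?;
    Ok(())
}
```

Setting the option via `SET datafusion.execution.parquet.coerce_int96 = 'us';` in SQL should behave the same way, since it is an ordinary `ConfigOptions` entry.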