diff --git a/Cargo.toml b/Cargo.toml
index 809fc4f9de..7da16e00d4 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,10 +18,10 @@
 [workspace]
 resolver = "2"
 members = [
-    "crates/catalog/*",
-    "crates/examples",
-    "crates/iceberg",
-    "crates/test_utils",
+  "crates/catalog/*",
+  "crates/examples",
+  "crates/iceberg",
+  "crates/test_utils",
 ]
 
 [workspace.package]
@@ -37,9 +37,9 @@ rust-version = "1.75.0"
 anyhow = "1.0.72"
 apache-avro = "0.16"
 array-init = "2"
-arrow-arith = { version = ">=46" }
-arrow-array = { version = ">=46" }
-arrow-schema = { version = ">=46" }
+arrow-arith = { version = "51" }
+arrow-array = { version = "51" }
+arrow-schema = { version = "51" }
 async-stream = "0.3.5"
 async-trait = "0.1"
 bimap = "0.6"
@@ -61,7 +61,7 @@ murmur3 = "0.5.2"
 once_cell = "1"
 opendal = "0.45"
 ordered-float = "4.0.0"
-parquet = "50"
+parquet = "51"
 pilota = "0.10.0"
 pretty_assertions = "1.4.0"
 port_scanner = "0.1.5"
diff --git a/crates/iceberg/src/transform/temporal.rs b/crates/iceberg/src/transform/temporal.rs
index 7b8deb17d0..4556543ae5 100644
--- a/crates/iceberg/src/transform/temporal.rs
+++ b/crates/iceberg/src/transform/temporal.rs
@@ -17,10 +17,8 @@
 
 use super::TransformFunction;
 use crate::{Error, ErrorKind, Result};
-use arrow_arith::{
-    arity::binary,
-    temporal::{month_dyn, year_dyn},
-};
+use arrow_arith::temporal::DatePart;
+use arrow_arith::{arity::binary, temporal::date_part};
 use arrow_array::{
     types::Date32Type, Array, ArrayRef, Date32Array, Int32Array, TimestampMicrosecondArray,
 };
@@ -43,8 +41,8 @@ pub struct Year;
 
 impl TransformFunction for Year {
     fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
-        let array =
-            year_dyn(&input).map_err(|err| Error::new(ErrorKind::Unexpected, format!("{err}")))?;
+        let array = date_part(&input, DatePart::Year)
+            .map_err(|err| Error::new(ErrorKind::Unexpected, format!("{err}")))?;
         Ok(Arc::<Int32Array>::new(
             array
                 .as_any()
@@ -61,15 +59,15 @@ pub struct Month;
 
 impl TransformFunction for Month {
     fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
-        let year_array =
-            year_dyn(&input).map_err(|err| Error::new(ErrorKind::Unexpected, format!("{err}")))?;
+        let year_array = date_part(&input, DatePart::Year)
+            .map_err(|err| Error::new(ErrorKind::Unexpected, format!("{err}")))?;
         let year_array: Int32Array = year_array
             .as_any()
             .downcast_ref::<Int32Array>()
             .unwrap()
             .unary(|v| 12 * (v - UNIX_EPOCH_YEAR));
-        let month_array =
-            month_dyn(&input).map_err(|err| Error::new(ErrorKind::Unexpected, format!("{err}")))?;
+        let month_array = date_part(&input, DatePart::Month)
+            .map_err(|err| Error::new(ErrorKind::Unexpected, format!("{err}")))?;
         Ok(Arc::<Int32Array>::new(
             binary(
                 month_array.as_any().downcast_ref::<Int32Array>().unwrap(),
diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
index bb4550fabe..3ec1a1b14f 100644
--- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs
+++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
@@ -18,7 +18,6 @@
 //! The module contains the file writer for parquet file format.
 
 use std::{
-    cmp::max,
     collections::HashMap,
     sync::{atomic::AtomicI64, Arc},
 };
@@ -43,9 +42,6 @@ use super::{
 
 /// ParquetWriterBuilder is used to builder a [`ParquetWriter`]
 #[derive(Clone)]
 pub struct ParquetWriterBuilder<T: LocationGenerator, F: FileNameGenerator> {
-    /// `buffer_size` determines the initial size of the intermediate buffer.
-    /// The intermediate buffer will automatically be resized if necessary
-    init_buffer_size: usize,
     props: WriterProperties,
     schema: ArrowSchemaRef,
@@ -55,13 +51,9 @@ pub struct ParquetWriterBuilder<T: LocationGenerator, F: FileNameGenerator> {
 }
 
 impl<T: LocationGenerator, F: FileNameGenerator> ParquetWriterBuilder<T, F> {
-    /// To avoid EntiryTooSmall error, we set the minimum buffer size to 8MB if the given buffer size is smaller than it.
-    const MIN_BUFFER_SIZE: usize = 8 * 1024 * 1024;
-
     /// Create a new `ParquetWriterBuilder`
     /// To construct the write result, the schema should contain the `PARQUET_FIELD_ID_META_KEY` metadata for each field.
     pub fn new(
-        init_buffer_size: usize,
         props: WriterProperties,
         schema: ArrowSchemaRef,
         file_io: FileIO,
@@ -69,7 +61,6 @@ impl<T: LocationGenerator, F: FileNameGenerator> ParquetWriterBuilder<T, F> {
         file_name_generator: F,
     ) -> Self {
         Self {
-            init_buffer_size,
             props,
             schema,
             file_io,
@@ -112,20 +103,14 @@ impl<T: LocationGenerator, F: FileNameGenerator> FileWriterBuilder for ParquetWr
                 .generate_location(&self.file_name_generator.generate_file_name()),
         )?;
         let inner_writer = TrackWriter::new(out_file.writer().await?, written_size.clone());
-        let init_buffer_size = max(Self::MIN_BUFFER_SIZE, self.init_buffer_size);
-        let writer = AsyncArrowWriter::try_new(
-            inner_writer,
-            self.schema.clone(),
-            init_buffer_size,
-            Some(self.props),
-        )
-        .map_err(|err| {
-            Error::new(
-                crate::ErrorKind::Unexpected,
-                "Failed to build parquet writer.",
-            )
-            .with_source(err)
-        })?;
+        let writer = AsyncArrowWriter::try_new(inner_writer, self.schema.clone(), Some(self.props))
+            .map_err(|err| {
+                Error::new(
+                    crate::ErrorKind::Unexpected,
+                    "Failed to build parquet writer.",
+                )
+                .with_source(err)
+            })?;
 
         Ok(ParquetWriter {
             writer,
@@ -311,7 +296,6 @@ mod tests {
 
         // write data
         let mut pw = ParquetWriterBuilder::new(
-            0,
             WriterProperties::builder().build(),
             to_write.schema(),
             file_io.clone(),
@@ -551,7 +535,6 @@ mod tests {
 
         // write data
        let mut pw = ParquetWriterBuilder::new(
-            0,
             WriterProperties::builder().build(),
             to_write.schema(),
             file_io.clone(),
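
Note (outside the patch): the temporal.rs hunks exist because arrow 51 removed the per-unit kernels year_dyn/month_dyn in favour of the single date_part kernel, which takes a DatePart enum and yields Int32 values for calendar parts. A minimal standalone sketch of the new API against arrow 51, with illustrative dates:

    use arrow_arith::temporal::{date_part, DatePart};
    use arrow_array::{Array, Date32Array, Int32Array};

    fn main() {
        // Date32 stores days since the Unix epoch:
        // day 0 is 1970-01-01, day 19358 is 2023-01-01.
        let dates = Date32Array::from(vec![0, 19358]);
        let years = date_part(&dates, DatePart::Year).unwrap();
        let years = years.as_any().downcast_ref::<Int32Array>().unwrap();
        assert_eq!(years.value(0), 1970);
        assert_eq!(years.value(1), 2023);
    }

The Year and Month transforms above follow the same shape: date_part returns an ArrayRef that must be downcast to Int32Array before the UNIX_EPOCH_YEAR offsets are applied.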
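Note (outside the patch): the parquet_writer.rs hunks track parquet 51's removal of the buffer_size argument from AsyncArrowWriter::try_new; the writer no longer keeps a growable intermediate buffer, so init_buffer_size, MIN_BUFFER_SIZE, and the cmp::max import all become dead and are deleted. A minimal sketch of the new three-argument constructor, assuming the parquet crate's "async" feature and a tokio runtime; the output path and schema are illustrative only:

    use std::sync::Arc;

    use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    use parquet::arrow::AsyncArrowWriter;
    use parquet::file::properties::WriterProperties;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        let batch = RecordBatch::try_from_iter([(
            "id",
            Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
        )])?;
        let file = tokio::fs::File::create("/tmp/example.parquet").await?;
        // parquet 51: try_new(writer, schema, props); no buffer_size parameter.
        let props = WriterProperties::builder().build();
        let mut writer = AsyncArrowWriter::try_new(file, batch.schema(), Some(props))?;
        writer.write(&batch).await?;
        writer.close().await?;
        Ok(())
    }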