diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index f0ba360f2a02..0765da13c586 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -81,6 +81,6 @@ name = "arrow_writer" harness = false [[bench]] -name = "arrow_array_reader" +name = "arrow_reader" required-features = ["test_common", "experimental"] harness = false diff --git a/parquet/benches/arrow_array_reader.rs b/parquet/benches/arrow_reader.rs similarity index 74% rename from parquet/benches/arrow_array_reader.rs rename to parquet/benches/arrow_reader.rs index 9db5b4ca3e18..1bd6fc61a37b 100644 --- a/parquet/benches/arrow_array_reader.rs +++ b/parquet/benches/arrow_reader.rs @@ -288,17 +288,6 @@ fn bench_array_reader(mut array_reader: Box) -> usize { total_count } -fn create_int32_arrow_array_reader( - page_iterator: impl PageIterator + 'static, - column_desc: ColumnDescPtr, -) -> Box { - use parquet::arrow::arrow_array_reader::{ArrowArrayReader, PrimitiveArrayConverter}; - let converter = PrimitiveArrayConverter::::new(); - let reader = - ArrowArrayReader::try_new(page_iterator, column_desc, converter, None).unwrap(); - Box::new(reader) -} - fn create_int32_primitive_array_reader( page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr, @@ -314,17 +303,6 @@ fn create_int32_primitive_array_reader( Box::new(reader) } -fn create_string_arrow_array_reader( - page_iterator: impl PageIterator + 'static, - column_desc: ColumnDescPtr, -) -> Box { - use parquet::arrow::arrow_array_reader::{ArrowArrayReader, StringArrayConverter}; - let converter = StringArrayConverter::new(); - let reader = - ArrowArrayReader::try_new(page_iterator, column_desc, converter, None).unwrap(); - Box::new(reader) -} - fn create_string_byte_array_reader( page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr, @@ -394,66 +372,32 @@ fn add_benches(c: &mut Criterion) { mandatory_int32_column_desc.clone(), 0.0, ); - group.bench_function( - "read Int32Array, plain encoded, mandatory, no NULLs - old", - |b| { - b.iter(|| { - let array_reader = create_int32_primitive_array_reader( - plain_int32_no_null_data.clone(), - mandatory_int32_column_desc.clone(), - ); - count = bench_array_reader(array_reader); - }); - assert_eq!(count, EXPECTED_VALUE_COUNT); - }, - ); - - group.bench_function( - "read Int32Array, plain encoded, mandatory, no NULLs - new", - |b| { - b.iter(|| { - let array_reader = create_int32_arrow_array_reader( - plain_int32_no_null_data.clone(), - mandatory_int32_column_desc.clone(), - ); - count = bench_array_reader(array_reader); - }); - assert_eq!(count, EXPECTED_VALUE_COUNT); - }, - ); + group.bench_function("read Int32Array, plain encoded, mandatory, no NULLs", |b| { + b.iter(|| { + let array_reader = create_int32_primitive_array_reader( + plain_int32_no_null_data.clone(), + mandatory_int32_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }); let plain_int32_no_null_data = build_plain_encoded_int32_page_iterator( schema.clone(), optional_int32_column_desc.clone(), 0.0, ); - group.bench_function( - "read Int32Array, plain encoded, optional, no NULLs - old", - |b| { - b.iter(|| { - let array_reader = create_int32_primitive_array_reader( - plain_int32_no_null_data.clone(), - optional_int32_column_desc.clone(), - ); - count = bench_array_reader(array_reader); - }); - assert_eq!(count, EXPECTED_VALUE_COUNT); - }, - ); - - group.bench_function( - "read Int32Array, plain encoded, optional, no NULLs - new", - |b| { - b.iter(|| { - let array_reader = create_int32_arrow_array_reader( - plain_int32_no_null_data.clone(), - optional_int32_column_desc.clone(), - ); - count = bench_array_reader(array_reader); - }); - assert_eq!(count, EXPECTED_VALUE_COUNT); - }, - ); + group.bench_function("read Int32Array, plain encoded, optional, no NULLs", |b| { + b.iter(|| { + let array_reader = create_int32_primitive_array_reader( + plain_int32_no_null_data.clone(), + optional_int32_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }); // int32, plain encoded, half NULLs let plain_int32_half_null_data = build_plain_encoded_int32_page_iterator( @@ -462,7 +406,7 @@ fn add_benches(c: &mut Criterion) { 0.5, ); group.bench_function( - "read Int32Array, plain encoded, optional, half NULLs - old", + "read Int32Array, plain encoded, optional, half NULLs", |b| { b.iter(|| { let array_reader = create_int32_primitive_array_reader( @@ -475,20 +419,6 @@ fn add_benches(c: &mut Criterion) { }, ); - group.bench_function( - "read Int32Array, plain encoded, optional, half NULLs - new", - |b| { - b.iter(|| { - let array_reader = create_int32_arrow_array_reader( - plain_int32_half_null_data.clone(), - optional_int32_column_desc.clone(), - ); - count = bench_array_reader(array_reader); - }); - assert_eq!(count, EXPECTED_VALUE_COUNT); - }, - ); - // int32, dictionary encoded, no NULLs let dictionary_int32_no_null_data = build_dictionary_encoded_int32_page_iterator( schema.clone(), @@ -496,7 +426,7 @@ fn add_benches(c: &mut Criterion) { 0.0, ); group.bench_function( - "read Int32Array, dictionary encoded, mandatory, no NULLs - old", + "read Int32Array, dictionary encoded, mandatory, no NULLs", |b| { b.iter(|| { let array_reader = create_int32_primitive_array_reader( @@ -509,27 +439,13 @@ fn add_benches(c: &mut Criterion) { }, ); - group.bench_function( - "read Int32Array, dictionary encoded, mandatory, no NULLs - new", - |b| { - b.iter(|| { - let array_reader = create_int32_arrow_array_reader( - dictionary_int32_no_null_data.clone(), - mandatory_int32_column_desc.clone(), - ); - count = bench_array_reader(array_reader); - }); - assert_eq!(count, EXPECTED_VALUE_COUNT); - }, - ); - let dictionary_int32_no_null_data = build_dictionary_encoded_int32_page_iterator( schema.clone(), optional_int32_column_desc.clone(), 0.0, ); group.bench_function( - "read Int32Array, dictionary encoded, optional, no NULLs - old", + "read Int32Array, dictionary encoded, optional, no NULLs", |b| { b.iter(|| { let array_reader = create_int32_primitive_array_reader( @@ -542,20 +458,6 @@ fn add_benches(c: &mut Criterion) { }, ); - group.bench_function( - "read Int32Array, dictionary encoded, optional, no NULLs - new", - |b| { - b.iter(|| { - let array_reader = create_int32_arrow_array_reader( - dictionary_int32_no_null_data.clone(), - optional_int32_column_desc.clone(), - ); - count = bench_array_reader(array_reader); - }); - assert_eq!(count, EXPECTED_VALUE_COUNT); - }, - ); - // int32, dictionary encoded, half NULLs let dictionary_int32_half_null_data = build_dictionary_encoded_int32_page_iterator( schema.clone(), @@ -563,7 +465,7 @@ fn add_benches(c: &mut Criterion) { 0.5, ); group.bench_function( - "read Int32Array, dictionary encoded, optional, half NULLs - old", + "read Int32Array, dictionary encoded, optional, half NULLs", |b| { b.iter(|| { let array_reader = create_int32_primitive_array_reader( @@ -576,20 +478,6 @@ fn add_benches(c: &mut Criterion) { }, ); - group.bench_function( - "read Int32Array, dictionary encoded, optional, half NULLs - new", - |b| { - b.iter(|| { - let array_reader = create_int32_arrow_array_reader( - dictionary_int32_half_null_data.clone(), - optional_int32_column_desc.clone(), - ); - count = bench_array_reader(array_reader); - }); - assert_eq!(count, EXPECTED_VALUE_COUNT); - }, - ); - // string benchmarks //============================== @@ -600,7 +488,7 @@ fn add_benches(c: &mut Criterion) { 0.0, ); group.bench_function( - "read StringArray, plain encoded, mandatory, no NULLs - old", + "read StringArray, plain encoded, mandatory, no NULLs", |b| { b.iter(|| { let array_reader = create_string_byte_array_reader( @@ -613,52 +501,21 @@ fn add_benches(c: &mut Criterion) { }, ); - group.bench_function( - "read StringArray, plain encoded, mandatory, no NULLs - new", - |b| { - b.iter(|| { - let array_reader = create_string_arrow_array_reader( - plain_string_no_null_data.clone(), - mandatory_string_column_desc.clone(), - ); - count = bench_array_reader(array_reader); - }); - assert_eq!(count, EXPECTED_VALUE_COUNT); - }, - ); - let plain_string_no_null_data = build_plain_encoded_string_page_iterator( schema.clone(), optional_string_column_desc.clone(), 0.0, ); - group.bench_function( - "read StringArray, plain encoded, optional, no NULLs - old", - |b| { - b.iter(|| { - let array_reader = create_string_byte_array_reader( - plain_string_no_null_data.clone(), - optional_string_column_desc.clone(), - ); - count = bench_array_reader(array_reader); - }); - assert_eq!(count, EXPECTED_VALUE_COUNT); - }, - ); - - group.bench_function( - "read StringArray, plain encoded, optional, no NULLs - new", - |b| { - b.iter(|| { - let array_reader = create_string_arrow_array_reader( - plain_string_no_null_data.clone(), - optional_string_column_desc.clone(), - ); - count = bench_array_reader(array_reader); - }); - assert_eq!(count, EXPECTED_VALUE_COUNT); - }, - ); + group.bench_function("read StringArray, plain encoded, optional, no NULLs", |b| { + b.iter(|| { + let array_reader = create_string_byte_array_reader( + plain_string_no_null_data.clone(), + optional_string_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }); // string, plain encoded, half NULLs let plain_string_half_null_data = build_plain_encoded_string_page_iterator( @@ -667,7 +524,7 @@ fn add_benches(c: &mut Criterion) { 0.5, ); group.bench_function( - "read StringArray, plain encoded, optional, half NULLs - old", + "read StringArray, plain encoded, optional, half NULLs", |b| { b.iter(|| { let array_reader = create_string_byte_array_reader( @@ -680,20 +537,6 @@ fn add_benches(c: &mut Criterion) { }, ); - group.bench_function( - "read StringArray, plain encoded, optional, half NULLs - new", - |b| { - b.iter(|| { - let array_reader = create_string_arrow_array_reader( - plain_string_half_null_data.clone(), - optional_string_column_desc.clone(), - ); - count = bench_array_reader(array_reader); - }); - assert_eq!(count, EXPECTED_VALUE_COUNT); - }, - ); - // string, dictionary encoded, no NULLs let dictionary_string_no_null_data = build_dictionary_encoded_string_page_iterator( schema.clone(), @@ -701,7 +544,7 @@ fn add_benches(c: &mut Criterion) { 0.0, ); group.bench_function( - "read StringArray, dictionary encoded, mandatory, no NULLs - old", + "read StringArray, dictionary encoded, mandatory, no NULLs", |b| { b.iter(|| { let array_reader = create_string_byte_array_reader( @@ -714,27 +557,13 @@ fn add_benches(c: &mut Criterion) { }, ); - group.bench_function( - "read StringArray, dictionary encoded, mandatory, no NULLs - new", - |b| { - b.iter(|| { - let array_reader = create_string_arrow_array_reader( - dictionary_string_no_null_data.clone(), - mandatory_string_column_desc.clone(), - ); - count = bench_array_reader(array_reader); - }); - assert_eq!(count, EXPECTED_VALUE_COUNT); - }, - ); - let dictionary_string_no_null_data = build_dictionary_encoded_string_page_iterator( schema.clone(), optional_string_column_desc.clone(), 0.0, ); group.bench_function( - "read StringArray, dictionary encoded, optional, no NULLs - old", + "read StringArray, dictionary encoded, optional, no NULLs", |b| { b.iter(|| { let array_reader = create_string_byte_array_reader( @@ -747,20 +576,6 @@ fn add_benches(c: &mut Criterion) { }, ); - group.bench_function( - "read StringArray, dictionary encoded, optional, no NULLs - new", - |b| { - b.iter(|| { - let array_reader = create_string_arrow_array_reader( - dictionary_string_no_null_data.clone(), - optional_string_column_desc.clone(), - ); - count = bench_array_reader(array_reader); - }); - assert_eq!(count, EXPECTED_VALUE_COUNT); - }, - ); - // string, dictionary encoded, half NULLs let dictionary_string_half_null_data = build_dictionary_encoded_string_page_iterator( schema.clone(), @@ -768,7 +583,7 @@ fn add_benches(c: &mut Criterion) { 0.5, ); group.bench_function( - "read StringArray, dictionary encoded, optional, half NULLs - old", + "read StringArray, dictionary encoded, optional, half NULLs", |b| { b.iter(|| { let array_reader = create_string_byte_array_reader( @@ -781,20 +596,6 @@ fn add_benches(c: &mut Criterion) { }, ); - group.bench_function( - "read StringArray, dictionary encoded, optional, half NULLs - new", - |b| { - b.iter(|| { - let array_reader = create_string_arrow_array_reader( - dictionary_string_half_null_data.clone(), - optional_string_column_desc.clone(), - ); - count = bench_array_reader(array_reader); - }); - assert_eq!(count, EXPECTED_VALUE_COUNT); - }, - ); - group.bench_function( "read StringDictionary, dictionary encoded, mandatory, no NULLs - old", |b| { diff --git a/parquet/src/arrow/arrow_array_reader.rs b/parquet/src/arrow/arrow_array_reader.rs deleted file mode 100644 index d814f7f6e6d1..000000000000 --- a/parquet/src/arrow/arrow_array_reader.rs +++ /dev/null @@ -1,1922 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use super::array_reader::ArrayReader; -use crate::arrow::schema::parquet_to_arrow_field; -use crate::basic::Encoding; -use crate::data_type::{ByteArray, ByteArrayType}; -use crate::encodings::decoding::{Decoder, DeltaByteArrayDecoder}; -use crate::errors::{ParquetError, Result}; -use crate::{ - column::page::{Page, PageIterator}, - schema::types::{ColumnDescPtr, ColumnDescriptor}, - util::memory::ByteBufferPtr, -}; -use arrow::{ - array::{ArrayRef, Int16Array}, - buffer::MutableBuffer, - datatypes::{DataType as ArrowType, ToByteSlice}, -}; -use std::{any::Any, collections::VecDeque, marker::PhantomData}; -use std::{cell::RefCell, rc::Rc}; - -struct UnzipIter { - shared_state: Rc>, - select_item_buffer: fn(&mut State) -> &mut VecDeque, - consume_source_item: fn(source_item: Source, state: &mut State) -> Target, -} - -impl UnzipIter { - fn new( - shared_state: Rc>, - item_buffer_selector: fn(&mut State) -> &mut VecDeque, - source_item_consumer: fn(source_item: Source, state: &mut State) -> Target, - ) -> Self { - Self { - shared_state, - select_item_buffer: item_buffer_selector, - consume_source_item: source_item_consumer, - } - } -} - -trait UnzipIterState { - type SourceIter: Iterator; - fn source_iter(&mut self) -> &mut Self::SourceIter; -} - -impl> Iterator - for UnzipIter -{ - type Item = Target; - - fn next(&mut self) -> Option { - let mut inner = self.shared_state.borrow_mut(); - // try to get one from the stored data - (self.select_item_buffer)(&mut *inner) - .pop_front() - .or_else(|| - // nothing stored, we need a new element. - inner.source_iter().next().map(|s| { - (self.consume_source_item)(s, &mut inner) - })) - } -} - -struct PageBufferUnzipIterState { - iter: It, - value_iter_buffer: VecDeque, - def_level_iter_buffer: VecDeque, - rep_level_iter_buffer: VecDeque, -} - -impl> UnzipIterState<(V, L, L)> - for PageBufferUnzipIterState -{ - type SourceIter = It; - - #[inline] - fn source_iter(&mut self) -> &mut Self::SourceIter { - &mut self.iter - } -} - -type ValueUnzipIter = - UnzipIter<(V, L, L), V, PageBufferUnzipIterState>; -type LevelUnzipIter = - UnzipIter<(V, L, L), L, PageBufferUnzipIterState>; -type PageUnzipResult = ( - ValueUnzipIter, - LevelUnzipIter, - LevelUnzipIter, -); - -fn unzip_iter>(it: It) -> PageUnzipResult { - let shared_data = Rc::new(RefCell::new(PageBufferUnzipIterState { - iter: it, - value_iter_buffer: VecDeque::new(), - def_level_iter_buffer: VecDeque::new(), - rep_level_iter_buffer: VecDeque::new(), - })); - - let value_iter = UnzipIter::new( - shared_data.clone(), - |state| &mut state.value_iter_buffer, - |(v, d, r), state| { - state.def_level_iter_buffer.push_back(d); - state.rep_level_iter_buffer.push_back(r); - v - }, - ); - - let def_level_iter = UnzipIter::new( - shared_data.clone(), - |state| &mut state.def_level_iter_buffer, - |(v, d, r), state| { - state.value_iter_buffer.push_back(v); - state.rep_level_iter_buffer.push_back(r); - d - }, - ); - - let rep_level_iter = UnzipIter::new( - shared_data, - |state| &mut state.rep_level_iter_buffer, - |(v, d, r), state| { - state.value_iter_buffer.push_back(v); - state.def_level_iter_buffer.push_back(d); - r - }, - ); - - (value_iter, def_level_iter, rep_level_iter) -} - -pub trait ArrayConverter { - fn convert_value_bytes( - &self, - value_decoder: &mut impl ValueDecoder, - num_values: usize, - ) -> Result; -} - -pub struct ArrowArrayReader<'a, C: ArrayConverter + 'a> { - column_desc: ColumnDescPtr, - data_type: ArrowType, - def_level_decoder: Box, - rep_level_decoder: Box, - value_decoder: Box, - last_def_levels: Option, - last_rep_levels: Option, - array_converter: C, -} - -pub(crate) struct ColumnChunkContext { - dictionary_values: Option>, -} - -impl ColumnChunkContext { - fn new() -> Self { - Self { - dictionary_values: None, - } - } - - fn set_dictionary(&mut self, dictionary_values: Vec) { - self.dictionary_values = Some(dictionary_values); - } -} - -type PageDecoderTuple = ( - Box, - Box, - Box, -); - -impl<'a, C: ArrayConverter + 'a> ArrowArrayReader<'a, C> { - pub fn try_new( - column_chunk_iterator: P, - column_desc: ColumnDescPtr, - array_converter: C, - arrow_type: Option, - ) -> Result { - let data_type = match arrow_type { - Some(t) => t, - None => parquet_to_arrow_field(column_desc.as_ref())? - .data_type() - .clone(), - }; - type PageIteratorItem = Result<(Page, Rc>)>; - let page_iter = column_chunk_iterator - // build iterator of pages across column chunks - .flat_map(|x| -> Box> { - // attach column chunk context - let context = Rc::new(RefCell::new(ColumnChunkContext::new())); - match x { - Ok(page_reader) => Box::new( - page_reader.map(move |pr| pr.map(|p| (p, context.clone()))), - ), - // errors from reading column chunks / row groups are propagated to page level - Err(e) => Box::new(std::iter::once(Err(e))), - } - }); - // capture a clone of column_desc in closure so that it can outlive current function - let map_page_fn_factory = |column_desc: ColumnDescPtr| { - move |x: Result<(Page, Rc>)>| { - x.and_then(|(page, context)| { - Self::map_page(page, context, column_desc.as_ref()) - }) - } - }; - let map_page_fn = map_page_fn_factory(column_desc.clone()); - // map page iterator into tuple of buffer iterators for (values, def levels, rep levels) - // errors from lower levels are surfaced through the value decoder iterator - let decoder_iter = page_iter.map(map_page_fn).map(|x| match x { - Ok(iter_tuple) => iter_tuple, - // errors from reading pages are propagated to decoder iterator level - Err(e) => Self::map_page_error(e), - }); - // split tuple iterator into separate iterators for (values, def levels, rep levels) - let (value_iter, def_level_iter, rep_level_iter) = unzip_iter(decoder_iter); - - Ok(Self { - column_desc, - data_type, - def_level_decoder: Box::new(CompositeValueDecoder::new(def_level_iter)), - rep_level_decoder: Box::new(CompositeValueDecoder::new(rep_level_iter)), - value_decoder: Box::new(CompositeValueDecoder::new(value_iter)), - last_def_levels: None, - last_rep_levels: None, - array_converter, - }) - } - - #[inline] - fn def_levels_available(column_desc: &ColumnDescriptor) -> bool { - column_desc.max_def_level() > 0 - } - - #[inline] - fn rep_levels_available(column_desc: &ColumnDescriptor) -> bool { - column_desc.max_rep_level() > 0 - } - - fn map_page_error(err: ParquetError) -> PageDecoderTuple { - ( - Box::new(::once(Err(err.clone()))), - Box::new(::once(Err(err.clone()))), - Box::new(::once(Err(err))), - ) - } - - // Split Result into Result<(Iterator, Iterator, Iterator)> - // this method could fail, e.g. if the page encoding is not supported - fn map_page( - page: Page, - column_chunk_context: Rc>, - column_desc: &ColumnDescriptor, - ) -> Result { - use crate::encodings::levels::LevelDecoder; - match page { - Page::DictionaryPage { - buf, - num_values, - encoding, - .. - } => { - let mut column_chunk_context = column_chunk_context.borrow_mut(); - if column_chunk_context.dictionary_values.is_some() { - return Err(general_err!( - "Column chunk cannot have more than one dictionary" - )); - } - // create plain decoder for dictionary values - let mut dict_decoder = Self::get_dictionary_page_decoder( - buf, - num_values as usize, - encoding, - column_desc, - )?; - // decode and cache dictionary values - let dictionary_values = dict_decoder.read_dictionary_values()?; - column_chunk_context.set_dictionary(dictionary_values); - - // a dictionary page doesn't return any values - Ok(( - Box::new(::empty()), - Box::new(::empty()), - Box::new(::empty()), - )) - } - Page::DataPage { - buf, - num_values, - encoding, - def_level_encoding, - rep_level_encoding, - statistics: _, - } => { - let mut buffer_ptr = buf; - // create rep level decoder iterator - let rep_level_iter: Box = - if Self::rep_levels_available(column_desc) { - let mut rep_decoder = LevelDecoder::v1( - rep_level_encoding, - column_desc.max_rep_level(), - ); - let rep_level_byte_len = - rep_decoder.set_data(num_values as usize, buffer_ptr.all()); - // advance buffer pointer - buffer_ptr = buffer_ptr.start_from(rep_level_byte_len); - Box::new(LevelValueDecoder::new(rep_decoder)) - } else { - Box::new(::once(Err(ParquetError::General( - "rep levels are not available".to_string(), - )))) - }; - // create def level decoder iterator - let (def_level_iter, value_count): (Box, usize) = - if Self::def_levels_available(column_desc) { - // calculate actual value count - let mut def_decoder = LevelDecoder::v1( - def_level_encoding, - column_desc.max_def_level(), - ); - def_decoder.set_data(num_values as usize, buffer_ptr.all()); - let value_count = Self::count_def_level_values( - column_desc, - def_decoder, - num_values as usize, - )?; - // create def level decoder - def_decoder = LevelDecoder::v1( - def_level_encoding, - column_desc.max_def_level(), - ); - let def_levels_byte_len = - def_decoder.set_data(num_values as usize, buffer_ptr.all()); - // advance buffer pointer - buffer_ptr = buffer_ptr.start_from(def_levels_byte_len); - (Box::new(LevelValueDecoder::new(def_decoder)), value_count) - } else { - ( - Box::new(::once(Err( - ParquetError::General( - "def levels are not available".to_string(), - ), - ))), - num_values as usize, - ) - }; - // create value decoder iterator - let value_iter = Self::get_value_decoder( - buffer_ptr, - value_count, - encoding, - column_desc, - column_chunk_context, - )?; - Ok((value_iter, def_level_iter, rep_level_iter)) - } - Page::DataPageV2 { - buf, - num_values, - encoding, - num_nulls: _, - num_rows: _, - def_levels_byte_len, - rep_levels_byte_len, - is_compressed: _, - statistics: _, - } => { - let mut offset = 0; - // create rep level decoder iterator - let rep_level_iter: Box = - if Self::rep_levels_available(column_desc) { - let rep_levels_byte_len = rep_levels_byte_len as usize; - let mut rep_decoder = - LevelDecoder::v2(column_desc.max_rep_level()); - rep_decoder.set_data_range( - num_values as usize, - &buf, - offset, - rep_levels_byte_len, - ); - offset += rep_levels_byte_len; - Box::new(LevelValueDecoder::new(rep_decoder)) - } else { - Box::new(::once(Err(ParquetError::General( - "rep levels are not available".to_string(), - )))) - }; - // create def level decoder iterator - let (def_level_iter, value_count): (Box, usize) = - if Self::def_levels_available(column_desc) { - let def_levels_byte_len = def_levels_byte_len as usize; - // calculate actual value count - let mut def_decoder = - LevelDecoder::v2(column_desc.max_def_level()); - def_decoder.set_data_range( - num_values as usize, - &buf, - offset, - def_levels_byte_len, - ); - let value_count = Self::count_def_level_values( - column_desc, - def_decoder, - num_values as usize, - )?; - // create def level decoder - def_decoder = LevelDecoder::v2(column_desc.max_def_level()); - def_decoder.set_data_range( - num_values as usize, - &buf, - offset, - def_levels_byte_len, - ); - offset += def_levels_byte_len; - (Box::new(LevelValueDecoder::new(def_decoder)), value_count) - } else { - ( - Box::new(::once(Err( - ParquetError::General( - "def levels are not available".to_string(), - ), - ))), - num_values as usize, - ) - }; - - // create value decoder iterator - let values_buffer = buf.start_from(offset); - let value_iter = Self::get_value_decoder( - values_buffer, - value_count, - encoding, - column_desc, - column_chunk_context, - )?; - Ok((value_iter, def_level_iter, rep_level_iter)) - } - } - } - - fn count_def_level_values( - column_desc: &ColumnDescriptor, - level_decoder: crate::encodings::levels::LevelDecoder, - num_values: usize, - ) -> Result { - let mut def_level_decoder = LevelValueDecoder::new(level_decoder); - let def_level_array = - Self::build_level_array(&mut def_level_decoder, num_values)?; - let def_level_count = def_level_array.len(); - // use eq_scalar to efficiently build null bitmap array from def levels - let null_bitmap_array = - arrow::compute::eq_scalar(&def_level_array, column_desc.max_def_level())?; - // efficiently calculate values to read - Ok(null_bitmap_array - .values() - .count_set_bits_offset(0, def_level_count)) - } - - fn get_dictionary_page_decoder( - values_buffer: ByteBufferPtr, - num_values: usize, - mut encoding: Encoding, - column_desc: &ColumnDescriptor, - ) -> Result> { - if encoding == Encoding::PLAIN || encoding == Encoding::PLAIN_DICTIONARY { - encoding = Encoding::RLE_DICTIONARY - } - - if encoding == Encoding::RLE_DICTIONARY { - Ok( - Self::get_plain_value_decoder(values_buffer, num_values, column_desc) - .into_dictionary_decoder(), - ) - } else { - Err(nyi_err!( - "Invalid/Unsupported encoding type for dictionary: {}", - encoding - )) - } - } - - fn get_value_decoder( - values_buffer: ByteBufferPtr, - num_values: usize, - mut encoding: Encoding, - column_desc: &ColumnDescriptor, - column_chunk_context: Rc>, - ) -> Result> { - if encoding == Encoding::PLAIN_DICTIONARY { - encoding = Encoding::RLE_DICTIONARY; - } - - match encoding { - Encoding::PLAIN => { - Ok( - Self::get_plain_value_decoder(values_buffer, num_values, column_desc) - .into_value_decoder(), - ) - } - Encoding::RLE_DICTIONARY => { - if column_chunk_context.borrow().dictionary_values.is_some() { - let value_bit_len = Self::get_column_physical_bit_len(column_desc); - let dictionary_decoder: Box = if value_bit_len == 0 - { - Box::new(VariableLenDictionaryDecoder::new( - column_chunk_context, - values_buffer, - num_values, - )) - } else { - Box::new(FixedLenDictionaryDecoder::new( - column_chunk_context, - values_buffer, - num_values, - value_bit_len, - )) - }; - Ok(dictionary_decoder) - } else { - Err(general_err!("Dictionary values have not been initialized.")) - } - } - // Encoding::RLE => Box::new(RleValueDecoder::new()), - // Encoding::DELTA_BINARY_PACKED => Box::new(DeltaBitPackDecoder::new()), - // Encoding::DELTA_LENGTH_BYTE_ARRAY => Box::new(DeltaLengthByteArrayDecoder::new()), - Encoding::DELTA_BYTE_ARRAY => Ok(Box::new(DeltaByteArrayValueDecoder::new( - values_buffer, - num_values, - )?)), - e => return Err(nyi_err!("Encoding {} is not supported", e)), - } - } - - fn get_column_physical_bit_len(column_desc: &ColumnDescriptor) -> usize { - use crate::basic::Type as PhysicalType; - // parquet only supports a limited number of physical types - // later converters cast to a more specific arrow / logical type if necessary - match column_desc.physical_type() { - PhysicalType::BOOLEAN => 1, - PhysicalType::INT32 | PhysicalType::FLOAT => 32, - PhysicalType::INT64 | PhysicalType::DOUBLE => 64, - PhysicalType::INT96 => 96, - PhysicalType::BYTE_ARRAY => 0, - PhysicalType::FIXED_LEN_BYTE_ARRAY => column_desc.type_length() as usize * 8, - } - } - - fn get_plain_value_decoder( - values_buffer: ByteBufferPtr, - num_values: usize, - column_desc: &ColumnDescriptor, - ) -> Box { - let value_bit_len = Self::get_column_physical_bit_len(column_desc); - if value_bit_len == 0 { - Box::new(VariableLenPlainDecoder::new(values_buffer, num_values)) - } else { - Box::new(FixedLenPlainDecoder::new( - values_buffer, - num_values, - value_bit_len, - )) - } - } - - fn build_level_array( - level_decoder: &mut impl ValueDecoder, - batch_size: usize, - ) -> Result { - use arrow::datatypes::Int16Type; - let level_converter = PrimitiveArrayConverter::::new(); - let array_data = - level_converter.convert_value_bytes(level_decoder, batch_size)?; - Ok(Int16Array::from(array_data)) - } -} - -impl ArrayReader for ArrowArrayReader<'static, C> { - fn as_any(&self) -> &dyn Any { - self - } - - fn get_data_type(&self) -> &ArrowType { - &self.data_type - } - - fn next_batch(&mut self, batch_size: usize) -> Result { - if Self::rep_levels_available(&self.column_desc) { - // read rep levels if available - let rep_level_array = - Self::build_level_array(&mut self.rep_level_decoder, batch_size)?; - self.last_rep_levels = Some(rep_level_array); - } - - // check if def levels are available - let (values_to_read, null_bitmap_array) = - if !Self::def_levels_available(&self.column_desc) { - // if no def levels - just read (up to) batch_size values - (batch_size, None) - } else { - // if def levels are available - they determine how many values will be read - // decode def levels, return first error if any - let def_level_array = - Self::build_level_array(&mut self.def_level_decoder, batch_size)?; - let def_level_count = def_level_array.len(); - // use eq_scalar to efficiently build null bitmap array from def levels - let null_bitmap_array = arrow::compute::eq_scalar( - &def_level_array, - self.column_desc.max_def_level(), - )?; - self.last_def_levels = Some(def_level_array); - // efficiently calculate values to read - let values_to_read = null_bitmap_array - .values() - .count_set_bits_offset(0, def_level_count); - let maybe_null_bitmap = if values_to_read != null_bitmap_array.len() { - Some(null_bitmap_array) - } else { - // shortcut if no NULLs - None - }; - (values_to_read, maybe_null_bitmap) - }; - - // read a batch of values - // converter only creates a no-null / all value array data - let mut value_array_data = self - .array_converter - .convert_value_bytes(&mut self.value_decoder, values_to_read)?; - - if let Some(null_bitmap_array) = null_bitmap_array { - // Only if def levels are available - insert null values efficiently using MutableArrayData. - // This will require value bytes to be copied again, but converter requirements are reduced. - // With a small number of NULLs, this will only be a few copies of large byte sequences. - let actual_batch_size = null_bitmap_array.len(); - // use_nulls is false, because null_bitmap_array is already calculated and re-used - let mut mutable = arrow::array::MutableArrayData::new( - vec![&value_array_data], - false, - actual_batch_size, - ); - // SlicesIterator slices only the true values, NULLs are inserted to fill any gaps - arrow::compute::SlicesIterator::new(&null_bitmap_array).for_each( - |(start, end)| { - // the gap needs to be filled with NULLs - if start > mutable.len() { - let nulls_to_add = start - mutable.len(); - mutable.extend_nulls(nulls_to_add); - } - // fill values, adjust start and end with NULL count so far - let nulls_added = mutable.null_count(); - mutable.extend(0, start - nulls_added, end - nulls_added); - }, - ); - // any remaining part is NULLs - if mutable.len() < actual_batch_size { - let nulls_to_add = actual_batch_size - mutable.len(); - mutable.extend_nulls(nulls_to_add); - } - - value_array_data = unsafe { - mutable - .into_builder() - .null_bit_buffer(null_bitmap_array.values().clone()) - .build_unchecked() - }; - } - let mut array = arrow::array::make_array(value_array_data); - if array.data_type() != &self.data_type { - // cast array to self.data_type if necessary - array = arrow::compute::cast(&array, &self.data_type)? - } - Ok(array) - } - - fn get_def_levels(&self) -> Option<&[i16]> { - self.last_def_levels.as_ref().map(|x| x.values()) - } - - fn get_rep_levels(&self) -> Option<&[i16]> { - self.last_rep_levels.as_ref().map(|x| x.values()) - } -} - -use crate::encodings::rle::RleDecoder; - -pub trait ValueDecoder { - fn read_value_bytes( - &mut self, - num_values: usize, - read_bytes: &mut dyn FnMut(&[u8], usize), - ) -> Result; -} - -trait DictionaryValueDecoder { - fn read_dictionary_values(&mut self) -> Result>; -} - -trait PlainValueDecoder: ValueDecoder + DictionaryValueDecoder { - fn into_value_decoder(self: Box) -> Box; - fn into_dictionary_decoder(self: Box) -> Box; -} - -impl PlainValueDecoder for T -where - T: ValueDecoder + DictionaryValueDecoder + 'static, -{ - fn into_value_decoder(self: Box) -> Box { - self - } - - fn into_dictionary_decoder(self: Box) -> Box { - self - } -} - -impl dyn ValueDecoder { - fn empty() -> impl ValueDecoder { - SingleValueDecoder::new(Ok(0)) - } - - fn once(value: Result) -> impl ValueDecoder { - SingleValueDecoder::new(value) - } -} - -impl ValueDecoder for Box { - #[inline] - fn read_value_bytes( - &mut self, - num_values: usize, - read_bytes: &mut dyn FnMut(&[u8], usize), - ) -> Result { - self.as_mut().read_value_bytes(num_values, read_bytes) - } -} - -struct SingleValueDecoder { - value: Result, -} - -impl SingleValueDecoder { - fn new(value: Result) -> Self { - Self { value } - } -} - -impl ValueDecoder for SingleValueDecoder { - fn read_value_bytes( - &mut self, - _num_values: usize, - _read_bytes: &mut dyn FnMut(&[u8], usize), - ) -> Result { - self.value.clone() - } -} - -struct CompositeValueDecoder>> { - current_decoder: Option>, - decoder_iter: I, -} - -impl>> CompositeValueDecoder { - fn new(mut decoder_iter: I) -> Self { - let current_decoder = decoder_iter.next(); - Self { - current_decoder, - decoder_iter, - } - } -} - -impl>> ValueDecoder - for CompositeValueDecoder -{ - fn read_value_bytes( - &mut self, - num_values: usize, - read_bytes: &mut dyn FnMut(&[u8], usize), - ) -> Result { - let mut values_to_read = num_values; - while values_to_read > 0 { - let value_decoder = match self.current_decoder.as_mut() { - Some(d) => d, - // no more decoders - None => break, - }; - while values_to_read > 0 { - let values_read = - value_decoder.read_value_bytes(values_to_read, read_bytes)?; - if values_read > 0 { - values_to_read -= values_read; - } else { - // no more values in current decoder - self.current_decoder = self.decoder_iter.next(); - break; - } - } - } - - Ok(num_values - values_to_read) - } -} - -struct LevelValueDecoder { - level_decoder: crate::encodings::levels::LevelDecoder, - level_value_buffer: Vec, -} - -impl LevelValueDecoder { - fn new(level_decoder: crate::encodings::levels::LevelDecoder) -> Self { - Self { - level_decoder, - level_value_buffer: vec![0i16; 2048], - } - } -} - -impl ValueDecoder for LevelValueDecoder { - fn read_value_bytes( - &mut self, - num_values: usize, - read_bytes: &mut dyn FnMut(&[u8], usize), - ) -> Result { - let value_size = std::mem::size_of::(); - let mut total_values_read = 0; - while total_values_read < num_values { - let values_to_read = std::cmp::min( - num_values - total_values_read, - self.level_value_buffer.len(), - ); - let values_read = match self - .level_decoder - .get(&mut self.level_value_buffer[..values_to_read]) - { - Ok(values_read) => values_read, - Err(e) => return Err(e), - }; - if values_read > 0 { - let level_value_bytes = - &self.level_value_buffer.to_byte_slice()[..values_read * value_size]; - read_bytes(level_value_bytes, values_read); - total_values_read += values_read; - } else { - break; - } - } - Ok(total_values_read) - } -} - -pub(crate) struct FixedLenPlainDecoder { - data: ByteBufferPtr, - num_values: usize, - value_bit_len: usize, -} - -impl FixedLenPlainDecoder { - pub(crate) fn new( - data: ByteBufferPtr, - num_values: usize, - value_bit_len: usize, - ) -> Self { - Self { - data, - num_values, - value_bit_len, - } - } -} - -impl DictionaryValueDecoder for FixedLenPlainDecoder { - fn read_dictionary_values(&mut self) -> Result> { - let value_byte_len = self.value_bit_len / 8; - let available_values = self.data.len() / value_byte_len; - let values_to_read = std::cmp::min(available_values, self.num_values); - let byte_len = values_to_read * value_byte_len; - let values = vec![self.data.range(0, byte_len)]; - self.num_values = 0; - self.data.set_range(self.data.start(), 0); - Ok(values) - } -} - -impl ValueDecoder for FixedLenPlainDecoder { - fn read_value_bytes( - &mut self, - num_values: usize, - read_bytes: &mut dyn FnMut(&[u8], usize), - ) -> Result { - let available_values = self.data.len() * 8 / self.value_bit_len; - if available_values > 0 { - let values_to_read = std::cmp::min(available_values, num_values); - let byte_len = values_to_read * self.value_bit_len / 8; - read_bytes(&self.data.data()[..byte_len], values_to_read); - self.data - .set_range(self.data.start() + byte_len, self.data.len() - byte_len); - Ok(values_to_read) - } else { - Ok(0) - } - } -} - -pub(crate) struct VariableLenPlainDecoder { - data: ByteBufferPtr, - num_values: usize, - position: usize, -} - -impl VariableLenPlainDecoder { - pub(crate) fn new(data: ByteBufferPtr, num_values: usize) -> Self { - Self { - data, - num_values, - position: 0, - } - } -} - -impl DictionaryValueDecoder for VariableLenPlainDecoder { - fn read_dictionary_values(&mut self) -> Result> { - const LEN_SIZE: usize = std::mem::size_of::(); - let data = self.data.data(); - let data_len = data.len(); - let values_to_read = self.num_values; - let mut values = Vec::with_capacity(values_to_read); - let mut values_read = 0; - while self.position < data_len && values_read < values_to_read { - let len: usize = - read_num_bytes!(u32, LEN_SIZE, data[self.position..]) as usize; - self.position += LEN_SIZE; - if data_len < self.position + len { - return Err(eof_err!("Not enough bytes to decode")); - } - values.push(self.data.range(self.position, len)); - self.position += len; - values_read += 1; - } - self.num_values -= values_read; - Ok(values) - } -} - -impl ValueDecoder for VariableLenPlainDecoder { - fn read_value_bytes( - &mut self, - num_values: usize, - read_bytes: &mut dyn FnMut(&[u8], usize), - ) -> Result { - const LEN_SIZE: usize = std::mem::size_of::(); - let data = self.data.data(); - let data_len = data.len(); - let values_to_read = std::cmp::min(self.num_values, num_values); - let mut values_read = 0; - while self.position < data_len && values_read < values_to_read { - let len: usize = - read_num_bytes!(u32, LEN_SIZE, data[self.position..]) as usize; - self.position += LEN_SIZE; - if data_len < self.position + len { - return Err(eof_err!("Not enough bytes to decode")); - } - read_bytes(&data[self.position..][..len], 1); - self.position += len; - values_read += 1; - } - self.num_values -= values_read; - Ok(values_read) - } -} - -pub(crate) struct FixedLenDictionaryDecoder { - context_ref: Rc>, - key_data_bufer: ByteBufferPtr, - num_values: usize, - rle_decoder: RleDecoder, - value_byte_len: usize, - keys_buffer: Vec, -} - -impl FixedLenDictionaryDecoder { - pub(crate) fn new( - column_chunk_context: Rc>, - key_data_bufer: ByteBufferPtr, - num_values: usize, - value_bit_len: usize, - ) -> Self { - assert!( - value_bit_len % 8 == 0, - "value_bit_size must be a multiple of 8" - ); - // First byte in `data` is bit width - let bit_width = key_data_bufer.data()[0]; - let mut rle_decoder = RleDecoder::new(bit_width); - rle_decoder.set_data(key_data_bufer.start_from(1)); - - Self { - context_ref: column_chunk_context, - key_data_bufer, - num_values, - rle_decoder, - value_byte_len: value_bit_len / 8, - keys_buffer: vec![0; 2048], - } - } -} - -impl ValueDecoder for FixedLenDictionaryDecoder { - fn read_value_bytes( - &mut self, - num_values: usize, - read_bytes: &mut dyn FnMut(&[u8], usize), - ) -> Result { - if self.num_values == 0 { - return Ok(0); - } - let context = self.context_ref.borrow(); - let values = context.dictionary_values.as_ref().unwrap(); - let input_value_bytes = values[0].data(); - // read no more than available values or requested values - let values_to_read = std::cmp::min(self.num_values, num_values); - let mut values_read = 0; - while values_read < values_to_read { - // read values in batches of up to self.keys_buffer.len() - let keys_to_read = - std::cmp::min(values_to_read - values_read, self.keys_buffer.len()); - let keys_read = match self - .rle_decoder - .get_batch(&mut self.keys_buffer[..keys_to_read]) - { - Ok(keys_read) => keys_read, - Err(e) => return Err(e), - }; - if keys_read == 0 { - self.num_values = 0; - return Ok(values_read); - } - for i in 0..keys_read { - let key = self.keys_buffer[i] as usize; - read_bytes( - &input_value_bytes[key * self.value_byte_len..] - [..self.value_byte_len], - 1, - ); - } - values_read += keys_read; - } - self.num_values -= values_read; - Ok(values_read) - } -} - -pub(crate) struct VariableLenDictionaryDecoder { - context_ref: Rc>, - key_data_bufer: ByteBufferPtr, - num_values: usize, - rle_decoder: RleDecoder, - keys_buffer: Vec, -} - -impl VariableLenDictionaryDecoder { - pub(crate) fn new( - column_chunk_context: Rc>, - key_data_bufer: ByteBufferPtr, - num_values: usize, - ) -> Self { - // First byte in `data` is bit width - let bit_width = key_data_bufer.data()[0]; - let mut rle_decoder = RleDecoder::new(bit_width); - rle_decoder.set_data(key_data_bufer.start_from(1)); - - Self { - context_ref: column_chunk_context, - key_data_bufer, - num_values, - rle_decoder, - keys_buffer: vec![0; 2048], - } - } -} - -impl ValueDecoder for VariableLenDictionaryDecoder { - fn read_value_bytes( - &mut self, - num_values: usize, - read_bytes: &mut dyn FnMut(&[u8], usize), - ) -> Result { - if self.num_values == 0 { - return Ok(0); - } - let context = self.context_ref.borrow(); - let values = context.dictionary_values.as_ref().unwrap(); - let values_to_read = std::cmp::min(self.num_values, num_values); - let mut values_read = 0; - while values_read < values_to_read { - // read values in batches of up to self.keys_buffer.len() - let keys_to_read = - std::cmp::min(values_to_read - values_read, self.keys_buffer.len()); - let keys_read = match self - .rle_decoder - .get_batch(&mut self.keys_buffer[..keys_to_read]) - { - Ok(keys_read) => keys_read, - Err(e) => return Err(e), - }; - if keys_read == 0 { - self.num_values = 0; - return Ok(values_read); - } - for i in 0..keys_read { - let key = self.keys_buffer[i] as usize; - read_bytes(values[key].data(), 1); - } - values_read += keys_read; - } - self.num_values -= values_read; - Ok(values_read) - } -} - -pub(crate) struct DeltaByteArrayValueDecoder { - decoder: DeltaByteArrayDecoder, -} - -impl DeltaByteArrayValueDecoder { - pub fn new(data: ByteBufferPtr, num_values: usize) -> Result { - let mut decoder = DeltaByteArrayDecoder::new(); - decoder.set_data(data, num_values)?; - Ok(Self { decoder }) - } -} - -impl ValueDecoder for DeltaByteArrayValueDecoder { - fn read_value_bytes( - &mut self, - mut num_values: usize, - read_bytes: &mut dyn FnMut(&[u8], usize), - ) -> Result { - num_values = std::cmp::min(num_values, self.decoder.values_left()); - let mut values_read = 0; - let mut buf = [ByteArray::new()]; - while values_read < num_values { - let num_read = self.decoder.get(&mut buf)?; - debug_assert_eq!(num_read, 1); - - read_bytes(buf[0].data(), 1); - - values_read += 1; - } - Ok(values_read) - } -} - -use arrow::datatypes::ArrowPrimitiveType; - -pub struct PrimitiveArrayConverter { - _phantom_data: PhantomData, -} - -impl PrimitiveArrayConverter { - pub fn new() -> Self { - Self { - _phantom_data: PhantomData, - } - } -} - -impl ArrayConverter for PrimitiveArrayConverter { - fn convert_value_bytes( - &self, - value_decoder: &mut impl ValueDecoder, - num_values: usize, - ) -> Result { - let value_size = T::get_byte_width(); - let values_byte_capacity = num_values * value_size; - let mut values_buffer = MutableBuffer::new(values_byte_capacity); - - value_decoder.read_value_bytes(num_values, &mut |value_bytes, _| { - values_buffer.extend_from_slice(value_bytes); - })?; - - // calculate actual data_len, which may be different from the iterator's upper bound - let value_count = values_buffer.len() / value_size; - let array_data = arrow::array::ArrayData::builder(T::DATA_TYPE) - .len(value_count) - .add_buffer(values_buffer.into()); - let array_data = unsafe { array_data.build_unchecked() }; - Ok(array_data) - } -} - -pub struct StringArrayConverter {} - -impl StringArrayConverter { - pub fn new() -> Self { - Self {} - } -} - -impl ArrayConverter for StringArrayConverter { - fn convert_value_bytes( - &self, - value_decoder: &mut impl ValueDecoder, - num_values: usize, - ) -> Result { - use arrow::datatypes::ArrowNativeType; - let offset_size = std::mem::size_of::(); - let mut offsets_buffer = MutableBuffer::new((num_values + 1) * offset_size); - // allocate initial capacity of 1 byte for each item - let values_byte_capacity = num_values; - let mut values_buffer = MutableBuffer::new(values_byte_capacity); - - let mut length_so_far = i32::default(); - offsets_buffer.push(length_so_far); - - value_decoder.read_value_bytes(num_values, &mut |value_bytes, values_read| { - debug_assert_eq!( - values_read, 1, - "offset length value buffers can only contain bytes for a single value" - ); - length_so_far += - ::from_usize(value_bytes.len()).unwrap(); - // this should be safe because a ValueDecoder should not read more than num_values - unsafe { - offsets_buffer.push_unchecked(length_so_far); - } - values_buffer.extend_from_slice(value_bytes); - })?; - // calculate actual data_len, which may be different from the iterator's upper bound - let data_len = (offsets_buffer.len() / offset_size) - 1; - let array_data = arrow::array::ArrayData::builder(ArrowType::Utf8) - .len(data_len) - .add_buffer(offsets_buffer.into()) - .add_buffer(values_buffer.into()); - let array_data = unsafe { array_data.build_unchecked() }; - Ok(array_data) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::arrow::{ArrowReader, ParquetFileArrowReader}; - use crate::basic::ConvertedType; - use crate::column::page::Page; - use crate::column::writer::ColumnWriter; - use crate::data_type::ByteArray; - use crate::data_type::ByteArrayType; - use crate::encodings::encoding::{DictEncoder, Encoder}; - use crate::file::properties::WriterProperties; - use crate::file::reader::SerializedFileReader; - use crate::file::serialized_reader::SliceableCursor; - use crate::file::writer::{FileWriter, SerializedFileWriter, TryClone}; - use crate::util::memory::MemTracker; - use crate::schema::parser::parse_message_type; - use crate::schema::types::SchemaDescriptor; - use crate::util::test_common::page_util::{ - DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator, - }; - use crate::{ - basic::Encoding, column::page::PageReader, schema::types::SchemaDescPtr, - }; - use arrow::array::{PrimitiveArray, StringArray}; - use arrow::datatypes::Int32Type as ArrowInt32; - use rand::{distributions::uniform::SampleUniform, thread_rng, Rng}; - use std::io::{Cursor, Seek, SeekFrom, Write}; - use std::sync::{Arc, Mutex}; - - /// Iterator for testing reading empty columns - struct EmptyPageIterator { - schema: SchemaDescPtr, - } - - impl EmptyPageIterator { - fn new(schema: SchemaDescPtr) -> Self { - EmptyPageIterator { schema } - } - } - - impl Iterator for EmptyPageIterator { - type Item = Result>; - - fn next(&mut self) -> Option { - None - } - } - - impl PageIterator for EmptyPageIterator { - fn schema(&mut self) -> Result { - Ok(self.schema.clone()) - } - - fn column_schema(&mut self) -> Result { - Ok(self.schema.column(0)) - } - } - - #[test] - fn test_array_reader_empty_pages() { - // Construct column schema - let message_type = " - message test_schema { - REQUIRED INT32 leaf; - } - "; - - let schema = parse_message_type(message_type) - .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t)))) - .unwrap(); - - let column_desc = schema.column(0); - let page_iterator = EmptyPageIterator::new(schema); - - let converter = PrimitiveArrayConverter::::new(); - let mut array_reader = - ArrowArrayReader::try_new(page_iterator, column_desc, converter, None) - .unwrap(); - - // expect no values to be read - let array = array_reader.next_batch(50).unwrap(); - assert!(array.is_empty()); - } - - fn make_column_chunks( - column_desc: ColumnDescPtr, - encoding: Encoding, - num_levels: usize, - min_value: T::T, - max_value: T::T, - def_levels: &mut Vec, - rep_levels: &mut Vec, - values: &mut Vec, - page_lists: &mut Vec>, - use_v2: bool, - num_chunks: usize, - ) where - T::T: PartialOrd + SampleUniform + Copy, - { - for _i in 0..num_chunks { - let mut pages = VecDeque::new(); - let mut data = Vec::new(); - let mut page_def_levels = Vec::new(); - let mut page_rep_levels = Vec::new(); - - crate::util::test_common::make_pages::( - column_desc.clone(), - encoding, - 1, - num_levels, - min_value, - max_value, - &mut page_def_levels, - &mut page_rep_levels, - &mut data, - &mut pages, - use_v2, - ); - - def_levels.append(&mut page_def_levels); - rep_levels.append(&mut page_rep_levels); - values.append(&mut data); - page_lists.push(Vec::from(pages)); - } - } - - #[test] - fn test_primitive_array_reader_data() { - // Construct column schema - let message_type = " - message test_schema { - REQUIRED INT32 leaf; - } - "; - - let schema = parse_message_type(message_type) - .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t)))) - .unwrap(); - - let column_desc = schema.column(0); - - // Construct page iterator - { - let mut data = Vec::new(); - let mut page_lists = Vec::new(); - make_column_chunks::( - column_desc.clone(), - Encoding::PLAIN, - 100, - 1, - 200, - &mut Vec::new(), - &mut Vec::new(), - &mut data, - &mut page_lists, - true, - 2, - ); - let page_iterator = - InMemoryPageIterator::new(schema, column_desc.clone(), page_lists); - - let converter = PrimitiveArrayConverter::::new(); - let mut array_reader = - ArrowArrayReader::try_new(page_iterator, column_desc, converter, None) - .unwrap(); - - // Read first 50 values, which are all from the first column chunk - let array = array_reader.next_batch(50).unwrap(); - let array = array - .as_any() - .downcast_ref::>() - .unwrap(); - - assert_eq!( - &PrimitiveArray::::from(data[0..50].to_vec()), - array - ); - - // Read next 100 values, the first 50 ones are from the first column chunk, - // and the last 50 ones are from the second column chunk - let array = array_reader.next_batch(100).unwrap(); - let array = array - .as_any() - .downcast_ref::>() - .unwrap(); - - assert_eq!( - &PrimitiveArray::::from(data[50..150].to_vec()), - array - ); - - // Try to read 100 values, however there are only 50 values - let array = array_reader.next_batch(100).unwrap(); - let array = array - .as_any() - .downcast_ref::>() - .unwrap(); - - assert_eq!( - &PrimitiveArray::::from(data[150..200].to_vec()), - array - ); - } - } - - #[test] - fn test_primitive_array_reader_def_and_rep_levels() { - // Construct column schema - let message_type = " - message test_schema { - REPEATED Group test_mid { - OPTIONAL INT32 leaf; - } - } - "; - - let schema = parse_message_type(message_type) - .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t)))) - .unwrap(); - - let column_desc = schema.column(0); - - // Construct page iterator - { - let mut def_levels = Vec::new(); - let mut rep_levels = Vec::new(); - let mut page_lists = Vec::new(); - make_column_chunks::( - column_desc.clone(), - Encoding::PLAIN, - 100, - 1, - 200, - &mut def_levels, - &mut rep_levels, - &mut Vec::new(), - &mut page_lists, - true, - 2, - ); - - let page_iterator = - InMemoryPageIterator::new(schema, column_desc.clone(), page_lists); - - let converter = PrimitiveArrayConverter::::new(); - let mut array_reader = - ArrowArrayReader::try_new(page_iterator, column_desc, converter, None) - .unwrap(); - - let mut accu_len: usize = 0; - - // Read first 50 values, which are all from the first column chunk - let array = array_reader.next_batch(50).unwrap(); - assert_eq!( - Some(&def_levels[accu_len..(accu_len + array.len())]), - array_reader.get_def_levels() - ); - assert_eq!( - Some(&rep_levels[accu_len..(accu_len + array.len())]), - array_reader.get_rep_levels() - ); - accu_len += array.len(); - - // Read next 100 values, the first 50 ones are from the first column chunk, - // and the last 50 ones are from the second column chunk - let array = array_reader.next_batch(100).unwrap(); - assert_eq!( - Some(&def_levels[accu_len..(accu_len + array.len())]), - array_reader.get_def_levels() - ); - assert_eq!( - Some(&rep_levels[accu_len..(accu_len + array.len())]), - array_reader.get_rep_levels() - ); - accu_len += array.len(); - - // Try to read 100 values, however there are only 50 values - let array = array_reader.next_batch(100).unwrap(); - assert_eq!( - Some(&def_levels[accu_len..(accu_len + array.len())]), - array_reader.get_def_levels() - ); - assert_eq!( - Some(&rep_levels[accu_len..(accu_len + array.len())]), - array_reader.get_rep_levels() - ); - - assert_eq!(accu_len + array.len(), 200); - } - } - - #[test] - fn test_arrow_array_reader_string() { - // Construct column schema - let message_type = " - message test_schema { - REPEATED Group test_mid { - OPTIONAL BYTE_ARRAY leaf (UTF8); - } - } - "; - let num_pages = 2; - let values_per_page = 100; - let str_base = "Hello World"; - - let schema = parse_message_type(message_type) - .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t)))) - .unwrap(); - let column_desc = schema.column(0); - let max_def_level = column_desc.max_def_level(); - let max_rep_level = column_desc.max_rep_level(); - - assert_eq!(max_def_level, 2); - assert_eq!(max_rep_level, 1); - - let mut rng = thread_rng(); - let mut pages: Vec> = Vec::new(); - - let mut rep_levels = Vec::with_capacity(num_pages * values_per_page); - let mut def_levels = Vec::with_capacity(num_pages * values_per_page); - let mut all_values = Vec::with_capacity(num_pages * values_per_page); - - for i in 0..num_pages { - let mut values = Vec::with_capacity(values_per_page); - - for _ in 0..values_per_page { - let def_level = rng.gen_range(0..max_def_level + 1); - let rep_level = rng.gen_range(0..max_rep_level + 1); - if def_level == max_def_level { - let len = rng.gen_range(1..str_base.len()); - let slice = &str_base[..len]; - values.push(ByteArray::from(slice)); - all_values.push(Some(slice.to_string())); - } else { - all_values.push(None) - } - rep_levels.push(rep_level); - def_levels.push(def_level) - } - - let range = i * values_per_page..(i + 1) * values_per_page; - let mut pb = - DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true); - - pb.add_rep_levels(max_rep_level, &rep_levels.as_slice()[range.clone()]); - pb.add_def_levels(max_def_level, &def_levels.as_slice()[range]); - pb.add_values::(Encoding::PLAIN, values.as_slice()); - - let data_page = pb.consume(); - pages.push(vec![data_page]); - } - - let page_iterator = InMemoryPageIterator::new(schema, column_desc.clone(), pages); - let converter = StringArrayConverter::new(); - let mut array_reader = - ArrowArrayReader::try_new(page_iterator, column_desc, converter, None) - .unwrap(); - - let mut accu_len: usize = 0; - - let array = array_reader.next_batch(values_per_page / 2).unwrap(); - assert_eq!(array.len(), values_per_page / 2); - assert_eq!( - Some(&def_levels[accu_len..(accu_len + array.len())]), - array_reader.get_def_levels() - ); - assert_eq!( - Some(&rep_levels[accu_len..(accu_len + array.len())]), - array_reader.get_rep_levels() - ); - accu_len += array.len(); - - // Read next values_per_page values, the first values_per_page/2 ones are from the first column chunk, - // and the last values_per_page/2 ones are from the second column chunk - let array = array_reader.next_batch(values_per_page).unwrap(); - assert_eq!(array.len(), values_per_page); - assert_eq!( - Some(&def_levels[accu_len..(accu_len + array.len())]), - array_reader.get_def_levels() - ); - assert_eq!( - Some(&rep_levels[accu_len..(accu_len + array.len())]), - array_reader.get_rep_levels() - ); - let strings = array.as_any().downcast_ref::().unwrap(); - for i in 0..array.len() { - if array.is_valid(i) { - assert_eq!( - all_values[i + accu_len].as_ref().unwrap().as_str(), - strings.value(i) - ) - } else { - assert_eq!(all_values[i + accu_len], None) - } - } - accu_len += array.len(); - - // Try to read values_per_page values, however there are only values_per_page/2 values - let array = array_reader.next_batch(values_per_page).unwrap(); - assert_eq!(array.len(), values_per_page / 2); - assert_eq!( - Some(&def_levels[accu_len..(accu_len + array.len())]), - array_reader.get_def_levels() - ); - assert_eq!( - Some(&rep_levels[accu_len..(accu_len + array.len())]), - array_reader.get_rep_levels() - ); - } - - #[test] - fn test_arrow_array_reader_dict_enc_string() { - // Construct column schema - let message_type = " - message test_schema { - REPEATED Group test_mid { - OPTIONAL BYTE_ARRAY leaf (UTF8); - } - } - "; - let num_pages = 2; - let values_per_page = 100; - let str_base = "Hello World"; - - let schema = parse_message_type(message_type) - .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t)))) - .unwrap(); - let column_desc = schema.column(0); - let max_def_level = column_desc.max_def_level(); - let max_rep_level = column_desc.max_rep_level(); - - assert_eq!(max_def_level, 2); - assert_eq!(max_rep_level, 1); - - let mut rng = thread_rng(); - let mut pages: Vec> = Vec::new(); - - let mut rep_levels = Vec::with_capacity(num_pages * values_per_page); - let mut def_levels = Vec::with_capacity(num_pages * values_per_page); - let mut all_values = Vec::with_capacity(num_pages * values_per_page); - - for i in 0..num_pages { - let mem_tracker = Arc::new(MemTracker::new()); - let mut dict_encoder = - DictEncoder::::new(column_desc.clone(), mem_tracker); - // add data page - let mut values = Vec::with_capacity(values_per_page); - - for _ in 0..values_per_page { - let def_level = rng.gen_range(0..max_def_level + 1); - let rep_level = rng.gen_range(0..max_rep_level + 1); - if def_level == max_def_level { - let len = rng.gen_range(1..str_base.len()); - let slice = &str_base[..len]; - values.push(ByteArray::from(slice)); - all_values.push(Some(slice.to_string())); - } else { - all_values.push(None) - } - rep_levels.push(rep_level); - def_levels.push(def_level) - } - - let range = i * values_per_page..(i + 1) * values_per_page; - let mut pb = - DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true); - pb.add_rep_levels(max_rep_level, &rep_levels.as_slice()[range.clone()]); - pb.add_def_levels(max_def_level, &def_levels.as_slice()[range]); - let _ = dict_encoder.put(&values); - let indices = dict_encoder - .write_indices() - .expect("write_indices() should be OK"); - pb.add_indices(indices); - let data_page = pb.consume(); - // for each page log num_values vs actual values in page - // println!("page num_values: {}, values.len(): {}", data_page.num_values(), values.len()); - // add dictionary page - let dict = dict_encoder - .write_dict() - .expect("write_dict() should be OK"); - let dict_page = Page::DictionaryPage { - buf: dict, - num_values: dict_encoder.num_entries() as u32, - encoding: Encoding::RLE_DICTIONARY, - is_sorted: false, - }; - pages.push(vec![dict_page, data_page]); - } - - let page_iterator = InMemoryPageIterator::new(schema, column_desc.clone(), pages); - let converter = StringArrayConverter::new(); - let mut array_reader = - ArrowArrayReader::try_new(page_iterator, column_desc, converter, None) - .unwrap(); - - let mut accu_len: usize = 0; - - // println!("---------- reading a batch of {} values ----------", values_per_page / 2); - let array = array_reader.next_batch(values_per_page / 2).unwrap(); - assert_eq!(array.len(), values_per_page / 2); - assert_eq!( - Some(&def_levels[accu_len..(accu_len + array.len())]), - array_reader.get_def_levels() - ); - assert_eq!( - Some(&rep_levels[accu_len..(accu_len + array.len())]), - array_reader.get_rep_levels() - ); - accu_len += array.len(); - - // Read next values_per_page values, the first values_per_page/2 ones are from the first column chunk, - // and the last values_per_page/2 ones are from the second column chunk - // println!("---------- reading a batch of {} values ----------", values_per_page); - let array = array_reader.next_batch(values_per_page).unwrap(); - assert_eq!(array.len(), values_per_page); - assert_eq!( - Some(&def_levels[accu_len..(accu_len + array.len())]), - array_reader.get_def_levels() - ); - assert_eq!( - Some(&rep_levels[accu_len..(accu_len + array.len())]), - array_reader.get_rep_levels() - ); - let strings = array.as_any().downcast_ref::().unwrap(); - for i in 0..array.len() { - if array.is_valid(i) { - assert_eq!( - all_values[i + accu_len].as_ref().unwrap().as_str(), - strings.value(i) - ) - } else { - assert_eq!(all_values[i + accu_len], None) - } - } - accu_len += array.len(); - - // Try to read values_per_page values, however there are only values_per_page/2 values - // println!("---------- reading a batch of {} values ----------", values_per_page); - let array = array_reader.next_batch(values_per_page).unwrap(); - assert_eq!(array.len(), values_per_page / 2); - assert_eq!( - Some(&def_levels[accu_len..(accu_len + array.len())]), - array_reader.get_def_levels() - ); - assert_eq!( - Some(&rep_levels[accu_len..(accu_len + array.len())]), - array_reader.get_rep_levels() - ); - } - - /// Allows to write parquet into memory. Intended only for use in tests. - #[derive(Clone)] - struct VecWriter { - data: Arc>>>, - } - - impl VecWriter { - pub fn new() -> VecWriter { - VecWriter { - data: Arc::new(Mutex::new(Cursor::new(Vec::new()))), - } - } - - pub fn consume(self) -> Vec { - Arc::try_unwrap(self.data) - .unwrap() - .into_inner() - .unwrap() - .into_inner() - } - } - - impl TryClone for VecWriter { - fn try_clone(&self) -> std::io::Result { - Ok(self.clone()) - } - } - - impl Seek for VecWriter { - fn seek(&mut self, pos: SeekFrom) -> std::io::Result { - self.data.lock().unwrap().seek(pos) - } - - fn stream_position(&mut self) -> std::io::Result { - self.data.lock().unwrap().stream_position() - } - } - - impl Write for VecWriter { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - self.data.lock().unwrap().write(buf) - } - - fn flush(&mut self) -> std::io::Result<()> { - self.data.lock().unwrap().flush() - } - } - - #[test] - fn test_string_delta_byte_array() { - use crate::basic; - use crate::schema::types::Type; - - let data = VecWriter::new(); - let schema = Arc::new( - Type::group_type_builder("string_test") - .with_fields(&mut vec![Arc::new( - Type::primitive_type_builder("c", basic::Type::BYTE_ARRAY) - .with_converted_type(ConvertedType::UTF8) - .build() - .unwrap(), - )]) - .build() - .unwrap(), - ); - // Disable dictionary and use the fallback encoding. - let p = Arc::new( - WriterProperties::builder() - .set_dictionary_enabled(false) - .set_encoding(Encoding::DELTA_BYTE_ARRAY) - .build(), - ); - // Write a few strings. - let mut w = SerializedFileWriter::new(data.clone(), schema, p).unwrap(); - let mut rg = w.next_row_group().unwrap(); - let mut c = rg.next_column().unwrap().unwrap(); - match &mut c { - ColumnWriter::ByteArrayColumnWriter(c) => { - c.write_batch( - &[ByteArray::from("foo"), ByteArray::from("bar")], - Some(&[0, 1, 0, 0, 1, 0]), - Some(&[0, 0, 0, 0, 0, 0]), - ) - .unwrap(); - } - _ => panic!("unexpected column"), - }; - rg.close_column(c).unwrap(); - w.close_row_group(rg).unwrap(); - w.close().unwrap(); - std::mem::drop(w); - - // Check we can read them back. - let r = SerializedFileReader::new(SliceableCursor::new(Arc::new(data.consume()))) - .unwrap(); - let mut r = ParquetFileArrowReader::new(Arc::new(r)); - - let batch = r - .get_record_reader_by_columns([0], 1024) - .unwrap() - .next() - .unwrap() - .unwrap(); - assert_eq!(batch.columns().len(), 1); - - let strings = batch - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!( - strings.into_iter().collect::>(), - vec![None, Some("foo"), None, None, Some("bar"), None] - ); - } -} diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs index fbbf65593182..a2f885a8d116 100644 --- a/parquet/src/arrow/mod.rs +++ b/parquet/src/arrow/mod.rs @@ -119,7 +119,6 @@ //! ``` experimental_mod!(array_reader); -experimental_mod!(arrow_array_reader); pub mod arrow_reader; pub mod arrow_writer; mod bit_util;