Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions arrow-buffer/src/util/bit_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,14 @@ impl Iterator for BitIndexIterator<'_> {
self.chunk_offset += 64;
}
}

#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
let current = self.current_chunk.count_ones() as usize;
let (_, remaining_chunks) = self.iter.size_hint();
let upper = remaining_chunks.map(|chunks| current + chunks.saturating_mul(64));
(current, upper)
}
}

/// An iterator of u32 whose index in a provided bitmask is true
Expand Down Expand Up @@ -375,6 +383,14 @@ impl<'a> Iterator for BitIndexU32Iterator<'a> {
}
}
}

#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
let current = self.curr.count_ones() as usize;
let (_, remaining_chunks) = self.iter.size_hint();
let upper = remaining_chunks.map(|chunks| current + chunks.saturating_mul(64));
(current, upper)
}
}

/// Calls the provided closure for each index in the provided null mask that is set,
Expand Down
261 changes: 223 additions & 38 deletions parquet/src/arrow/arrow_writer/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ use crate::geospatial::statistics::GeospatialStatistics;
use crate::schema::types::ColumnDescPtr;
use crate::util::bit_util::num_required_bits;
use crate::util::interner::{Interner, Storage};
use arrow_array::types::ByteArrayType;
use arrow_array::{
Array, ArrayAccessor, BinaryArray, BinaryViewArray, DictionaryArray, FixedSizeBinaryArray,
LargeBinaryArray, LargeStringArray, StringArray, StringViewArray,
GenericByteArray, LargeBinaryArray, LargeStringArray, StringArray, StringViewArray,
};
use arrow_buffer::ArrowNativeType;
use arrow_schema::DataType;

macro_rules! downcast_dict_impl {
Expand Down Expand Up @@ -222,6 +224,71 @@ impl FallbackEncoder {
}
}

/// Encode a contiguous range from an offset-based byte array
fn encode_dense<T>(&mut self, values: &GenericByteArray<T>, offset: usize, len: usize)
where
T: ByteArrayType,
{
self.num_values += len;
let offsets = values.value_offsets();
let data = values.value_data();
let end = offset + len;

let first_byte = offsets[offset].as_usize();
let last_byte = offsets[end].as_usize();
let total_bytes = last_byte - first_byte;

match &mut self.encoder {
FallbackEncoderImpl::Plain { buffer } => {
buffer.reserve(total_bytes.saturating_add(len.saturating_mul(4)));
for idx in offset..end {
let value = dense_byte_value::<T>(offsets, data, idx);
buffer.extend_from_slice((value.len() as u32).as_bytes());
buffer.extend_from_slice(value);
self.variable_length_bytes += value.len() as i64;
}
}
FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
buffer.reserve(total_bytes);
for idx in offset..end {
let value = dense_byte_value::<T>(offsets, data, idx);
lengths.put(&[value.len() as i32]).unwrap();
buffer.extend_from_slice(value);
self.variable_length_bytes += value.len() as i64;
}
}
FallbackEncoderImpl::Delta {
buffer,
last_value,
prefix_lengths,
suffix_lengths,
} => {
buffer.reserve(total_bytes);
for idx in offset..end {
let value = dense_byte_value::<T>(offsets, data, idx);
let mut prefix_length = 0;

while prefix_length < last_value.len()
&& prefix_length < value.len()
&& last_value[prefix_length] == value[prefix_length]
{
prefix_length += 1;
}

let suffix_length = value.len() - prefix_length;

last_value.clear();
last_value.extend_from_slice(value);

buffer.extend_from_slice(&value[prefix_length..]);
prefix_lengths.put(&[prefix_length as i32]).unwrap();
suffix_lengths.put(&[suffix_length as i32]).unwrap();
self.variable_length_bytes += value.len() as i64;
}
}
}
}

/// Returns an estimate of the data page size in bytes
///
/// This includes:
Expand Down Expand Up @@ -358,6 +425,23 @@ impl DictEncoder {
}
}

/// Encode a contiguous range from an offset-based byte array
fn encode_dense<T>(&mut self, values: &GenericByteArray<T>, offset: usize, len: usize)
where
T: ByteArrayType,
{
self.indices.reserve(len);

let offsets = values.value_offsets();
let data = values.value_data();
for idx in offset..offset + len {
let value = dense_byte_value::<T>(offsets, data, idx);
let interned = self.interner.intern(value);
self.indices.push(interned);
self.variable_length_bytes += value.len() as i64;
}
}

fn bit_width(&self) -> u8 {
let length = self.interner.storage().values.len();
num_required_bits(length.saturating_sub(1) as u64)
Expand Down Expand Up @@ -466,8 +550,43 @@ impl ColumnValueEncoder for ByteArrayEncoder {
})
}

fn write(&mut self, _values: &Self::Values, _offset: usize, _len: usize) -> Result<()> {
unreachable!("should call write_gather instead")
fn write(&mut self, values: &Self::Values, offset: usize, len: usize) -> Result<()> {
match values.data_type() {
DataType::Utf8 => encode_dense(
values.as_any().downcast_ref::<StringArray>().unwrap(),
offset,
len,
self,
),
DataType::LargeUtf8 => encode_dense(
values.as_any().downcast_ref::<LargeStringArray>().unwrap(),
offset,
len,
self,
),
DataType::Binary => encode_dense(
values.as_any().downcast_ref::<BinaryArray>().unwrap(),
offset,
len,
self,
),
DataType::LargeBinary => encode_dense(
values.as_any().downcast_ref::<LargeBinaryArray>().unwrap(),
offset,
len,
self,
),
_ => {
downcast_op!(
values.data_type(),
values,
encode,
offset..offset + len,
self
);
}
}
Ok(())
}

fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()> {
Expand Down Expand Up @@ -557,6 +676,16 @@ impl ColumnValueEncoder for ByteArrayEncoder {
}
}

#[inline]
fn dense_byte_value<'a, T>(offsets: &[T::Offset], data: &'a [u8], idx: usize) -> &'a [u8]
where
T: ByteArrayType,
{
let start = offsets[idx].as_usize();
let end = offsets[idx + 1].as_usize();
&data[start..end]
}

/// Encodes the provided `values` and `indices` to `encoder`
///
/// This is a free function so it can be used with `downcast_op!`
Expand All @@ -567,24 +696,12 @@ where
I: ExactSizeIterator<Item = usize> + Clone,
{
if encoder.statistics_enabled != EnabledStatistics::None {
if let Some(accumulator) = encoder.geo_stats_accumulator.as_mut() {
update_geo_stats_accumulator(accumulator.as_mut(), values, indices.clone());
} else if let Some((min, max)) = compute_min_max(values, indices.clone()) {
if encoder.min_value.as_ref().is_none_or(|m| m > &min) {
encoder.min_value = Some(min);
}

if encoder.max_value.as_ref().is_none_or(|m| m < &max) {
encoder.max_value = Some(max);
}
}
update_statistics(encoder, byte_values(values, indices.clone()));
}

// encode the values into bloom filter if enabled
if let Some(bloom_filter) = &mut encoder.bloom_filter {
for idx in indices.clone() {
bloom_filter.insert(values.value(idx).as_ref());
}
update_bloom_filter(bloom_filter, byte_values(values, indices.clone()));
}

match &mut encoder.dict_encoder {
Expand All @@ -593,43 +710,111 @@ where
}
}

/// Computes the min and max for the provided array and indices
///
/// This is a free function so it can be used with `downcast_op!`
fn compute_min_max<T>(
array: T,
mut valid: impl Iterator<Item = usize>,
) -> Option<(ByteArray, ByteArray)>
/// Encodes a contiguous range from an offset-based byte array
fn encode_dense<T>(
values: &GenericByteArray<T>,
offset: usize,
len: usize,
encoder: &mut ByteArrayEncoder,
) where
T: ByteArrayType,
{
if len == 0 {
return;
}

if encoder.statistics_enabled != EnabledStatistics::None {
update_statistics(encoder, dense_byte_values(values, offset, len));
}

if let Some(bloom_filter) = &mut encoder.bloom_filter {
update_bloom_filter(bloom_filter, dense_byte_values(values, offset, len));
}

match &mut encoder.dict_encoder {
Some(dict_encoder) => dict_encoder.encode_dense(values, offset, len),
None => encoder.fallback.encode_dense(values, offset, len),
}
}

#[inline]
fn byte_values<T, I>(values: T, indices: I) -> impl Iterator<Item = T::Item>
where
T: ArrayAccessor,
T::Item: Copy + Ord + AsRef<[u8]>,
T: ArrayAccessor + Copy,
I: Iterator<Item = usize>,
{
indices.map(move |idx| values.value(idx))
}

#[inline]
fn dense_byte_values<T>(
values: &GenericByteArray<T>,
offset: usize,
len: usize,
) -> impl ExactSizeIterator<Item = &[u8]>
where
T: ByteArrayType,
{
let offsets = values.value_offsets();
let data = values.value_data();
(offset..offset + len).map(move |idx| dense_byte_value::<T>(offsets, data, idx))
}

#[inline]
fn update_statistics<T>(encoder: &mut ByteArrayEncoder, values: impl Iterator<Item = T>)
where
T: Copy + Ord + AsRef<[u8]>,
{
let first_idx = valid.next()?;
if let Some(accumulator) = encoder.geo_stats_accumulator.as_mut() {
update_geo_stats_accumulator(accumulator.as_mut(), values);
} else if let Some((min, max)) = compute_min_max(values) {
if encoder.min_value.as_ref().is_none_or(|m| m > &min) {
encoder.min_value = Some(min);
}

let first_val = array.value(first_idx);
if encoder.max_value.as_ref().is_none_or(|m| m < &max) {
encoder.max_value = Some(max);
}
}
}

#[inline]
fn update_bloom_filter<T>(bloom_filter: &mut Sbbf, values: impl Iterator<Item = T>)
where
T: AsRef<[u8]>,
{
for value in values {
bloom_filter.insert(value.as_ref());
}
}

/// Computes the min and max for the provided values
#[inline]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if you foudn that inlining actually helps in all these cases? We have found that in some cases inlining actually makes the performance worse (as there are a bunch of optimizations in LLVM that are disabled once the function gets too big)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't. For these small methods I tend to default to #[inline] out of habit (and that seems to be common practice in this code base AFAICT.) I'll do some benchmarking to test that.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on string/default I can't measure any difference with and without forced inlining. I'm removing it 👍

fn compute_min_max<T>(mut values: impl Iterator<Item = T>) -> Option<(ByteArray, ByteArray)>
where
T: Copy + Ord + AsRef<[u8]>,
{
let first_val = values.next()?;
let mut min = first_val;
let mut max = first_val;
for idx in valid {
let val = array.value(idx);
for val in values {
min = min.min(val);
max = max.max(val);
}
Some((min.as_ref().to_vec().into(), max.as_ref().to_vec().into()))
}

/// Updates geospatial statistics for the provided array and indices
/// Updates geospatial statistics for the provided values
#[inline]
fn update_geo_stats_accumulator<T>(
bounder: &mut dyn GeoStatsAccumulator,
array: T,
valid: impl Iterator<Item = usize>,
values: impl Iterator<Item = T>,
) where
T: ArrayAccessor,
T::Item: Copy + Ord + AsRef<[u8]>,
T: AsRef<[u8]>,
{
if bounder.is_valid() {
for idx in valid {
let val = array.value(idx);
bounder.update_wkb(val.as_ref());
for value in values {
bounder.update_wkb(value.as_ref());
}
}
}
Loading
Loading