diff --git a/arrow-buffer/src/util/bit_iterator.rs b/arrow-buffer/src/util/bit_iterator.rs index a6504b0ca858..8c3e71ab08b7 100644 --- a/arrow-buffer/src/util/bit_iterator.rs +++ b/arrow-buffer/src/util/bit_iterator.rs @@ -318,6 +318,14 @@ impl Iterator for BitIndexIterator<'_> { self.chunk_offset += 64; } } + + #[inline] + fn size_hint(&self) -> (usize, Option) { + let current = self.current_chunk.count_ones() as usize; + let (_, remaining_chunks) = self.iter.size_hint(); + let upper = remaining_chunks.map(|chunks| current + chunks.saturating_mul(64)); + (current, upper) + } } /// An iterator of u32 whose index in a provided bitmask is true @@ -375,6 +383,14 @@ impl<'a> Iterator for BitIndexU32Iterator<'a> { } } } + + #[inline] + fn size_hint(&self) -> (usize, Option) { + let current = self.curr.count_ones() as usize; + let (_, remaining_chunks) = self.iter.size_hint(); + let upper = remaining_chunks.map(|chunks| current + chunks.saturating_mul(64)); + (current, upper) + } } /// Calls the provided closure for each index in the provided null mask that is set, diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs index 9cb0718b4d84..8d88dd585905 100644 --- a/parquet/src/arrow/arrow_writer/byte_array.rs +++ b/parquet/src/arrow/arrow_writer/byte_array.rs @@ -30,10 +30,12 @@ use crate::geospatial::statistics::GeospatialStatistics; use crate::schema::types::ColumnDescPtr; use crate::util::bit_util::num_required_bits; use crate::util::interner::{Interner, Storage}; +use arrow_array::types::ByteArrayType; use arrow_array::{ Array, ArrayAccessor, BinaryArray, BinaryViewArray, DictionaryArray, FixedSizeBinaryArray, - LargeBinaryArray, LargeStringArray, StringArray, StringViewArray, + GenericByteArray, LargeBinaryArray, LargeStringArray, StringArray, StringViewArray, }; +use arrow_buffer::ArrowNativeType; use arrow_schema::DataType; macro_rules! downcast_dict_impl { @@ -222,6 +224,71 @@ impl FallbackEncoder { } } + /// Encode a contiguous range from an offset-based byte array + fn encode_dense(&mut self, values: &GenericByteArray, offset: usize, len: usize) + where + T: ByteArrayType, + { + self.num_values += len; + let offsets = values.value_offsets(); + let data = values.value_data(); + let end = offset + len; + + let first_byte = offsets[offset].as_usize(); + let last_byte = offsets[end].as_usize(); + let total_bytes = last_byte - first_byte; + + match &mut self.encoder { + FallbackEncoderImpl::Plain { buffer } => { + buffer.reserve(total_bytes.saturating_add(len.saturating_mul(4))); + for idx in offset..end { + let value = dense_byte_value::(offsets, data, idx); + buffer.extend_from_slice((value.len() as u32).as_bytes()); + buffer.extend_from_slice(value); + self.variable_length_bytes += value.len() as i64; + } + } + FallbackEncoderImpl::DeltaLength { buffer, lengths } => { + buffer.reserve(total_bytes); + for idx in offset..end { + let value = dense_byte_value::(offsets, data, idx); + lengths.put(&[value.len() as i32]).unwrap(); + buffer.extend_from_slice(value); + self.variable_length_bytes += value.len() as i64; + } + } + FallbackEncoderImpl::Delta { + buffer, + last_value, + prefix_lengths, + suffix_lengths, + } => { + buffer.reserve(total_bytes); + for idx in offset..end { + let value = dense_byte_value::(offsets, data, idx); + let mut prefix_length = 0; + + while prefix_length < last_value.len() + && prefix_length < value.len() + && last_value[prefix_length] == value[prefix_length] + { + prefix_length += 1; + } + + let suffix_length = value.len() - prefix_length; + + last_value.clear(); + last_value.extend_from_slice(value); + + buffer.extend_from_slice(&value[prefix_length..]); + prefix_lengths.put(&[prefix_length as i32]).unwrap(); + suffix_lengths.put(&[suffix_length as i32]).unwrap(); + self.variable_length_bytes += value.len() as i64; + } + } + } + } + /// Returns an estimate of the data page size in bytes /// /// This includes: @@ -358,6 +425,23 @@ impl DictEncoder { } } + /// Encode a contiguous range from an offset-based byte array + fn encode_dense(&mut self, values: &GenericByteArray, offset: usize, len: usize) + where + T: ByteArrayType, + { + self.indices.reserve(len); + + let offsets = values.value_offsets(); + let data = values.value_data(); + for idx in offset..offset + len { + let value = dense_byte_value::(offsets, data, idx); + let interned = self.interner.intern(value); + self.indices.push(interned); + self.variable_length_bytes += value.len() as i64; + } + } + fn bit_width(&self) -> u8 { let length = self.interner.storage().values.len(); num_required_bits(length.saturating_sub(1) as u64) @@ -466,8 +550,43 @@ impl ColumnValueEncoder for ByteArrayEncoder { }) } - fn write(&mut self, _values: &Self::Values, _offset: usize, _len: usize) -> Result<()> { - unreachable!("should call write_gather instead") + fn write(&mut self, values: &Self::Values, offset: usize, len: usize) -> Result<()> { + match values.data_type() { + DataType::Utf8 => encode_dense( + values.as_any().downcast_ref::().unwrap(), + offset, + len, + self, + ), + DataType::LargeUtf8 => encode_dense( + values.as_any().downcast_ref::().unwrap(), + offset, + len, + self, + ), + DataType::Binary => encode_dense( + values.as_any().downcast_ref::().unwrap(), + offset, + len, + self, + ), + DataType::LargeBinary => encode_dense( + values.as_any().downcast_ref::().unwrap(), + offset, + len, + self, + ), + _ => { + downcast_op!( + values.data_type(), + values, + encode, + offset..offset + len, + self + ); + } + } + Ok(()) } fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()> { @@ -557,6 +676,16 @@ impl ColumnValueEncoder for ByteArrayEncoder { } } +#[inline] +fn dense_byte_value<'a, T>(offsets: &[T::Offset], data: &'a [u8], idx: usize) -> &'a [u8] +where + T: ByteArrayType, +{ + let start = offsets[idx].as_usize(); + let end = offsets[idx + 1].as_usize(); + &data[start..end] +} + /// Encodes the provided `values` and `indices` to `encoder` /// /// This is a free function so it can be used with `downcast_op!` @@ -567,24 +696,12 @@ where I: ExactSizeIterator + Clone, { if encoder.statistics_enabled != EnabledStatistics::None { - if let Some(accumulator) = encoder.geo_stats_accumulator.as_mut() { - update_geo_stats_accumulator(accumulator.as_mut(), values, indices.clone()); - } else if let Some((min, max)) = compute_min_max(values, indices.clone()) { - if encoder.min_value.as_ref().is_none_or(|m| m > &min) { - encoder.min_value = Some(min); - } - - if encoder.max_value.as_ref().is_none_or(|m| m < &max) { - encoder.max_value = Some(max); - } - } + update_statistics(encoder, byte_values(values, indices.clone())); } // encode the values into bloom filter if enabled if let Some(bloom_filter) = &mut encoder.bloom_filter { - for idx in indices.clone() { - bloom_filter.insert(values.value(idx).as_ref()); - } + update_bloom_filter(bloom_filter, byte_values(values, indices.clone())); } match &mut encoder.dict_encoder { @@ -593,43 +710,111 @@ where } } -/// Computes the min and max for the provided array and indices -/// -/// This is a free function so it can be used with `downcast_op!` -fn compute_min_max( - array: T, - mut valid: impl Iterator, -) -> Option<(ByteArray, ByteArray)> +/// Encodes a contiguous range from an offset-based byte array +fn encode_dense( + values: &GenericByteArray, + offset: usize, + len: usize, + encoder: &mut ByteArrayEncoder, +) where + T: ByteArrayType, +{ + if len == 0 { + return; + } + + if encoder.statistics_enabled != EnabledStatistics::None { + update_statistics(encoder, dense_byte_values(values, offset, len)); + } + + if let Some(bloom_filter) = &mut encoder.bloom_filter { + update_bloom_filter(bloom_filter, dense_byte_values(values, offset, len)); + } + + match &mut encoder.dict_encoder { + Some(dict_encoder) => dict_encoder.encode_dense(values, offset, len), + None => encoder.fallback.encode_dense(values, offset, len), + } +} + +#[inline] +fn byte_values(values: T, indices: I) -> impl Iterator where - T: ArrayAccessor, - T::Item: Copy + Ord + AsRef<[u8]>, + T: ArrayAccessor + Copy, + I: Iterator, +{ + indices.map(move |idx| values.value(idx)) +} + +#[inline] +fn dense_byte_values( + values: &GenericByteArray, + offset: usize, + len: usize, +) -> impl ExactSizeIterator +where + T: ByteArrayType, +{ + let offsets = values.value_offsets(); + let data = values.value_data(); + (offset..offset + len).map(move |idx| dense_byte_value::(offsets, data, idx)) +} + +#[inline] +fn update_statistics(encoder: &mut ByteArrayEncoder, values: impl Iterator) +where + T: Copy + Ord + AsRef<[u8]>, { - let first_idx = valid.next()?; + if let Some(accumulator) = encoder.geo_stats_accumulator.as_mut() { + update_geo_stats_accumulator(accumulator.as_mut(), values); + } else if let Some((min, max)) = compute_min_max(values) { + if encoder.min_value.as_ref().is_none_or(|m| m > &min) { + encoder.min_value = Some(min); + } - let first_val = array.value(first_idx); + if encoder.max_value.as_ref().is_none_or(|m| m < &max) { + encoder.max_value = Some(max); + } + } +} + +#[inline] +fn update_bloom_filter(bloom_filter: &mut Sbbf, values: impl Iterator) +where + T: AsRef<[u8]>, +{ + for value in values { + bloom_filter.insert(value.as_ref()); + } +} + +/// Computes the min and max for the provided values +#[inline] +fn compute_min_max(mut values: impl Iterator) -> Option<(ByteArray, ByteArray)> +where + T: Copy + Ord + AsRef<[u8]>, +{ + let first_val = values.next()?; let mut min = first_val; let mut max = first_val; - for idx in valid { - let val = array.value(idx); + for val in values { min = min.min(val); max = max.max(val); } Some((min.as_ref().to_vec().into(), max.as_ref().to_vec().into())) } -/// Updates geospatial statistics for the provided array and indices +/// Updates geospatial statistics for the provided values +#[inline] fn update_geo_stats_accumulator( bounder: &mut dyn GeoStatsAccumulator, - array: T, - valid: impl Iterator, + values: impl Iterator, ) where - T: ArrayAccessor, - T::Item: Copy + Ord + AsRef<[u8]>, + T: AsRef<[u8]>, { if bounder.is_valid() { - for idx in valid { - let val = array.value(idx); - bounder.update_wkb(val.as_ref()); + for value in values { + bounder.update_wkb(value.as_ref()); } } } diff --git a/parquet/src/arrow/arrow_writer/levels.rs b/parquet/src/arrow/arrow_writer/levels.rs index 10f90f707c08..1fe357f67d02 100644 --- a/parquet/src/arrow/arrow_writer/levels.rs +++ b/parquet/src/arrow/arrow_writer/levels.rs @@ -41,7 +41,7 @@ //! \[1\] [parquet-format#nested-encoding](https://github.com/apache/parquet-format#nested-encoding) use crate::column::chunker::CdcChunk; -use crate::column::writer::LevelDataRef; +use crate::column::writer::{LevelDataRef, ValueSelectionRef}; use crate::errors::{ParquetError, Result}; use arrow_array::cast::AsArray; use arrow_array::{Array, ArrayRef, OffsetSizeTrait}; @@ -670,63 +670,72 @@ impl LevelInfoBuilder { // Fast path: entire leaf array is null if let Some(nulls) = &info.logical_nulls { - if !matches!(info.def_levels, LevelData::Absent) && nulls.null_count() == nulls.len() { + if info.max_def_level > 0 && nulls.null_count() == nulls.len() { info.extend_uniform_levels(info.max_def_level - 1, info.max_rep_level, len); return; } } - if matches!(info.def_levels, LevelData::Absent) { - info.non_null_indices.extend(range.clone()); - } else { - let max_def_level = info.max_def_level; - match &info.logical_nulls { - Some(nulls) => { - assert!(range.end <= nulls.len()); - // Bulk-fill is profitable only on null-heavy ranges long enough to - // amortize the slice/popcount cost; see `BULK_FILL_MIN_LEN` and the - // PR description for the threshold sweep. The gate uses the cached - // buffer-wide `null_count` (O(1)) to stay cheap on the cold path. - if len >= BULK_FILL_MIN_LEN && nulls.null_count() * 2 >= nulls.len() { - let range_nulls = nulls.slice(range.start, len); - let valid_in_range = len - range_nulls.null_count(); - let null_def_level = max_def_level - 1; - let buf = info - .def_levels - .materialize_mut() - .expect("definition levels present"); - let base = buf.len(); - buf.resize(base + len, null_def_level); - for i in range_nulls.valid_indices() { - buf[base + i] = max_def_level; - } - info.non_null_indices.reserve(valid_in_range); - info.non_null_indices - .extend(range_nulls.valid_indices().map(|i| i + range.start)); - } else { - let bits = nulls.inner(); - info.def_levels.extend_from_iter(range.clone().map(|i| { - // Safety: range.end was asserted to be in bounds earlier - let valid = unsafe { bits.value_unchecked(i) }; - max_def_level - (!valid as i16) - })); - info.non_null_indices.reserve(len); - info.non_null_indices.extend( - BitIndexIterator::new(bits.inner(), bits.offset() + range.start, len) - .map(|i| i + range.start), - ); + // Cheap Arc clone: releases the shared borrow on `info` so the arms can call &mut self methods. + match info.logical_nulls.clone() { + Some(_) if info.max_def_level == 0 => { + info.append_value_range(range); + } + Some(nulls) if matches!(info.def_levels, LevelData::Absent) => { + assert!(range.end <= nulls.len()); + info.extend_value_indices( + BitIndexIterator::new( + nulls.inner().values(), + nulls.offset() + range.start, + len, + ) + .map(|i| i + range.start), + ); + } + Some(nulls) => { + assert!(range.end <= nulls.len()); + let max_def_level = info.max_def_level; + // Bulk-fill is profitable only on null-heavy ranges long enough to + // amortize the slice/popcount cost; see `BULK_FILL_MIN_LEN` and the + // PR description for the threshold sweep. The gate uses the cached + // buffer-wide `null_count` (O(1)) to stay cheap on the cold path. + if len >= BULK_FILL_MIN_LEN && nulls.null_count() * 2 >= nulls.len() { + let range_nulls = nulls.slice(range.start, len); + let null_def_level = max_def_level - 1; + let buf = info + .def_levels + .materialize_mut() + .expect("definition levels present"); + let base = buf.len(); + buf.resize(base + len, null_def_level); + for i in range_nulls.valid_indices() { + buf[base + i] = max_def_level; } - } - None => { - info.append_def_level_run(max_def_level, len); - info.non_null_indices.reserve(len); - info.non_null_indices.extend(range.clone()); + info.extend_value_indices(range_nulls.valid_indices().map(|i| i + range.start)); + } else { + let bits = nulls.inner(); + info.extend_def_levels(range.clone().map(|i| { + // Safety: range.end was asserted to be in bounds earlier + let valid = unsafe { bits.value_unchecked(i) }; + max_def_level - (!valid as i16) + })); + info.extend_value_indices( + BitIndexIterator::new(bits.values(), bits.offset() + range.start, len) + .map(|i| i + range.start), + ); } } + None if matches!(info.def_levels, LevelData::Absent) => { + info.append_value_range(range); + } + None => { + info.append_def_level_run(info.max_def_level, len); + info.append_value_range(range); + } } if !matches!(info.rep_levels, LevelData::Absent) { - info.append_rep_level_run(info.max_rep_level, len); + info.append_rep_level_run(info.max_rep_level, len) } } @@ -849,17 +858,6 @@ impl LevelData { } } - pub(crate) fn slice(&self, offset: usize, len: usize) -> Self { - match self { - Self::Absent => Self::Absent, - Self::Materialized(values) => Self::Materialized(values[offset..offset + len].to_vec()), - Self::Uniform { value, .. } => Self::Uniform { - value: *value, - count: len, - }, - } - } - fn append_run(&mut self, value: i16, count: usize) { if count == 0 { return; @@ -920,6 +918,236 @@ impl LevelData { } } +#[derive(Debug, Clone)] +pub(crate) enum ValueSelection { + Empty, + Dense { offset: usize, len: usize }, + Sparse(Vec), +} + +impl PartialEq for ValueSelection { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (Self::Empty, Self::Empty) => true, + ( + Self::Dense { + offset: o1, + len: l1, + }, + Self::Dense { + offset: o2, + len: l2, + }, + ) => o1 == o2 && l1 == l2, + (Self::Sparse(a), Self::Sparse(b)) => a == b, + // Dense and explicit selections are equal when the explicit indices + // form the same contiguous range in order. + (Self::Dense { offset, len }, Self::Sparse(indices)) + | (Self::Sparse(indices), Self::Dense { offset, len }) => { + indices.len() == *len + && indices.first().copied() == Some(*offset) + && indices.windows(2).all(|w| w[1] == w[0] + 1) + } + _ => false, + } + } +} + +impl Eq for ValueSelection {} + +impl ValueSelection { + pub(crate) fn as_ref(&self) -> ValueSelectionRef<'_> { + match self { + Self::Empty => ValueSelectionRef::Empty, + Self::Dense { offset, len } => ValueSelectionRef::Dense { + offset: *offset, + len: *len, + }, + Self::Sparse(indices) => ValueSelectionRef::Sparse(indices), + } + } + + pub(crate) fn slice_ref(&self, offset: usize, len: usize) -> ValueSelectionRef<'_> { + self.as_ref().slice(offset, len) + } + + #[cfg(test)] + fn from_indices(indices: Vec) -> Self { + Self::from_sparse_indices(indices) + } + + fn from_sparse_indices(indices: Vec) -> Self { + match Self::contiguous_selection(&indices) { + None if indices.is_empty() => Self::Empty, + Some(offset) => Self::Dense { + offset, + len: indices.len(), + }, + None => Self::Sparse(indices), + } + } + + fn contiguous_selection(indices: &[usize]) -> Option { + let offset = indices.first().copied()?; + let mut expected = offset; + for &index in indices { + if index != expected { + return None; + } + expected = expected.checked_add(1)?; + } + Some(offset) + } + + fn append_range(&mut self, range: Range) { + if range.is_empty() { + return; + } + let range_len = range.end - range.start; + + match self { + Self::Empty => { + *self = Self::Dense { + offset: range.start, + len: range_len, + }; + } + Self::Dense { offset, len } if *offset + *len == range.start => { + *len += range_len; + } + Self::Dense { .. } => { + let mut indices = self.materialized_indices(); + indices.extend(range); + *self = Self::Sparse(indices); + } + Self::Sparse(indices) => indices.extend(range), + } + } + + fn extend_indices(&mut self, iter: I) + where + I: IntoIterator, + { + let mut iter = iter.into_iter(); + let (lower, upper) = iter.size_hint(); + let hinted = upper.unwrap_or(lower).max(1); + let Some(first) = iter.next() else { + return; + }; + + match self { + Self::Empty => { + let mut indices = Vec::with_capacity(hinted); + indices.extend(std::iter::once(first).chain(iter)); + *self = Self::from_sparse_indices(indices); + } + Self::Dense { offset, len } => { + if let Some(selection) = Self::extend_dense(*offset, len, first, iter, hinted) { + *self = selection; + } + } + Self::Sparse(indices) => indices.extend(std::iter::once(first).chain(iter)), + } + } + + fn extend_dense( + offset: usize, + len: &mut usize, + first: usize, + iter: I, + hinted: usize, + ) -> Option + where + I: Iterator, + { + let mut iter = iter; + let mut expected = offset + *len; + if first == expected { + *len += 1; + expected += 1; + + for index in iter.by_ref() { + if index == expected { + *len += 1; + expected += 1; + } else { + let selection = + Self::extend_materialized_dense(offset, *len, index, iter.by_ref(), hinted); + return Some(selection); + } + } + + return None; + } + + Some(Self::extend_materialized_dense( + offset, *len, first, iter, hinted, + )) + } + + fn extend_materialized_dense( + offset: usize, + len: usize, + first: usize, + iter: I, + hinted: usize, + ) -> Self + where + I: Iterator, + { + let mut indices = Vec::with_capacity(len + hinted); + indices.extend(offset..offset + len); + indices.extend(std::iter::once(first).chain(iter)); + Self::Sparse(indices) + } + + fn materialized_indices(&self) -> Vec { + match self { + Self::Empty => Vec::new(), + Self::Dense { offset, len } => (*offset..*offset + *len).collect(), + Self::Sparse(indices) => indices.clone(), + } + } +} + +#[derive(Clone, Copy)] +pub(crate) struct ArrayLevelsView<'a> { + array: &'a (dyn Array + 'static), + def_levels: LevelDataRef<'a>, + rep_levels: LevelDataRef<'a>, + values: ValueSelectionRef<'a>, +} + +impl<'a> ArrayLevelsView<'a> { + pub(crate) fn array(&self) -> &'a (dyn Array + 'static) { + self.array + } + + pub(crate) fn def_level_data(&self) -> LevelDataRef<'a> { + self.def_levels + } + + pub(crate) fn rep_level_data(&self) -> LevelDataRef<'a> { + self.rep_levels + } + + pub(crate) fn value_selection(&self) -> ValueSelectionRef<'a> { + self.values + } + + pub(crate) fn with_array<'b>(self, array: &'b (dyn Array + 'static)) -> ArrayLevelsView<'b> + where + 'a: 'b, + { + ArrayLevelsView { + array, + def_levels: self.def_levels, + rep_levels: self.rep_levels, + values: self.values, + } + } +} + #[derive(Debug, Clone)] pub(crate) struct ArrayLevels { /// Array's definition levels @@ -934,7 +1162,7 @@ pub(crate) struct ArrayLevels { /// The corresponding array identifying non-null slices of data /// from the primitive array - non_null_indices: Vec, + values: ValueSelection, /// The maximum definition level for this leaf column max_def_level: i16, @@ -953,7 +1181,7 @@ impl PartialEq for ArrayLevels { fn eq(&self, other: &Self) -> bool { self.def_levels == other.def_levels && self.rep_levels == other.rep_levels - && self.non_null_indices == other.non_null_indices + && self.values == other.values && self.max_def_level == other.max_def_level && self.max_rep_level == other.max_rep_level && self.array.as_ref() == other.array.as_ref() @@ -975,7 +1203,7 @@ impl ArrayLevels { Self { def_levels: LevelData::new(max_def_level != 0), rep_levels: LevelData::new(max_rep_level != 0), - non_null_indices: vec![], + values: ValueSelection::Empty, max_def_level, max_rep_level, array, @@ -995,41 +1223,35 @@ impl ArrayLevels { &self.rep_levels } - pub fn non_null_indices(&self) -> &[usize] { - &self.non_null_indices + pub(crate) fn value_selection(&self) -> &ValueSelection { + &self.values } - /// Create a sliced view of this `ArrayLevels` for a CDC chunk. - /// - /// The chunk's `value_offset`/`num_values` select the relevant slice of - /// `non_null_indices`. The array is sliced to the range covered by - /// those indices, and they are shifted to be relative to the slice. - pub(crate) fn slice_for_chunk(&self, chunk: &CdcChunk) -> Self { - let def_levels = self.def_levels.slice(chunk.level_offset, chunk.num_levels); - let rep_levels = self.rep_levels.slice(chunk.level_offset, chunk.num_levels); - - // Select the non-null indices for this chunk. - let nni = &self.non_null_indices[chunk.value_offset..chunk.value_offset + chunk.num_values]; - // Compute the array range spanned by the non-null indices. - // When nni is empty (all-null chunk), start=0, end=0 → zero-length - // array slice; write_batch_internal will process only the def/rep - // levels and write no values. - let start = nni.first().copied().unwrap_or(0); - let end = nni.last().map_or(0, |&i| i + 1); - // Shift indices to be relative to the sliced array. - let non_null_indices = nni.iter().map(|&idx| idx - start).collect(); - // Slice the array to the computed range. - let array = self.array.slice(start, end - start); - let logical_nulls = array.logical_nulls(); + pub(crate) fn view(&self) -> ArrayLevelsView<'_> { + ArrayLevelsView { + array: self.array.as_ref(), + def_levels: self.def_levels.as_ref(), + rep_levels: self.rep_levels.as_ref(), + values: self.values.as_ref(), + } + } - Self { - def_levels, - rep_levels, - non_null_indices, - max_def_level: self.max_def_level, - max_rep_level: self.max_rep_level, - array, - logical_nulls, + /// Create a borrowed view of this `ArrayLevels` for a CDC chunk. + /// + /// Only the level streams and value selection are sliced. The Arrow leaf + /// array remains the original array so value indices stay absolute. + pub(crate) fn chunk_view(&self, chunk: &CdcChunk) -> ArrayLevelsView<'_> { + ArrayLevelsView { + array: self.array.as_ref(), + def_levels: self + .def_levels + .as_ref() + .slice(chunk.level_offset, chunk.num_levels), + rep_levels: self + .rep_levels + .as_ref() + .slice(chunk.level_offset, chunk.num_levels), + values: self.values.slice_ref(chunk.value_offset, chunk.num_values), } } @@ -1039,6 +1261,17 @@ impl ArrayLevels { self.rep_levels.append_run(rep_val, count); } + fn append_value_range(&mut self, range: Range) { + self.values.append_range(range); + } + + fn extend_value_indices(&mut self, iter: I) + where + I: IntoIterator, + { + self.values.extend_indices(iter); + } + fn append_def_level_run(&mut self, value: i16, count: usize) { self.def_levels.append_run(value, count); } @@ -1046,6 +1279,22 @@ impl ArrayLevels { fn append_rep_level_run(&mut self, value: i16, count: usize) { self.rep_levels.append_run(value, count); } + + fn extend_def_levels(&mut self, iter: I) + where + I: IntoIterator, + { + self.def_levels.extend_from_iter(iter); + } + + #[cfg(test)] + fn materialized_indices(&self) -> Vec { + match &self.values { + ValueSelection::Empty => Vec::new(), + ValueSelection::Dense { offset, len } => (*offset..*offset + *len).collect(), + ValueSelection::Sparse(indices) => indices.clone(), + } + } } #[cfg(test)] @@ -1098,7 +1347,7 @@ mod tests { let expected = ArrayLevels { def_levels: LevelData::Materialized(vec![2; 10]), rep_levels: LevelData::Materialized(vec![0, 2, 2, 1, 2, 2, 2, 0, 1, 2]), - non_null_indices: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9], + values: ValueSelection::from_indices(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), max_def_level: 2, max_rep_level: 2, array: Arc::new(primitives), @@ -1119,7 +1368,7 @@ mod tests { let expected_levels = ArrayLevels { def_levels: LevelData::Absent, rep_levels: LevelData::Absent, - non_null_indices: (0..10).collect(), + values: ValueSelection::from_indices((0..10).collect()), max_def_level: 0, max_rep_level: 0, array, @@ -1147,7 +1396,7 @@ mod tests { let expected_levels = ArrayLevels { def_levels: LevelData::Materialized(vec![1, 0, 1, 1, 0]), rep_levels: LevelData::Absent, - non_null_indices: vec![0, 2, 3], + values: ValueSelection::from_indices(vec![0, 2, 3]), max_def_level: 1, max_rep_level: 0, array, @@ -1156,6 +1405,26 @@ mod tests { assert_eq!(&levels[0], &expected_levels); } + #[test] + fn test_calculate_one_level_nullable_no_nulls_uses_uniform_dense() { + let array = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef; + let field = Field::new_list_field(DataType::Int32, true); + + let levels = calculate_array_levels(&array, &field).unwrap(); + assert_eq!(levels.len(), 1); + + let expected_levels = ArrayLevels { + def_levels: LevelData::Uniform { value: 1, count: 3 }, + rep_levels: LevelData::Absent, + values: ValueSelection::Dense { offset: 0, len: 3 }, + max_def_level: 1, + max_rep_level: 0, + array, + logical_nulls: None, + }; + assert_eq!(&levels[0], &expected_levels); + } + #[test] fn test_calculate_array_levels_1() { let leaf_field = Field::new_list_field(DataType::Int32, false); @@ -1182,7 +1451,7 @@ mod tests { let expected_levels = ArrayLevels { def_levels: LevelData::Materialized(vec![1; 5]), rep_levels: LevelData::Materialized(vec![0; 5]), - non_null_indices: (0..5).collect(), + values: ValueSelection::from_indices((0..5).collect()), max_def_level: 1, max_rep_level: 1, array: Arc::new(leaf_array), @@ -1216,7 +1485,7 @@ mod tests { let expected_levels = ArrayLevels { def_levels: LevelData::Materialized(vec![2, 2, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2]), rep_levels: LevelData::Materialized(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]), - non_null_indices: (0..11).collect(), + values: ValueSelection::from_indices((0..11).collect()), max_def_level: 2, max_rep_level: 1, array: Arc::new(leaf_array), @@ -1225,6 +1494,39 @@ mod tests { assert_eq!(&levels[0], &expected_levels); } + #[test] + fn test_write_list_interleaved_null_empty() { + let leaf_field = Field::new_list_field(DataType::Int32, false); + let list_type = DataType::List(Arc::new(leaf_field)); + + let leaf_array = Int32Array::from(vec![1, 2, 3]); + let offsets = Buffer::from_iter([0_i32, 0, 0, 2, 2, 2, 2, 3, 3]); + let null_bitmap = Buffer::from([0b11100110_u8]); + let list = ArrayDataBuilder::new(list_type.clone()) + .len(8) + .add_buffer(offsets) + .add_child_data(leaf_array.to_data()) + .null_bit_buffer(Some(null_bitmap)) + .build() + .unwrap(); + let list = make_array(list); + + let list_field = Field::new("list", list_type, true); + let levels = calculate_array_levels(&list, &list_field).unwrap(); + assert_eq!(levels.len(), 1); + let levels = &levels[0]; + + assert_eq!( + levels.def_level_data(), + &LevelData::Materialized(vec![0, 1, 2, 2, 0, 0, 1, 2, 1]), + ); + assert_eq!( + levels.rep_level_data(), + &LevelData::Materialized(vec![0, 0, 0, 1, 0, 0, 0, 0, 0]), + ); + assert_eq!(levels.materialized_indices(), vec![0, 1, 2]); + } + #[test] fn test_calculate_array_levels_2() { // If some values are null @@ -1266,7 +1568,7 @@ mod tests { let expected_levels = ArrayLevels { def_levels: LevelData::Materialized(vec![0, 2, 0, 3, 3, 3, 3, 3, 3, 3]), rep_levels: LevelData::Materialized(vec![0, 0, 0, 0, 1, 1, 1, 0, 1, 1]), - non_null_indices: (4..11).collect(), + values: ValueSelection::from_indices((4..11).collect()), max_def_level: 3, max_rep_level: 1, array: Arc::new(leaf), @@ -1317,7 +1619,7 @@ mod tests { rep_levels: LevelData::Materialized(vec![ 0, 2, 1, 2, 0, 0, 2, 1, 2, 0, 2, 1, 2, 1, 2, 1, 2, 0, 2, 1, 2, 1, 2, ]), - non_null_indices: (0..22).collect(), + values: ValueSelection::from_indices((0..22).collect()), max_def_level: 5, max_rep_level: 2, array: Arc::new(leaf), @@ -1355,7 +1657,7 @@ mod tests { let expected_levels = ArrayLevels { def_levels: LevelData::Materialized(vec![1; 4]), rep_levels: LevelData::Materialized(vec![0; 4]), - non_null_indices: (0..4).collect(), + values: ValueSelection::from_indices((0..4).collect()), max_def_level: 1, max_rep_level: 1, array: Arc::new(leaf), @@ -1388,7 +1690,7 @@ mod tests { let expected_levels = ArrayLevels { def_levels: LevelData::Materialized(vec![1, 3, 3, 3, 3, 3, 3, 3]), rep_levels: LevelData::Materialized(vec![0, 0, 1, 1, 0, 1, 0, 1]), - non_null_indices: (0..7).collect(), + values: ValueSelection::from_indices((0..7).collect()), max_def_level: 3, max_rep_level: 1, array: Arc::new(leaf), @@ -1441,7 +1743,7 @@ mod tests { rep_levels: LevelData::Materialized(vec![ 0, 0, 1, 2, 1, 0, 2, 2, 1, 2, 2, 2, 0, 1, 2, 2, 2, 2, ]), - non_null_indices: (0..15).collect(), + values: ValueSelection::from_indices((0..15).collect()), max_def_level: 5, max_rep_level: 2, array: Arc::new(leaf), @@ -1482,7 +1784,7 @@ mod tests { let expected_levels = ArrayLevels { def_levels: LevelData::Materialized(vec![3, 2, 3, 1, 0, 3]), rep_levels: LevelData::Absent, - non_null_indices: vec![0, 2, 5], + values: ValueSelection::from_indices(vec![0, 2, 5]), max_def_level: 3, max_rep_level: 0, array: leaf, @@ -1522,7 +1824,7 @@ mod tests { let expected_level = ArrayLevels { def_levels: LevelData::Materialized(vec![0, 3, 3, 3]), rep_levels: LevelData::Materialized(vec![0, 0, 1, 1]), - non_null_indices: vec![3, 4, 5], + values: ValueSelection::from_indices(vec![3, 4, 5]), max_def_level: 3, max_rep_level: 1, array: Arc::new(a_values), @@ -1615,7 +1917,7 @@ mod tests { let expected_level = ArrayLevels { def_levels: LevelData::Absent, rep_levels: LevelData::Absent, - non_null_indices: vec![0, 1, 2, 3, 4], + values: ValueSelection::from_indices(vec![0, 1, 2, 3, 4]), max_def_level: 0, max_rep_level: 0, array: Arc::new(a), @@ -1630,7 +1932,7 @@ mod tests { let expected_level = ArrayLevels { def_levels: LevelData::Materialized(vec![1, 0, 0, 1, 1]), rep_levels: LevelData::Absent, - non_null_indices: vec![0, 3, 4], + values: ValueSelection::from_indices(vec![0, 3, 4]), max_def_level: 1, max_rep_level: 0, array: Arc::new(b), @@ -1645,7 +1947,7 @@ mod tests { let expected_level = ArrayLevels { def_levels: LevelData::Materialized(vec![1, 1, 1, 2, 1]), rep_levels: LevelData::Absent, - non_null_indices: vec![3], + values: ValueSelection::from_indices(vec![3]), max_def_level: 2, max_rep_level: 0, array: Arc::new(d), @@ -1660,7 +1962,7 @@ mod tests { let expected_level = ArrayLevels { def_levels: LevelData::Materialized(vec![3, 2, 3, 2, 3]), rep_levels: LevelData::Absent, - non_null_indices: vec![0, 2, 4], + values: ValueSelection::from_indices(vec![0, 2, 4]), max_def_level: 3, max_rep_level: 0, array: Arc::new(f), @@ -1768,7 +2070,7 @@ mod tests { let expected_level = ArrayLevels { def_levels: LevelData::Materialized(vec![1; 7]), rep_levels: LevelData::Materialized(vec![0, 1, 0, 1, 0, 1, 1]), - non_null_indices: vec![0, 1, 2, 3, 4, 5, 6], + values: ValueSelection::from_indices(vec![0, 1, 2, 3, 4, 5, 6]), max_def_level: 1, max_rep_level: 1, array: map.keys().clone(), @@ -1783,7 +2085,7 @@ mod tests { let expected_level = ArrayLevels { def_levels: LevelData::Materialized(vec![2, 2, 2, 1, 2, 1, 2]), rep_levels: LevelData::Materialized(vec![0, 1, 0, 1, 0, 1, 1]), - non_null_indices: vec![0, 1, 2, 4, 6], + values: ValueSelection::from_indices(vec![0, 1, 2, 4, 6]), max_def_level: 2, max_rep_level: 1, array: map.values().clone(), @@ -1870,7 +2172,7 @@ mod tests { let expected_level = ArrayLevels { def_levels: LevelData::Materialized(vec![4, 1, 0, 2, 2, 3, 4]), rep_levels: LevelData::Materialized(vec![0, 0, 0, 0, 1, 0, 0]), - non_null_indices: vec![0, 4], + values: ValueSelection::from_indices(vec![0, 4]), max_def_level: 4, max_rep_level: 1, array: values, @@ -1912,7 +2214,7 @@ mod tests { let expected_level = ArrayLevels { def_levels: LevelData::Materialized(vec![4, 4, 3, 2, 0, 4, 4, 0, 1]), rep_levels: LevelData::Materialized(vec![0, 1, 0, 0, 0, 0, 1, 0, 0]), - non_null_indices: vec![0, 1, 5, 6], + values: ValueSelection::from_indices(vec![0, 1, 5, 6]), max_def_level: 4, max_rep_level: 1, array: values, @@ -1999,7 +2301,7 @@ mod tests { let expected_level = ArrayLevels { def_levels: LevelData::Materialized(vec![0, 0, 1, 6, 5, 2, 3, 1]), rep_levels: LevelData::Materialized(vec![0, 0, 0, 0, 2, 0, 1, 0]), - non_null_indices: vec![1], + values: ValueSelection::from_indices(vec![1]), max_def_level: 6, max_rep_level: 2, array: a1_values, @@ -2012,7 +2314,7 @@ mod tests { let expected_level = ArrayLevels { def_levels: LevelData::Materialized(vec![0, 0, 1, 3, 2, 4, 1]), rep_levels: LevelData::Materialized(vec![0, 0, 0, 0, 0, 1, 0]), - non_null_indices: vec![4], + values: ValueSelection::from_indices(vec![4]), max_def_level: 4, max_rep_level: 1, array: a2_values, @@ -2052,7 +2354,7 @@ mod tests { let expected_level = ArrayLevels { def_levels: LevelData::Materialized(vec![0, 0, 3, 3]), rep_levels: LevelData::Materialized(vec![0, 0, 0, 1]), - non_null_indices: vec![6, 7], + values: ValueSelection::from_indices(vec![6, 7]), max_def_level: 3, max_rep_level: 1, array: values, @@ -2204,7 +2506,7 @@ mod tests { let expected_a = ArrayLevels { def_levels: LevelData::Materialized(vec![4, 2, 0, 2, 2, 3, 4]), rep_levels: LevelData::Materialized(vec![0, 1, 0, 0, 1, 0, 1]), - non_null_indices: vec![0, 7], + values: ValueSelection::from_indices(vec![0, 7]), max_def_level: 4, max_rep_level: 1, array: values_a, @@ -2215,7 +2517,7 @@ mod tests { let expected_b = ArrayLevels { def_levels: LevelData::Materialized(vec![3, 2, 0, 2, 2, 3, 3]), rep_levels: LevelData::Materialized(vec![0, 1, 0, 0, 1, 0, 1]), - non_null_indices: vec![0, 6, 7], + values: ValueSelection::from_indices(vec![0, 6, 7]), max_def_level: 3, max_rep_level: 1, array: values_b, @@ -2248,7 +2550,7 @@ mod tests { let expected_level = ArrayLevels { def_levels: LevelData::Materialized(vec![1, 0, 1]), rep_levels: LevelData::Materialized(vec![0, 0, 0]), - non_null_indices: vec![], + values: ValueSelection::from_indices(vec![]), max_def_level: 3, max_rep_level: 1, array: values, @@ -2285,7 +2587,7 @@ mod tests { let expected_level = ArrayLevels { def_levels: LevelData::Materialized(vec![5, 4, 5, 2, 5, 3, 5, 5, 4, 4, 0]), rep_levels: LevelData::Materialized(vec![0, 2, 2, 1, 0, 1, 0, 2, 1, 2, 0]), - non_null_indices: vec![0, 2, 3, 4, 5], + values: ValueSelection::from_indices(vec![0, 2, 3, 4, 5]), max_def_level: 5, max_rep_level: 2, array: values, @@ -2318,7 +2620,7 @@ mod tests { let expected_level = ArrayLevels { def_levels: LevelData::Materialized(vec![0, 0, 1, 1]), rep_levels: LevelData::Absent, - non_null_indices: vec![2, 3], + values: ValueSelection::from_indices(vec![2, 3]), max_def_level: 1, max_rep_level: 0, array: Arc::new(dict), @@ -2348,38 +2650,44 @@ mod tests { } #[test] - fn test_slice_for_chunk_flat() { + fn test_chunk_view_flat() { // Case 1: required field (max_def_level=0, no def/rep levels stored). - // Array has 6 values; all are non-null so non_null_indices covers every position. - // value_offset=2, num_values=3 → non_null_indices[2..5] = [2,3,4]. - // Array is sliced (no def_levels → write_batch_internal uses values.len()). + // Array has 6 values; all are non-null so Dense{0,6} covers every position. + // value_offset=2, num_values=3 narrows the value selection to [2,3,4], + // while preserving the original leaf array. let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6])); let logical_nulls = array.logical_nulls(); let levels = ArrayLevels { def_levels: LevelData::Absent, rep_levels: LevelData::Absent, - non_null_indices: vec![0, 1, 2, 3, 4, 5], + values: ValueSelection::from_indices(vec![0, 1, 2, 3, 4, 5]), max_def_level: 0, max_rep_level: 0, array, logical_nulls, }; - let sliced = levels.slice_for_chunk(&CdcChunk { + let view = levels.chunk_view(&CdcChunk { level_offset: 0, num_levels: 0, value_offset: 2, num_values: 3, }); - assert!(matches!(sliced.def_levels, LevelData::Absent)); - assert!(matches!(sliced.rep_levels, LevelData::Absent)); - assert_eq!(sliced.non_null_indices, vec![0, 1, 2]); - assert_eq!(sliced.array.len(), 3); + assert_eq!(view.def_level_data(), LevelDataRef::Absent); + assert_eq!(view.rep_level_data(), LevelDataRef::Absent); + assert_eq!( + view.value_selection(), + ValueSelectionRef::Dense { offset: 2, len: 3 } + ); + assert_eq!(view.array().len(), 6); // Case 2: optional field (max_def_level=1, def levels present, no rep levels). // Array: [Some(1), None, Some(3), None, Some(5), Some(6)] - // non_null_indices: [0, 2, 4, 5] - // value_offset=1, num_values=1 → non_null_indices[1..2] = [2]. - // Array is not sliced (def_levels present → num_levels from def_levels.len()). + // values: Sparse([0, 2, 4, 5]) (array positions of the four non-null values) + // def_levels: [1, 0, 1, 0, 1, 1] + // + // Chunk: level_offset=1, num_levels=3, value_offset=1, num_values=1. + // - sel = values[1..2] = [2] → non-null value at array position 2 + // - original array is preserved let array: ArrayRef = Arc::new(Int32Array::from(vec![ Some(1), None, @@ -2392,26 +2700,29 @@ mod tests { let levels = ArrayLevels { def_levels: LevelData::Materialized(vec![1, 0, 1, 0, 1, 1]), rep_levels: LevelData::Absent, - non_null_indices: vec![0, 2, 4, 5], + values: ValueSelection::from_indices(vec![0, 2, 4, 5]), max_def_level: 1, max_rep_level: 0, array, logical_nulls, }; - let sliced = levels.slice_for_chunk(&CdcChunk { + let view = levels.chunk_view(&CdcChunk { level_offset: 1, num_levels: 3, value_offset: 1, num_values: 1, }); - assert_eq!(sliced.def_levels, LevelData::Materialized(vec![0, 1, 0])); - assert!(matches!(sliced.rep_levels, LevelData::Absent)); - assert_eq!(sliced.non_null_indices, vec![0]); // [2] shifted by -2 (nni[0]) - assert_eq!(sliced.array.len(), 1); + assert_eq!( + view.def_level_data(), + LevelDataRef::Materialized(&[0, 1, 0]) + ); + assert_eq!(view.rep_level_data(), LevelDataRef::Absent); + assert_eq!(view.value_selection(), ValueSelectionRef::Sparse(&[2])); + assert_eq!(view.array().len(), 6); } #[test] - fn test_slice_for_chunk_nested_with_nulls() { + fn test_chunk_view_nested_with_nulls() { // Regression test for https://github.com/apache/arrow-rs/issues/9637 // // Simulates a List where null list entries have non-zero child @@ -2428,7 +2739,7 @@ mod tests { // // def_levels: [3, 0, 3, 2, 0, 3, 3] // rep_levels: [0, 0, 0, 1, 0, 0, 1] - // non_null_indices: [0, 3, 8, 9] + // value_selection: [0, 3, 8, 9] // gaps in array: 0→3 (skip 1,2), 3→8 (skip 5,6,7) let array: ArrayRef = Arc::new(Int32Array::from(vec![ Some(1), // 0: row 0 @@ -2446,72 +2757,189 @@ mod tests { let levels = ArrayLevels { def_levels: LevelData::Materialized(vec![3, 0, 3, 2, 0, 3, 3]), rep_levels: LevelData::Materialized(vec![0, 0, 0, 1, 0, 0, 1]), - non_null_indices: vec![0, 3, 8, 9], + values: ValueSelection::from_indices(vec![0, 3, 8, 9]), max_def_level: 3, max_rep_level: 1, array, logical_nulls, }; - // Chunk 0: rows 0-1, nni=[0] → array sliced to [0..1] - let chunk0 = levels.slice_for_chunk(&CdcChunk { + // Chunk 0: rows 0-1, value[0]=1 → sel[0]=[0] + let chunk0 = levels.chunk_view(&CdcChunk { level_offset: 0, num_levels: 2, value_offset: 0, num_values: 1, }); - assert_eq!(chunk0.non_null_indices, vec![0]); - assert_eq!(chunk0.array.len(), 1); + assert_eq!(chunk0.value_selection(), ValueSelectionRef::Sparse(&[0])); + assert_eq!(chunk0.def_level_data(), LevelDataRef::Materialized(&[3, 0])); + assert_eq!(chunk0.rep_level_data(), LevelDataRef::Materialized(&[0, 0])); + assert_eq!(chunk0.array().len(), 10); - // Chunk 1: rows 2-3, nni=[3] → array sliced to [3..4] - let chunk1 = levels.slice_for_chunk(&CdcChunk { + // Chunk 1: rows 2-3, value[1]=2 → sel[1]=[3] + let chunk1 = levels.chunk_view(&CdcChunk { level_offset: 2, num_levels: 3, value_offset: 1, num_values: 1, }); - assert_eq!(chunk1.non_null_indices, vec![0]); - assert_eq!(chunk1.array.len(), 1); + assert_eq!(chunk1.value_selection(), ValueSelectionRef::Sparse(&[3])); + assert_eq!( + chunk1.def_level_data(), + LevelDataRef::Materialized(&[3, 2, 0]) + ); + assert_eq!( + chunk1.rep_level_data(), + LevelDataRef::Materialized(&[0, 1, 0]) + ); + assert_eq!(chunk1.array().len(), 10); - // Chunk 2: row 4, nni=[8, 9] → array sliced to [8..10] - let chunk2 = levels.slice_for_chunk(&CdcChunk { + // Chunk 2: row 4, values[2..4]=[8,9] + let chunk2 = levels.chunk_view(&CdcChunk { level_offset: 5, num_levels: 2, value_offset: 2, num_values: 2, }); - assert_eq!(chunk2.non_null_indices, vec![0, 1]); - assert_eq!(chunk2.array.len(), 2); + assert_eq!(chunk2.value_selection(), ValueSelectionRef::Sparse(&[8, 9])); + assert_eq!(chunk2.def_level_data(), LevelDataRef::Materialized(&[3, 3])); + assert_eq!(chunk2.rep_level_data(), LevelDataRef::Materialized(&[0, 1])); + assert_eq!(chunk2.array().len(), 10); } #[test] - fn test_slice_for_chunk_all_null() { - // All-null chunk: num_values=0 → empty nni slice → zero-length array. + fn test_chunk_view_preserves_sparse_list_view_order() { + let leaf_field = Arc::new(Field::new_list_field(DataType::Int32, false)); + let values = Arc::new(Int32Array::from(vec![10, 11, 12, 13])); + let list_view = ListViewArray::new( + leaf_field.clone(), + vec![0, 2, 1, 3].into(), + vec![1, 1, 1, 1].into(), + values.clone(), + None, + ); + + let list_field = Field::new("list", DataType::ListView(leaf_field), false); + let levels = + calculate_array_levels(&(Arc::new(list_view) as ArrayRef), &list_field).unwrap(); + assert_eq!(levels.len(), 1); + let levels = &levels[0]; + + assert_eq!(levels.materialized_indices(), vec![0, 2, 1, 3]); + match levels.value_selection() { + ValueSelection::Sparse(indices) => assert_eq!(indices, &[0, 2, 1, 3]), + selection => panic!("expected Sparse, got {selection:?}"), + } + + let chunk = levels.chunk_view(&CdcChunk { + level_offset: 0, + num_levels: 4, + value_offset: 0, + num_values: 4, + }); + + assert_eq!( + chunk.value_selection(), + ValueSelectionRef::Sparse(&[0, 2, 1, 3]) + ); + assert_eq!(chunk.array().len(), 4); + } + + #[test] + fn test_chunk_view_all_null() { + // All-null chunk: num_values=0 keeps the original array and an empty selection. let array: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, None, Some(4)])); let logical_nulls = array.logical_nulls(); let levels = ArrayLevels { def_levels: LevelData::Materialized(vec![1, 0, 0, 1]), rep_levels: LevelData::Absent, - non_null_indices: vec![0, 3], + values: ValueSelection::from_indices(vec![0, 3]), max_def_level: 1, max_rep_level: 0, array, logical_nulls, }; // Chunk covering only the two null rows (levels 1..3), zero non-null values. - let sliced = levels.slice_for_chunk(&CdcChunk { + let view = levels.chunk_view(&CdcChunk { level_offset: 1, num_levels: 2, value_offset: 1, num_values: 0, }); - assert_eq!(sliced.def_levels, LevelData::Materialized(vec![0, 0])); - assert_eq!(sliced.non_null_indices, Vec::::new()); - assert_eq!(sliced.array.len(), 0); + assert_eq!(view.def_level_data(), LevelDataRef::Materialized(&[0, 0])); + assert_eq!(view.value_selection(), ValueSelectionRef::Sparse(&[])); + assert_eq!(view.array().len(), 4); + } + + #[test] + fn test_chunk_view_uniform_levels_and_dense_values() { + let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6])); + let logical_nulls = array.logical_nulls(); + let levels = ArrayLevels { + def_levels: LevelData::Uniform { value: 1, count: 6 }, + rep_levels: LevelData::Absent, + values: ValueSelection::Dense { offset: 0, len: 6 }, + max_def_level: 1, + max_rep_level: 0, + array, + logical_nulls, + }; + + let view = levels.chunk_view(&CdcChunk { + level_offset: 2, + num_levels: 3, + value_offset: 2, + num_values: 3, + }); + + assert_eq!( + view.def_level_data(), + LevelDataRef::Uniform { value: 1, count: 3 } + ); + assert_eq!(view.rep_level_data(), LevelDataRef::Absent); + assert_eq!( + view.value_selection(), + ValueSelectionRef::Dense { offset: 2, len: 3 } + ); + assert_eq!(view.array().len(), 6); } #[test] fn test_all_null_list() { + // A list where every slot is null — hits the all-null fast path in write_list. + let leaf_field = Field::new_list_field(DataType::Int32, false); + let list_type = DataType::List(Arc::new(leaf_field)); + + let leaf_array = Int32Array::from(Vec::::new()); + let offsets = Buffer::from_iter([0_i32, 0, 0, 0]); + let null_bitmap = Buffer::from([0b00000000_u8]); // all null + let list = ArrayDataBuilder::new(list_type.clone()) + .len(3) + .add_buffer(offsets) + .add_child_data(leaf_array.to_data()) + .null_bit_buffer(Some(null_bitmap)) + .build() + .unwrap(); + let list = make_array(list); + + let list_field = Field::new("list", list_type, true); + let levels = calculate_array_levels(&list, &list_field).unwrap(); + assert_eq!(levels.len(), 1); + + let expected = ArrayLevels { + def_levels: LevelData::Uniform { value: 0, count: 3 }, + rep_levels: LevelData::Uniform { value: 0, count: 3 }, + values: ValueSelection::Empty, + max_def_level: 2, + max_rep_level: 1, + array: Arc::new(leaf_array), + logical_nulls: None, + }; + assert_eq!(&levels[0], &expected); + } + + #[test] + fn test_all_null_list_nullable_item() { // List where every list slot is null. // Schema: list (nullable) -> item (int32, nullable) // Data: [null, null, null, null] @@ -2530,7 +2958,7 @@ mod tests { let expected = ArrayLevels { def_levels: LevelData::Uniform { value: 0, count: 4 }, rep_levels: LevelData::Uniform { value: 0, count: 4 }, - non_null_indices: vec![], + values: ValueSelection::Empty, max_def_level: 3, max_rep_level: 1, array: values, @@ -2540,7 +2968,7 @@ mod tests { } #[test] - fn test_all_null_fixed_size_list() { + fn test_all_null_fixed_size_list_nullable_item() { // FixedSizeList where every list slot is null. // Schema: list (nullable) -> item (int32, nullable) // Data: [null, null, null] @@ -2559,7 +2987,7 @@ mod tests { let expected = ArrayLevels { def_levels: LevelData::Uniform { value: 0, count: 3 }, rep_levels: LevelData::Uniform { value: 0, count: 3 }, - non_null_indices: vec![], + values: ValueSelection::Empty, max_def_level: 3, max_rep_level: 1, array: values, @@ -2589,7 +3017,7 @@ mod tests { let expected = ArrayLevels { def_levels: LevelData::Uniform { value: 0, count: 4 }, rep_levels: LevelData::Absent, - non_null_indices: vec![], + values: ValueSelection::Empty, max_def_level: 2, max_rep_level: 0, array: leaf, @@ -2598,6 +3026,120 @@ mod tests { assert_eq!(&levels[0], &expected); } + #[test] + fn test_all_null_fixed_size_list() { + // A fixed-size list where every slot is null. Hits the all-null fast path + // in write_fixed_size_list. + let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 2); + builder.values().append_slice(&[0, 0]); + builder.append(false); + builder.values().append_slice(&[0, 0]); + builder.append(false); + builder.values().append_slice(&[0, 0]); + builder.append(false); + let a = builder.finish(); + let values = a.values().clone(); + + let item_field = Field::new_list_field(a.data_type().clone(), true); + let levels = calculate_array_levels(&(Arc::new(a) as ArrayRef), &item_field).unwrap(); + assert_eq!(levels.len(), 1); + + let logical_nulls = values.logical_nulls(); + let expected = ArrayLevels { + def_levels: LevelData::Uniform { value: 0, count: 3 }, + rep_levels: LevelData::Uniform { value: 0, count: 3 }, + values: ValueSelection::Empty, + max_def_level: 3, + max_rep_level: 1, + array: values, + logical_nulls, + }; + assert_eq!(&levels[0], &expected); + } + + #[test] + fn test_non_nullable_field_with_nulls_in_array() { + // A field declared non-nullable but the Arrow array physically has nulls. + // This produces def_levels: Absent (max_def_level == 0) with logical_nulls: Some. + // Preserve the historical required-field behavior by writing all slots, + // including slots marked null in the Arrow validity bitmap. + let array = Arc::new(Int32Array::from_iter([Some(1), None, Some(3)])) as ArrayRef; + let field = Field::new("item", DataType::Int32, false); + + let logical_nulls = array.logical_nulls(); + let levels = calculate_array_levels(&array, &field).unwrap(); + assert_eq!(levels.len(), 1); + + let expected = ArrayLevels { + def_levels: LevelData::Absent, + rep_levels: LevelData::Absent, + values: ValueSelection::Dense { offset: 0, len: 3 }, + max_def_level: 0, + max_rep_level: 0, + array, + logical_nulls, + }; + assert_eq!(&levels[0], &expected); + } + + #[test] + fn test_list_view_nullable() { + // [[1, 2], null, [], [3]] + let leaf_field = Arc::new(Field::new_list_field(DataType::Int32, false)); + let values = Arc::new(Int32Array::from(vec![1, 2, 3])); + let a = ListViewArray::new( + leaf_field.clone(), + vec![0, 0, 0, 2].into(), + vec![2, 0, 0, 1].into(), + values.clone(), + Some(vec![true, false, true, true].into()), + ); + + let list_field = Field::new("list", DataType::ListView(leaf_field), true); + let levels = calculate_array_levels(&(Arc::new(a) as ArrayRef), &list_field).unwrap(); + assert_eq!(levels.len(), 1); + + let expected = ArrayLevels { + def_levels: LevelData::Materialized(vec![2, 2, 0, 1, 2]), + rep_levels: LevelData::Materialized(vec![0, 1, 0, 0, 0]), + values: ValueSelection::from_indices(vec![0, 1, 2]), + max_def_level: 2, + max_rep_level: 1, + array: values as ArrayRef, + logical_nulls: None, + }; + assert_eq!(&levels[0], &expected); + } + + #[test] + fn test_list_view_non_null() { + // [[1, 2], [], [3]] + let leaf_field = Arc::new(Field::new_list_field(DataType::Int32, false)); + let values = Arc::new(Int32Array::from(vec![1, 2, 3])); + let a = ListViewArray::new( + leaf_field.clone(), + vec![0, 0, 2].into(), + vec![2, 0, 1].into(), + values.clone(), + None, + ); + + let list_field = Field::new("list", DataType::ListView(leaf_field), false); + let levels = calculate_array_levels(&(Arc::new(a) as ArrayRef), &list_field).unwrap(); + assert_eq!(levels.len(), 1); + + let expected = ArrayLevels { + def_levels: LevelData::Materialized(vec![1, 1, 0, 1]), + rep_levels: LevelData::Materialized(vec![0, 1, 0, 0]), + values: ValueSelection::from_indices(vec![0, 1, 2]), + max_def_level: 1, + max_rep_level: 1, + array: values as ArrayRef, + logical_nulls: None, + }; + assert_eq!(&levels[0], &expected); + } + #[test] fn test_all_null_nested_struct() { // Struct> where the outer struct is entirely null. @@ -2623,7 +3165,7 @@ mod tests { let expected = ArrayLevels { def_levels: LevelData::Uniform { value: 0, count: 3 }, rep_levels: LevelData::Absent, - non_null_indices: vec![], + values: ValueSelection::Empty, max_def_level: 3, max_rep_level: 0, array: leaf, @@ -2632,6 +3174,62 @@ mod tests { assert_eq!(&levels[0], &expected); } + #[test] + fn test_level_data_uniform_materialized_eq() { + let uniform = LevelData::Uniform { value: 1, count: 3 }; + let materialized = LevelData::Materialized(vec![1, 1, 1]); + assert_eq!(uniform, materialized); + assert_eq!(materialized, uniform); + + // Mismatch + let different = LevelData::Materialized(vec![1, 2, 1]); + assert_ne!(uniform, different); + } + + #[test] + fn test_value_selection_dense_sparse_eq() { + let dense = ValueSelection::Dense { offset: 2, len: 3 }; + let sparse = ValueSelection::Sparse(vec![2, 3, 4]); + assert_eq!(dense, sparse); + assert_eq!(sparse, dense); + + // Mismatch + let non_contiguous = ValueSelection::Sparse(vec![2, 4, 5]); + assert_ne!(dense, non_contiguous); + } + + #[test] + fn test_level_data_append_run_zero_count() { + let mut data = LevelData::Uniform { value: 1, count: 3 }; + data.append_run(1, 0); + assert_eq!(data, LevelData::Uniform { value: 1, count: 3 }); + + let mut materialized = LevelData::Materialized(vec![1, 2]); + materialized.append_run(3, 0); + assert_eq!(materialized, LevelData::Materialized(vec![1, 2])); + } + + #[test] + fn test_level_data_absent_materialize_is_none() { + let mut absent = LevelData::Absent; + assert!(absent.materialize_mut().is_none()); + } + + #[test] + fn test_value_selection_append_range_empty() { + let mut sel = ValueSelection::Dense { offset: 0, len: 3 }; + sel.append_range(0..0); + assert_eq!(sel, ValueSelection::Dense { offset: 0, len: 3 }); + } + + #[test] + fn test_value_selection_from_indices_unsorted_becomes_sparse() { + match ValueSelection::from_indices(vec![0, 2, 1, 3]) { + ValueSelection::Sparse(indices) => assert_eq!(indices, vec![0, 2, 1, 3]), + selection => panic!("expected Sparse, got {selection:?}"), + } + } + #[test] fn test_all_null_struct_multiple_children() { // Struct with two leaf children, entirely null. @@ -2657,7 +3255,7 @@ mod tests { let expected = ArrayLevels { def_levels: LevelData::Uniform { value: 0, count: 2 }, rep_levels: LevelData::Absent, - non_null_indices: vec![], + values: ValueSelection::Empty, max_def_level: 2, max_rep_level: 0, array: leaf, diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 79542caed9b7..fa3a71e8caba 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -41,7 +41,7 @@ use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter}; use crate::column::page_encryption::PageEncryptor; use crate::column::writer::encoder::ColumnValueEncoder; use crate::column::writer::{ - ColumnCloseResult, ColumnWriter, GenericColumnWriter, get_column_writer, + ColumnCloseResult, ColumnWriter, GenericColumnWriter, ValueSelectionRef, get_column_writer, }; use crate::data_type::{ByteArray, FixedLenByteArray}; #[cfg(feature = "encryption")] @@ -53,7 +53,7 @@ use crate::file::reader::{ChunkReader, Length}; use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter}; use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift}; use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor}; -use levels::{ArrayLevels, calculate_array_levels}; +use levels::{ArrayLevels, ArrayLevelsView, calculate_array_levels}; mod byte_array; mod levels; @@ -910,7 +910,7 @@ enum ArrowColumnWriterImpl { impl ArrowColumnWriter { /// Write an [`ArrowLeafColumn`] pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> { - self.write_internal(&col.0) + self.write_internal(col.0.view()) } /// Write with content-defined chunking, inserting page flushes at chunk boundaries. @@ -923,13 +923,30 @@ impl ArrowColumnWriter { let chunks = chunker.get_arrow_chunks( levels.def_level_data().as_ref(), levels.rep_level_data().as_ref(), - levels.array(), + levels.value_selection().as_ref(), + levels.array().as_ref(), )?; + let materialized_dictionary = match &self.writer { + ArrowColumnWriterImpl::Column(_) => match levels.array().as_any_dictionary_opt() { + Some(dictionary) => Some(arrow_select::take::take( + dictionary.values(), + dictionary.keys(), + None, + )?), + None => None, + }, + ArrowColumnWriterImpl::ByteArray(_) => None, + }; let num_chunks = chunks.len(); for (i, chunk) in chunks.iter().enumerate() { - let chunk_levels = levels.slice_for_chunk(chunk); - self.write_internal(&chunk_levels)?; + let chunk_levels = levels.chunk_view(chunk); + match materialized_dictionary.as_ref() { + Some(materialized) => { + self.write_internal(chunk_levels.with_array(materialized.as_ref()))? + } + None => self.write_internal(chunk_levels)?, + } // Add a page break after each chunk except the last if i + 1 < num_chunks { @@ -942,7 +959,7 @@ impl ArrowColumnWriter { Ok(()) } - fn write_internal(&mut self, levels: &ArrayLevels) -> Result<()> { + fn write_internal(&mut self, levels: ArrayLevelsView<'_>) -> Result<()> { match &mut self.writer { ArrowColumnWriterImpl::Column(c) => { let leaf = levels.array(); @@ -950,13 +967,13 @@ impl ArrowColumnWriter { Some(dictionary) => { let materialized = arrow_select::take::take(dictionary.values(), dictionary.keys(), None)?; - write_leaf(c, &materialized, levels)? + write_leaf(c, materialized.as_ref(), levels)? } None => write_leaf(c, leaf, levels)?, }; } ArrowColumnWriterImpl::ByteArray(c) => { - write_primitive(c, levels.array().as_ref(), levels)?; + write_primitive(c, levels.array(), levels)?; } } Ok(()) @@ -1298,10 +1315,8 @@ impl ArrowColumnWriterFactory { fn write_leaf( writer: &mut ColumnWriter<'_>, column: &dyn arrow_array::Array, - levels: &ArrayLevels, + levels: ArrayLevelsView<'_>, ) -> Result { - let indices = levels.non_null_indices(); - match writer { // Note: this should match the contents of arrow_to_parquet_type ColumnWriter::Int32ColumnWriter(typed) => { @@ -1387,12 +1402,15 @@ fn write_leaf( } ColumnWriter::BoolColumnWriter(typed) => { let array = column.as_boolean(); - let values = get_bool_array_slice(array, indices.iter().copied()); + let values = get_bool_array_slice(array, levels.value_selection()); typed.write_batch_internal( values.as_slice(), - None, - levels.def_level_data().as_ref(), - levels.rep_level_data().as_ref(), + ValueSelectionRef::Dense { + offset: 0, + len: values.len(), + }, + levels.def_level_data(), + levels.rep_level_data(), None, None, None, @@ -1500,15 +1518,16 @@ fn write_leaf( unreachable!("should use ByteArrayWriter") } ColumnWriter::FixedLenByteArrayColumnWriter(typed) => { + let val_sel = levels.value_selection(); let bytes = match column.data_type() { ArrowDataType::Interval(interval_unit) => match interval_unit { IntervalUnit::YearMonth => { let array = column.as_primitive::(); - get_interval_ym_array_slice(array, indices.iter().copied()) + get_interval_ym_array_slice(array, val_sel) } IntervalUnit::DayTime => { let array = column.as_primitive::(); - get_interval_dt_array_slice(array, indices.iter().copied()) + get_interval_dt_array_slice(array, val_sel) } _ => { return Err(ParquetError::NYI(format!( @@ -1518,27 +1537,27 @@ fn write_leaf( }, ArrowDataType::FixedSizeBinary(_) => { let array = column.as_fixed_size_binary(); - get_fsb_array_slice(array, indices.iter().copied()) + get_fsb_array_slice(array, val_sel) } ArrowDataType::Decimal32(_, _) => { let array = column.as_primitive::(); - get_decimal_32_array_slice(array, indices.iter().copied()) + get_decimal_32_array_slice(array, val_sel) } ArrowDataType::Decimal64(_, _) => { let array = column.as_primitive::(); - get_decimal_64_array_slice(array, indices.iter().copied()) + get_decimal_64_array_slice(array, val_sel) } ArrowDataType::Decimal128(_, _) => { let array = column.as_primitive::(); - get_decimal_128_array_slice(array, indices.iter().copied()) + get_decimal_128_array_slice(array, val_sel) } ArrowDataType::Decimal256(_, _) => { let array = column.as_primitive::(); - get_decimal_256_array_slice(array, indices.iter().copied()) + get_decimal_256_array_slice(array, val_sel) } ArrowDataType::Float16 => { let array = column.as_primitive::(); - get_float_16_array_slice(array, indices.iter().copied()) + get_float_16_array_slice(array, val_sel) } _ => { return Err(ParquetError::NYI( @@ -1548,9 +1567,12 @@ fn write_leaf( }; typed.write_batch_internal( bytes.as_slice(), - None, - levels.def_level_data().as_ref(), - levels.rep_level_data().as_ref(), + ValueSelectionRef::Dense { + offset: 0, + len: bytes.len(), + }, + levels.def_level_data(), + levels.rep_level_data(), None, None, None, @@ -1562,13 +1584,13 @@ fn write_leaf( fn write_primitive( writer: &mut GenericColumnWriter, values: &E::Values, - levels: &ArrayLevels, + levels: ArrayLevelsView<'_>, ) -> Result { writer.write_batch_internal( values, - Some(levels.non_null_indices()), - levels.def_level_data().as_ref(), - levels.rep_level_data().as_ref(), + levels.value_selection(), + levels.def_level_data(), + levels.rep_level_data(), None, None, None, @@ -1577,126 +1599,106 @@ fn write_primitive( fn get_bool_array_slice( array: &arrow_array::BooleanArray, - indices: impl ExactSizeIterator, + selection: ValueSelectionRef<'_>, ) -> Vec { - let mut values = Vec::with_capacity(indices.len()); - for i in indices { - values.push(array.value(i)) - } - values + selection.map_indices(|i| array.value(i)) } /// Returns 12-byte values representing 3 values of months, days and milliseconds (4-bytes each). /// An Arrow YearMonth interval only stores months, thus only the first 4 bytes are populated. fn get_interval_ym_array_slice( array: &arrow_array::IntervalYearMonthArray, - indices: impl ExactSizeIterator, + selection: ValueSelectionRef<'_>, ) -> Vec { - let mut values = Vec::with_capacity(indices.len()); - for i in indices { + selection.map_indices(|i| { let mut value = array.value(i).to_le_bytes().to_vec(); let mut suffix = vec![0; 8]; value.append(&mut suffix); - values.push(FixedLenByteArray::from(ByteArray::from(value))) - } - values + FixedLenByteArray::from(ByteArray::from(value)) + }) } /// Returns 12-byte values representing 3 values of months, days and milliseconds (4-bytes each). /// An Arrow DayTime interval only stores days and millis, thus the first 4 bytes are not populated. fn get_interval_dt_array_slice( array: &arrow_array::IntervalDayTimeArray, - indices: impl ExactSizeIterator, + selection: ValueSelectionRef<'_>, ) -> Vec { - let mut values = Vec::with_capacity(indices.len()); - for i in indices { + selection.map_indices(|i| { let mut out = [0; 12]; let value = array.value(i); out[4..8].copy_from_slice(&value.days.to_le_bytes()); out[8..12].copy_from_slice(&value.milliseconds.to_le_bytes()); - values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec()))); - } - values + FixedLenByteArray::from(ByteArray::from(out.to_vec())) + }) } fn get_decimal_32_array_slice( array: &arrow_array::Decimal32Array, - indices: impl ExactSizeIterator, + selection: ValueSelectionRef<'_>, ) -> Vec { - let mut values = Vec::with_capacity(indices.len()); let size = decimal_length_from_precision(array.precision()); - for i in indices { + selection.map_indices(|i| { let as_be_bytes = array.value(i).to_be_bytes(); let resized_value = as_be_bytes[(4 - size)..].to_vec(); - values.push(FixedLenByteArray::from(ByteArray::from(resized_value))); - } - values + FixedLenByteArray::from(ByteArray::from(resized_value)) + }) } fn get_decimal_64_array_slice( array: &arrow_array::Decimal64Array, - indices: impl ExactSizeIterator, + selection: ValueSelectionRef<'_>, ) -> Vec { - let mut values = Vec::with_capacity(indices.len()); let size = decimal_length_from_precision(array.precision()); - for i in indices { + selection.map_indices(|i| { let as_be_bytes = array.value(i).to_be_bytes(); let resized_value = as_be_bytes[(8 - size)..].to_vec(); - values.push(FixedLenByteArray::from(ByteArray::from(resized_value))); - } - values + FixedLenByteArray::from(ByteArray::from(resized_value)) + }) } fn get_decimal_128_array_slice( array: &arrow_array::Decimal128Array, - indices: impl ExactSizeIterator, + selection: ValueSelectionRef<'_>, ) -> Vec { - let mut values = Vec::with_capacity(indices.len()); let size = decimal_length_from_precision(array.precision()); - for i in indices { + selection.map_indices(|i| { let as_be_bytes = array.value(i).to_be_bytes(); let resized_value = as_be_bytes[(16 - size)..].to_vec(); - values.push(FixedLenByteArray::from(ByteArray::from(resized_value))); - } - values + FixedLenByteArray::from(ByteArray::from(resized_value)) + }) } fn get_decimal_256_array_slice( array: &arrow_array::Decimal256Array, - indices: impl ExactSizeIterator, + selection: ValueSelectionRef<'_>, ) -> Vec { - let mut values = Vec::with_capacity(indices.len()); let size = decimal_length_from_precision(array.precision()); - for i in indices { + selection.map_indices(|i| { let as_be_bytes = array.value(i).to_be_bytes(); let resized_value = as_be_bytes[(32 - size)..].to_vec(); - values.push(FixedLenByteArray::from(ByteArray::from(resized_value))); - } - values + FixedLenByteArray::from(ByteArray::from(resized_value)) + }) } fn get_float_16_array_slice( array: &arrow_array::Float16Array, - indices: impl ExactSizeIterator, + selection: ValueSelectionRef<'_>, ) -> Vec { - let mut values = Vec::with_capacity(indices.len()); - for i in indices { + selection.map_indices(|i| { let value = array.value(i).to_le_bytes().to_vec(); - values.push(FixedLenByteArray::from(ByteArray::from(value))); - } - values + FixedLenByteArray::from(ByteArray::from(value)) + }) } fn get_fsb_array_slice( array: &arrow_array::FixedSizeBinaryArray, - indices: impl ExactSizeIterator, + selection: ValueSelectionRef<'_>, ) -> Vec { - let mut values = Vec::with_capacity(indices.len()); - for i in indices { + selection.map_indices(|i| { let value = array.value(i).to_vec(); - values.push(FixedLenByteArray::from(ByteArray::from(value))) - } - values + FixedLenByteArray::from(ByteArray::from(value)) + }) } #[cfg(test)] diff --git a/parquet/src/column/chunker/cdc.rs b/parquet/src/column/chunker/cdc.rs index b40dd74a8d83..a6621a05897a 100644 --- a/parquet/src/column/chunker/cdc.rs +++ b/parquet/src/column/chunker/cdc.rs @@ -15,8 +15,7 @@ // specific language governing permissions and limitations // under the License. -#[cfg(feature = "arrow")] -use crate::column::writer::LevelDataRef; +use crate::column::writer::{LevelDataRef, ValueSelectionRef}; use crate::errors::{ParquetError, Result}; use crate::file::properties::CdcOptions; use crate::schema::types::ColumnDescriptor; @@ -90,9 +89,6 @@ pub(crate) struct ContentDefinedChunker { max_def_level: i16, /// Maximum repetition level for this column. max_rep_level: i16, - /// Definition level at the nearest REPEATED ancestor. - repeated_ancestor_def_level: i16, - /// Minimum chunk size in bytes. /// The rolling hash will not be updated until this size is reached for each chunk. /// All data sent through the hash function counts towards the chunk size, including @@ -129,7 +125,6 @@ impl ContentDefinedChunker { Ok(Self { max_def_level: desc.max_def_level(), max_rep_level: desc.max_rep_level(), - repeated_ancestor_def_level: desc.repeated_ancestor_def_level(), min_chunk_size: options.min_chunk_size as i64, max_chunk_size: options.max_chunk_size as i64, rolling_hash_mask, @@ -279,6 +274,7 @@ impl ContentDefinedChunker { &mut self, def_levels: LevelDataRef<'_>, rep_levels: LevelDataRef<'_>, + value_selection: ValueSelectionRef<'_>, num_levels: usize, mut roll_value: F, ) -> Vec @@ -302,7 +298,10 @@ impl ContentDefinedChunker { // level: 0 1 2 // value_offset: 0 1 2 for offset in 0..num_levels { - roll_value(self, offset); + let array_index = value_selection + .index_at(offset) + .expect("value selection required for non-null values"); + roll_value(self, array_index); if self.need_new_chunk() { chunks.push(CdcChunk { level_offset: prev_offset, @@ -331,10 +330,10 @@ impl ContentDefinedChunker { .expect("def_levels required when max_def_level > 0"); self.roll_level(def_level); if def_level == self.max_def_level { - // For non-nested data, the leaf array has one slot per - // level (nulls are array elements), so `offset` (the - // level index) is the correct array index for hashing. - roll_value(self, offset); + let array_index = value_selection + .index_at(value_offset) + .expect("value selection required for non-null values"); + roll_value(self, array_index); } // Check boundary before incrementing value_offset so that // num_values reflects only entries in the completed chunk. @@ -353,18 +352,11 @@ impl ContentDefinedChunker { } } } else { - // Nested data with nulls. Two counters are needed: - // - // leaf_offset: index into the leaf values array for hashing, - // incremented for all leaf slots (def >= repeated_ancestor_def_level), - // including null elements. - // - // value_offset: index into non_null_indices for chunk boundaries, - // incremented only for non-null leaf values (def == max_def_level). - // - // These diverge when nullable elements exist inside lists. + // Nested data with nulls. `value_offset` is the index into the + // logical Parquet value stream, and `value_selection` maps it to + // the corresponding Arrow leaf-array index. // - // Example: List with repeated_ancestor_def_level=2, max_def=3 + // Example: List with max_def=3 // row 0: [1, null, 2] (3 leaf slots, 2 non-null) // row 1: [3] (1 leaf slot, 1 non-null) // @@ -372,18 +364,13 @@ impl ContentDefinedChunker { // def_levels: [3, 2, 3, 3] // rep_levels: [0, 1, 1, 0] // - // level def leaf_offset value_offset action - // ───── ─── ─────────── ──────────── ────────────────────────── - // 0 3 0 0 roll_value(0), value++, leaf++ - // 1 2 1 1 leaf++ only (null element) - // 2 3 2 1 roll_value(2), value++, leaf++ - // 3 3 3 2 roll_value(3), value++, leaf++ - // - // roll_value(2) correctly indexes leaf array position 2 (value "2"). - // Using value_offset=1 would index position 1 (the null slot). - // - // Using value_offset for roll_value would hash the wrong array slot. - let mut leaf_offset: usize = 0; + // value_selection: [0, 2, 3] + // level def value_offset action + // ----- --- ------------ ------------------------- + // 0 3 0 roll_value(selection[0]) + // 1 2 1 null element, no value + // 2 3 1 roll_value(selection[1]) + // 3 3 2 roll_value(selection[2]) for offset in 0..num_levels { let def_level = def_levels @@ -396,7 +383,10 @@ impl ContentDefinedChunker { self.roll_level(def_level); self.roll_level(rep_level); if def_level == self.max_def_level { - roll_value(self, leaf_offset); + let array_index = value_selection + .index_at(value_offset) + .expect("value selection required for non-null values"); + roll_value(self, array_index); } // Check boundary before incrementing value_offset so that @@ -417,9 +407,6 @@ impl ContentDefinedChunker { if def_level == self.max_def_level { value_offset += 1; } - if def_level >= self.repeated_ancestor_def_level { - leaf_offset += 1; - } } } @@ -446,6 +433,7 @@ impl ContentDefinedChunker { &mut self, def_levels: LevelDataRef<'_>, rep_levels: LevelDataRef<'_>, + value_selection: ValueSelectionRef<'_>, array: &dyn arrow_array::Array, ) -> Result> { use arrow_array::cast::AsArray; @@ -456,7 +444,7 @@ impl ContentDefinedChunker { // levels. Always drive the loop by the level count; fall back to the // array length only when there are no levels at all. let num_levels = match (def_levels.len(), rep_levels.len()) { - (0, 0) => array.len(), + (0, 0) => value_selection.len(), (d, r) => d.max(r), }; @@ -465,31 +453,55 @@ impl ContentDefinedChunker { let data = array.to_data(); let buffer = data.buffers()[0].as_slice(); let values = &buffer[data.offset() * $N..]; - self.calculate(def_levels, rep_levels, num_levels, |c, i| { - let offset = i * $N; - let slice = &values[offset..offset + $N]; - c.roll_fixed::<$N>(slice.try_into().unwrap()); - }) + self.calculate( + def_levels, + rep_levels, + value_selection, + num_levels, + |c, i| { + let offset = i * $N; + let slice = &values[offset..offset + $N]; + c.roll_fixed::<$N>(slice.try_into().unwrap()); + }, + ) }}; } macro_rules! binary_like { ($a:expr) => {{ let a = $a; - self.calculate(def_levels, rep_levels, num_levels, |c, i| { - c.roll(a.value(i).as_ref()); - }) + self.calculate( + def_levels, + rep_levels, + value_selection, + num_levels, + |c, i| { + c.roll(a.value(i).as_ref()); + }, + ) }}; } let dtype = array.data_type(); let chunks = match dtype { - DataType::Null => self.calculate(def_levels, rep_levels, num_levels, |_, _| {}), + DataType::Null => self.calculate( + def_levels, + rep_levels, + value_selection, + num_levels, + |_, _| {}, + ), DataType::Boolean => { let a = array.as_boolean(); - self.calculate(def_levels, rep_levels, num_levels, |c, i| { - c.roll_fixed(&[a.value(i) as u8]); - }) + self.calculate( + def_levels, + rep_levels, + value_selection, + num_levels, + |c, i| { + c.roll_fixed(&[a.value(i) as u8]); + }, + ) } DataType::Int8 | DataType::UInt8 => fixed_width!(1), DataType::Int16 | DataType::UInt16 | DataType::Float16 => fixed_width!(2), @@ -521,7 +533,7 @@ impl ContentDefinedChunker { DataType::Utf8View => binary_like!(array.as_string_view()), DataType::Dictionary(_, _) => { let dict = array.as_any_dictionary(); - self.get_arrow_chunks(def_levels, rep_levels, dict.keys())? + self.get_arrow_chunks(def_levels, rep_levels, value_selection, dict.keys())? } _ => { return Err(ParquetError::General(format!( @@ -575,8 +587,7 @@ impl ContentDefinedChunker { mod tests { use super::*; use crate::basic::Type as PhysicalType; - #[cfg(feature = "arrow")] - use crate::column::writer::LevelDataRef; + use crate::column::writer::{LevelDataRef, ValueSelectionRef}; use crate::schema::types::{ColumnPath, Type}; use std::sync::Arc; @@ -629,6 +640,10 @@ mod tests { let chunks = chunker.calculate( LevelDataRef::Absent, LevelDataRef::Absent, + ValueSelectionRef::Dense { + offset: 0, + len: num_values, + }, num_values, |c, i| { c.roll_fixed::<4>(&(i as i32).to_le_bytes()); @@ -655,6 +670,10 @@ mod tests { let chunks = chunker.calculate( LevelDataRef::Absent, LevelDataRef::Absent, + ValueSelectionRef::Dense { + offset: 0, + len: num_values, + }, num_values, |c, i| { c.roll_fixed::<4>(&(i as i32).to_le_bytes()); @@ -689,10 +708,28 @@ mod tests { }; let mut chunker1 = ContentDefinedChunker::new(&make_desc(0, 0), &options).unwrap(); - let chunks1 = chunker1.calculate(LevelDataRef::Absent, LevelDataRef::Absent, 200, roll); + let chunks1 = chunker1.calculate( + LevelDataRef::Absent, + LevelDataRef::Absent, + ValueSelectionRef::Dense { + offset: 0, + len: 200, + }, + 200, + roll, + ); let mut chunker2 = ContentDefinedChunker::new(&make_desc(0, 0), &options).unwrap(); - let chunks2 = chunker2.calculate(LevelDataRef::Absent, LevelDataRef::Absent, 200, roll); + let chunks2 = chunker2.calculate( + LevelDataRef::Absent, + LevelDataRef::Absent, + ValueSelectionRef::Dense { + offset: 0, + len: 200, + }, + 200, + roll, + ); assert_eq!(chunks1.len(), chunks2.len()); for (a, b) in chunks1.iter().zip(chunks2.iter()) { @@ -719,10 +756,13 @@ mod tests { .map(|i| if i % 3 == 0 { 0 } else { 1 }) .collect(); let expected_non_null: usize = def_levels.iter().filter(|&&d| d == 1).count(); - let chunks = chunker.calculate( LevelDataRef::Materialized(&def_levels), LevelDataRef::Absent, + ValueSelectionRef::Dense { + offset: 0, + len: expected_non_null, + }, num_levels, |c, i| { c.roll_fixed::<4>(&(i as i32).to_le_bytes()); @@ -755,7 +795,7 @@ mod arrow_tests { use crate::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use crate::arrow::arrow_writer::ArrowWriter; - use crate::column::writer::LevelDataRef; + use crate::column::writer::{LevelDataRef, ValueSelectionRef}; use crate::file::properties::{CdcOptions, WriterProperties}; use crate::file::reader::{FileReader, SerializedFileReader}; @@ -2239,13 +2279,26 @@ mod arrow_tests { let array: Int32Array = (0..n).map(|i| test_hash(0, i as u64) as i32).collect(); let mut chunker = super::ContentDefinedChunker::new(&desc, &options).unwrap(); let chunks = chunker - .get_arrow_chunks(LevelDataRef::Absent, LevelDataRef::Absent, &array) + .get_arrow_chunks( + LevelDataRef::Absent, + LevelDataRef::Absent, + ValueSelectionRef::Dense { offset: 0, len: n }, + &array, + ) .unwrap(); let sliced = array.slice(offset, n - offset); let mut chunker2 = super::ContentDefinedChunker::new(&desc, &options).unwrap(); let chunks2 = chunker2 - .get_arrow_chunks(LevelDataRef::Absent, LevelDataRef::Absent, &sliced) + .get_arrow_chunks( + LevelDataRef::Absent, + LevelDataRef::Absent, + ValueSelectionRef::Dense { + offset: 0, + len: n - offset, + }, + &sliced, + ) .unwrap(); let values: Vec = chunks.iter().map(|c| c.num_values).collect(); diff --git a/parquet/src/column/chunker/mod.rs b/parquet/src/column/chunker/mod.rs index 42631e026db4..daf758778927 100644 --- a/parquet/src/column/chunker/mod.rs +++ b/parquet/src/column/chunker/mod.rs @@ -33,8 +33,8 @@ pub(crate) struct CdcChunk { pub level_offset: usize, /// The number of levels in this chunk. pub num_levels: usize, - /// The start index into `non_null_indices` for this chunk. + /// The start index into the value selection for this chunk. pub value_offset: usize, - /// The number of `non_null_indices` entries in this chunk. + /// The number of value selection entries in this chunk. pub num_values: usize, } diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 595eadbc90f2..d3128dd3add4 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -321,7 +321,7 @@ impl ColumnMetrics { /// /// The variants are different physical representations of the same logical /// sequence of levels. -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(crate) enum LevelDataRef<'a> { Absent, Materialized(&'a [i16]), @@ -375,6 +375,96 @@ impl<'a> LevelDataRef<'a> { } } +/// Borrowed view of a value selection, analogous to `&str` for `ValueSelection`'s `String`. +/// +/// This type exists so that [`GenericColumnWriter::write_batch_internal`] can accept value +/// selections from two callers without allocating: the public [`GenericColumnWriter::write_batch`] +/// API constructs `Dense` directly from the caller's values length, while the Arrow writer +/// borrows from `ArrayLevels` as either empty, dense, or sparse. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum ValueSelectionRef<'a> { + #[cfg(feature = "arrow")] + Empty, + Dense { + offset: usize, + len: usize, + }, + #[cfg(feature = "arrow")] + Sparse(&'a [usize]), + #[cfg(not(feature = "arrow"))] + #[doc(hidden)] + _Phantom(std::marker::PhantomData<&'a ()>), +} + +impl<'a> ValueSelectionRef<'a> { + pub(crate) fn len(self) -> usize { + match self { + #[cfg(feature = "arrow")] + Self::Empty => 0, + Self::Dense { len, .. } => len, + #[cfg(feature = "arrow")] + Self::Sparse(indices) => indices.len(), + #[cfg(not(feature = "arrow"))] + Self::_Phantom(_) => unreachable!(), + } + } + + #[cfg(feature = "arrow")] + pub(crate) fn slice(self, offset: usize, len: usize) -> Self { + match self { + Self::Empty => { + debug_assert_eq!(offset, 0); + debug_assert_eq!(len, 0); + Self::Empty + } + Self::Dense { + offset: base, + len: selection_len, + } => { + debug_assert!(offset + len <= selection_len); + Self::Dense { + offset: base + offset, + len, + } + } + Self::Sparse(indices) => Self::Sparse(&indices[offset..offset + len]), + } + } + + #[cfg(feature = "arrow")] + pub(crate) fn index_at(self, idx: usize) -> Option { + match self { + Self::Empty => None, + Self::Dense { offset, len } => (idx < len).then_some(offset + idx), + Self::Sparse(indices) => indices.get(idx).copied(), + } + } + + #[cfg(feature = "arrow")] + pub(crate) fn for_each_index(self, mut f: impl FnMut(usize)) { + match self { + Self::Empty => {} + Self::Dense { offset, len } => { + for idx in offset..offset + len { + f(idx); + } + } + Self::Sparse(indices) => { + for &idx in indices { + f(idx); + } + } + } + } + + #[cfg(feature = "arrow")] + pub(crate) fn map_indices(self, mut f: impl FnMut(usize) -> T) -> Vec { + let mut values = Vec::with_capacity(self.len()); + self.for_each_index(|i| values.push(f(i))); + values + } +} + /// Typed column writer for a primitive column. pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a, ColumnValueEncoderImpl>; @@ -484,7 +574,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { pub(crate) fn write_batch_internal( &mut self, values: &E::Values, - value_indices: Option<&[usize]>, + value_selection: ValueSelectionRef<'_>, def_levels: LevelDataRef<'_>, rep_levels: LevelDataRef<'_>, min: Option<&E::T>, @@ -506,14 +596,11 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { // The purpose of this chunking is to bound this. Even if a user writes large // number of values, the chunking will ensure that we add data page at a // reasonable pagesize limit. - - // TODO: find out why we don't account for size of levels when we estimate page - // size. let num_levels = def_levels.len().max(rep_levels.len()); let num_levels = if num_levels > 0 { num_levels } else { - value_indices.map_or(values.len(), |i| i.len()) + value_selection.len() }; if let Some(min) = min { @@ -538,7 +625,10 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { || !matches!(rep_levels, LevelDataRef::Absent); // When both level vectors are compact (Uniform or Absent), there is no // materialized slice to split and the per-mini-batch work is O(1), so we - // can safely use a much larger batch size. + // can safely use a much larger batch size. We use + // `data_page_row_count_limit` (default 20 000) instead of the normal + // `write_batch_size` (default 1 024) to amortise the per-batch overhead + // while still respecting the page row-count ceiling. let base_batch_size = if both_levels_compact && has_levels { self.props.data_page_row_count_limit() } else { @@ -557,7 +647,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { values_offset += self.write_mini_batch( values, values_offset, - value_indices, + value_selection, end_offset - levels_offset, def_levels.slice(levels_offset, end_offset - levels_offset), rep_levels.slice(levels_offset, end_offset - levels_offset), @@ -589,7 +679,10 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { ) -> Result { self.write_batch_internal( values, - None, + ValueSelectionRef::Dense { + offset: 0, + len: values.len(), + }, LevelDataRef::from(def_levels), LevelDataRef::from(rep_levels), None, @@ -616,7 +709,10 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { ) -> Result { self.write_batch_internal( values, - None, + ValueSelectionRef::Dense { + offset: 0, + len: values.len(), + }, LevelDataRef::from(def_levels), LevelDataRef::from(rep_levels), min, @@ -727,7 +823,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { &mut self, values: &E::Values, values_offset: usize, - value_indices: Option<&[usize]>, + value_selection: ValueSelectionRef<'_>, num_levels: usize, def_levels: LevelDataRef<'_>, rep_levels: LevelDataRef<'_>, @@ -834,12 +930,27 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { self.page_metrics.num_buffered_rows += num_levels as u32; } - match value_indices { - Some(indices) => { + match value_selection { + #[cfg(feature = "arrow")] + ValueSelectionRef::Empty => {} + #[cfg(feature = "arrow")] + ValueSelectionRef::Sparse(indices) => { + debug_assert!( + values_offset + values_to_write <= indices.len(), + "Sparse value selection out of bounds: \ + values_offset={values_offset} values_to_write={values_to_write} \ + indices.len()={}", + indices.len() + ); let indices = &indices[values_offset..values_offset + values_to_write]; self.encoder.write_gather(values, indices)?; } - None => self.encoder.write(values, values_offset, values_to_write)?, + ValueSelectionRef::Dense { offset, .. } => { + self.encoder + .write(values, offset + values_offset, values_to_write)? + } + #[cfg(not(feature = "arrow"))] + ValueSelectionRef::_Phantom(_) => unreachable!(), } self.page_metrics.num_buffered_values += num_levels as u32; @@ -1790,6 +1901,17 @@ mod tests { } } + #[test] + fn test_column_writer_levels_limit_values_written() { + let page_writer = get_test_page_writer(); + let props = Default::default(); + let mut writer = get_test_column_writer::(page_writer, 1, 0, props); + + let values_written = writer.write_batch(&[1, 2, 3], Some(&[1, 0]), None).unwrap(); + + assert_eq!(values_written, 1); + } + #[test] fn test_column_writer_write_only_one_dictionary_page() { let page_writer = get_test_page_writer(); @@ -4656,10 +4778,15 @@ mod tests { Arc::new(self.props), ); + let value_selection = ValueSelectionRef::Dense { + offset: 0, + len: self.values.len(), + }; + writer .write_batch_internal( self.values, - None, + value_selection, self.def_levels, self.rep_levels, None, @@ -4784,4 +4911,65 @@ mod tests { .run(); } } + + #[cfg(feature = "arrow")] + #[test] + fn test_sparse_value_selection() { + // Nullable column with a mix of nulls and values. + // def_levels: [1, 0, 1, 0, 1] — values at indices 0, 2, 4. + // ValueSelectionRef::Sparse picks out the non-null positions. + let max_def_level = 1; + let all_values: Vec = vec![10, 20, 30, 40, 50]; + let def_levels: &[i16] = &[1, 0, 1, 0, 1]; + let non_null_indices: &[usize] = &[0, 2, 4]; + + let mut file = tempfile::tempfile().unwrap(); + let mut write = TrackedWrite::new(&mut file); + let page_writer = Box::new(SerializedPageWriter::new(&mut write)); + let mut writer = get_test_column_writer::( + page_writer, + max_def_level, + 0, + Arc::new(WriterProperties::default()), + ); + + writer + .write_batch_internal( + &all_values, + ValueSelectionRef::Sparse(non_null_indices), + LevelDataRef::Materialized(def_levels), + LevelDataRef::Absent, + None, + None, + None, + ) + .unwrap(); + let result = writer.close().unwrap(); + drop(write); + + let props = ReaderProperties::builder() + .set_backward_compatible_lz4(false) + .build(); + let page_reader = Box::new( + SerializedPageReader::new_with_properties( + Arc::new(file), + &result.metadata, + result.rows_written as usize, + None, + Arc::new(props), + ) + .unwrap(), + ); + let mut reader = get_test_column_reader::(page_reader, max_def_level, 0); + + let mut actual_values = Vec::with_capacity(5); + let mut actual_def = Vec::with_capacity(5); + + let (_, values_read, levels_read) = reader + .read_records(5, Some(&mut actual_def), None, &mut actual_values) + .unwrap(); + + assert_eq!(&actual_values[..values_read], &[10, 30, 50]); + assert_eq!(&actual_def[..levels_read], def_levels); + } }