7 changes: 6 additions & 1 deletion parquet/benches/arrow_writer.rs
@@ -35,7 +35,7 @@ use arrow::{record_batch::RecordBatch, util::data_gen::*};
use arrow_array::RecordBatchOptions;
use parquet::arrow::ArrowSchemaConverter;
use parquet::errors::Result;
-use parquet::file::properties::{WriterProperties, WriterVersion};
+use parquet::file::properties::{CdcOptions, WriterProperties, WriterVersion};
use parquet::file::writer::SerializedFileWriter;

fn create_primitive_bench_batch(
@@ -440,6 +440,11 @@ fn create_writer_props() -> Vec<(&'static str, WriterProperties)> {
.build();
props.push(("zstd_parquet_2", prop));

let prop = WriterProperties::builder()
.set_content_defined_chunking(Some(CdcOptions::default()))
.build();
props.push(("cdc", prop));

props
}

181 changes: 181 additions & 0 deletions parquet/src/arrow/arrow_writer/levels.rs
@@ -40,6 +40,7 @@
//!
//! \[1\] [parquet-format#nested-encoding](https://github.com/apache/parquet-format#nested-encoding)

use crate::column::chunker::Chunk;
use crate::errors::{ParquetError, Result};
use arrow_array::cast::AsArray;
use arrow_array::{Array, ArrayRef, OffsetSizeTrait};
@@ -801,11 +802,55 @@ impl ArrayLevels {
pub fn non_null_indices(&self) -> &[usize] {
&self.non_null_indices
}

/// Create a sliced view of this `ArrayLevels` for a CDC chunk.
pub(crate) fn slice_for_chunk(&self, chunk: &Chunk) -> Self {
let level_offset = chunk.level_offset;
let num_levels = chunk.num_levels;
let value_offset = chunk.value_offset;
let num_values = chunk.num_values;
let def_levels = self
.def_levels
.as_ref()
.map(|levels| levels[level_offset..level_offset + num_levels].to_vec());
let rep_levels = self
.rep_levels
.as_ref()
.map(|levels| levels[level_offset..level_offset + num_levels].to_vec());

// Filter non_null_indices to [value_offset, value_offset + num_values)
// and shift by -value_offset. Use binary search since the slice is sorted.
let value_end = value_offset + num_values;
let start = self
.non_null_indices
.partition_point(|&idx| idx < value_offset);
let end = self
.non_null_indices
.partition_point(|&idx| idx < value_end);
let non_null_indices: Vec<usize> = self.non_null_indices[start..end]
.iter()
.map(|&idx| idx - value_offset)
.collect();

let array = self.array.slice(value_offset, num_values);
let logical_nulls = array.logical_nulls();

Self {
def_levels,
rep_levels,
non_null_indices,
max_def_level: self.max_def_level,
max_rep_level: self.max_rep_level,
array,
logical_nulls,
}
}
}
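
The binary-search rebasing in `slice_for_chunk` is the subtle part. As a self-contained illustration (the helper name `rebase_non_null` is hypothetical, not part of this diff), the same logic in isolation:

fn rebase_non_null(indices: &[usize], value_offset: usize, num_values: usize) -> Vec<usize> {
    // `indices` is sorted ascending, so two binary searches bound the kept range.
    let lo = indices.partition_point(|&i| i < value_offset);
    let hi = indices.partition_point(|&i| i < value_offset + num_values);
    // Rebase the survivors into the coordinate space of the sliced array.
    indices[lo..hi].iter().map(|&i| i - value_offset).collect()
}

For example, `rebase_non_null(&[0, 2, 4, 5], 1, 3)` keeps only index 2 (the sole entry in `[1, 4)`) and returns `vec![1]`, matching the optional-field test below.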

#[cfg(test)]
mod tests {
use super::*;
use crate::column::chunker::Chunk;

use arrow_array::builder::*;
use arrow_array::types::Int32Type;
@@ -2096,4 +2141,140 @@ mod tests {
let v = Arc::new(array) as ArrayRef;
LevelInfoBuilder::try_new(field, Default::default(), &v).unwrap()
}

#[test]
fn test_slice_for_chunk_flat() {
// Required field (no levels): array [1..=6], slice values 2..5
let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6]));
let logical_nulls = array.logical_nulls();
let levels = ArrayLevels {
def_levels: None,
rep_levels: None,
non_null_indices: vec![0, 1, 2, 3, 4, 5],
max_def_level: 0,
max_rep_level: 0,
array,
logical_nulls,
};
let sliced = levels.slice_for_chunk(&Chunk {
level_offset: 0,
num_levels: 0,
value_offset: 2,
num_values: 3,
});
assert!(sliced.def_levels.is_none());
assert!(sliced.rep_levels.is_none());
assert_eq!(sliced.non_null_indices, vec![0, 1, 2]);
assert_eq!(sliced.array.len(), 3);

// Optional field (def levels only): [1, null, 3, null, 5, 6]
// Slice levels 1..4 (def=[0,1,0]), values 1..4 → non_null_indices [2]→[1]
let array: ArrayRef = Arc::new(Int32Array::from(vec![
Some(1),
None,
Some(3),
None,
Some(5),
Some(6),
]));
let logical_nulls = array.logical_nulls();
let levels = ArrayLevels {
def_levels: Some(vec![1, 0, 1, 0, 1, 1]),
rep_levels: None,
non_null_indices: vec![0, 2, 4, 5],
max_def_level: 1,
max_rep_level: 0,
array,
logical_nulls,
};
let sliced = levels.slice_for_chunk(&Chunk {
level_offset: 1,
num_levels: 3,
value_offset: 1,
num_values: 3,
});
assert_eq!(sliced.def_levels, Some(vec![0, 1, 0]));
assert!(sliced.rep_levels.is_none());
assert_eq!(sliced.non_null_indices, vec![1]);
assert_eq!(sliced.array.len(), 3);
}

#[test]
fn test_slice_for_chunk_nested() {
// [[1,2],[3],[4,5]]: def=[2,2,2,2,2], rep=[0,1,0,0,1]
// Slice levels 2..5 (def=[2,2,2], rep=[0,0,1]), values 2..5
let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
let logical_nulls = array.logical_nulls();
let levels = ArrayLevels {
def_levels: Some(vec![2, 2, 2, 2, 2]),
rep_levels: Some(vec![0, 1, 0, 0, 1]),
non_null_indices: vec![0, 1, 2, 3, 4],
max_def_level: 2,
max_rep_level: 1,
array,
logical_nulls,
};
let sliced = levels.slice_for_chunk(&Chunk {
level_offset: 2,
num_levels: 3,
value_offset: 2,
num_values: 3,
});
assert_eq!(sliced.def_levels, Some(vec![2, 2, 2]));
assert_eq!(sliced.rep_levels, Some(vec![0, 0, 1]));
// [0,1,2,3,4] filtered to [2,5) → [2,3,4] → shifted -2 → [0,1,2]
assert_eq!(sliced.non_null_indices, vec![0, 1, 2]);
assert_eq!(sliced.array.len(), 3);
}

#[test]
fn test_slice_for_chunk_non_null_indices_boundary() {
// [1, null, 3]: non_null_indices=[0, 2]; test inclusive lower / exclusive upper bounds
let array: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));
let logical_nulls = array.logical_nulls();
let levels = ArrayLevels {
def_levels: Some(vec![1, 0, 1]),
rep_levels: None,
non_null_indices: vec![0, 2],
max_def_level: 1,
max_rep_level: 0,
array,
logical_nulls,
};
assert_eq!(
levels
.slice_for_chunk(&Chunk {
level_offset: 0,
num_levels: 1,
value_offset: 0,
num_values: 1
})
.non_null_indices,
vec![0]
);
// idx 2 in range [1,3), shifted -1 → 1
assert_eq!(
levels
.slice_for_chunk(&Chunk {
level_offset: 1,
num_levels: 2,
value_offset: 1,
num_values: 2
})
.non_null_indices,
vec![1]
);
// idx 2 excluded from [1,2)
assert_eq!(
levels
.slice_for_chunk(&Chunk {
level_offset: 1,
num_levels: 1,
value_offset: 1,
num_values: 1
})
.non_null_indices,
Vec::<usize>::new()
);
}
}
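
The tests above construct `Chunk` values directly. Judging from the fields used, the struct in `crate::column::chunker` (whose definition is not part of this diff) has roughly this shape:

pub(crate) struct Chunk {
    /// Start offset into the def/rep level buffers
    pub level_offset: usize,
    /// Number of def/rep levels covered by the chunk
    pub num_levels: usize,
    /// Start offset into the leaf value array
    pub value_offset: usize,
    /// Number of leaf values covered by the chunk
    pub num_values: usize,
}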
97 changes: 86 additions & 11 deletions parquet/src/arrow/arrow_writer/mod.rs
@@ -17,6 +17,8 @@

//! Contains writer which writes arrow data into parquet data.

use crate::column::chunker::ContentDefinedChunker;

use bytes::Bytes;
use std::io::{Read, Write};
use std::iter::Peekable;
@@ -192,6 +194,9 @@ pub struct ArrowWriter<W: Write> {

/// The maximum size in bytes for a row group, or None for unlimited
max_row_group_bytes: Option<usize>,

/// CDC chunkers persisted across row groups (one per leaf column).
cdc_chunkers: Option<Vec<ContentDefinedChunker>>,
}

impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
@@ -261,13 +266,27 @@ impl<W: Write + Send> ArrowWriter<W> {
let row_group_writer_factory =
ArrowRowGroupWriterFactory::new(&file_writer, arrow_schema.clone());

let cdc_chunkers = match props_ptr.content_defined_chunking() {
Some(opts) => {
let chunkers = file_writer
.schema_descr()
.columns()
.iter()
.map(|desc| ContentDefinedChunker::new(desc, opts))
.collect::<Result<Vec<_>>>()?;
Some(chunkers)
}
None => None,
};

Ok(Self {
writer: file_writer,
in_progress: None,
arrow_schema,
row_group_writer_factory,
max_row_group_row_count,
max_row_group_bytes,
cdc_chunkers,
})
}
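
With the chunkers wired into `try_new`, enabling CDC is a one-line properties change for callers. A minimal usage sketch, assuming an existing `file`, `schema`, and `batch` (the `CdcOptions` default is the same one exercised by the benchmark above):

use parquet::arrow::ArrowWriter;
use parquet::file::properties::{CdcOptions, WriterProperties};

let props = WriterProperties::builder()
    .set_content_defined_chunking(Some(CdcOptions::default()))
    .build();
// One ContentDefinedChunker per leaf column is created in try_new and
// persists across row groups, so chunk boundaries survive flushes.
let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
writer.write(&batch)?;
writer.close()?;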

@@ -335,10 +354,12 @@ impl<W: Write + Send> ArrowWriter<W> {

let in_progress = match &mut self.in_progress {
Some(in_progress) => in_progress,
-x => x.insert(
-self.row_group_writer_factory
-.create_row_group_writer(self.writer.flushed_row_groups().len())?,
-),
+x => {
+let rg = self
+.row_group_writer_factory
+.create_row_group_writer(self.writer.flushed_row_groups().len())?;
+x.insert(rg)
+}
};

if let Some(max_rows) = self.max_row_group_row_count {
@@ -383,7 +404,10 @@ impl<W: Write + Send> ArrowWriter<W> {
}
}

-in_progress.write(batch)?;
+match self.cdc_chunkers.as_mut() {
+Some(chunkers) => in_progress.write_with_chunkers(batch, chunkers)?,
+None => in_progress.write(batch)?,
+}

let should_flush = self
.max_row_group_row_count
@@ -421,8 +445,10 @@ impl<W: Write + Send> ArrowWriter<W> {
None => return Ok(()),
};

let chunks = in_progress.close()?;

let mut row_group_writer = self.writer.next_row_group()?;
-for chunk in in_progress.close()? {
+for chunk in chunks {
chunk.append_to_row_group(&mut row_group_writer)?;
}
row_group_writer.close()?;
@@ -869,20 +895,50 @@ enum ArrowColumnWriterImpl {
impl ArrowColumnWriter {
/// Write an [`ArrowLeafColumn`]
pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
self.write_internal(&col.0)
}

/// Write with content-defined chunking, inserting page flushes at chunk boundaries.
fn write_with_chunker(
&mut self,
col: &ArrowLeafColumn,
chunker: &mut ContentDefinedChunker,
) -> Result<()> {
let levels = &col.0;
let chunks =
chunker.get_arrow_chunks(levels.def_levels(), levels.rep_levels(), levels.array())?;

let num_chunks = chunks.len();
for (i, chunk) in chunks.iter().enumerate() {
let chunk_levels = levels.slice_for_chunk(chunk);
self.write_internal(&chunk_levels)?;

// Add a page break after each chunk except the last; the final chunk stays
// open so the chunker can extend it with data from subsequent batches
if i + 1 < num_chunks {
match &mut self.writer {
ArrowColumnWriterImpl::Column(c) => c.add_data_page()?,
ArrowColumnWriterImpl::ByteArray(c) => c.add_data_page()?,
}
}
}
Ok(())
}

fn write_internal(&mut self, levels: &ArrayLevels) -> Result<()> {
match &mut self.writer {
ArrowColumnWriterImpl::Column(c) => {
-let leaf = col.0.array();
+let leaf = levels.array();
match leaf.as_any_dictionary_opt() {
Some(dictionary) => {
let materialized =
arrow_select::take::take(dictionary.values(), dictionary.keys(), None)?;
-write_leaf(c, &materialized, &col.0)?
+write_leaf(c, &materialized, levels)?
}
-None => write_leaf(c, leaf, &col.0)?,
+None => write_leaf(c, leaf, levels)?,
};
}
ArrowColumnWriterImpl::ByteArray(c) => {
-write_primitive(c, col.0.array().as_ref(), &col.0)?;
+write_primitive(c, levels.array().as_ref(), levels)?;
}
}
Ok(())
@@ -958,7 +1014,26 @@ impl ArrowRowGroupWriter {
let mut writers = self.writers.iter_mut();
for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
for leaf in compute_leaves(field.as_ref(), column)? {
-writers.next().unwrap().write(&leaf)?
+writers.next().unwrap().write(&leaf)?;
}
}
Ok(())
}

fn write_with_chunkers(
&mut self,
batch: &RecordBatch,
chunkers: &mut [ContentDefinedChunker],
) -> Result<()> {
self.buffered_rows += batch.num_rows();
let mut writers = self.writers.iter_mut();
let mut chunkers = chunkers.iter_mut();
// `chunkers` was built from `schema_descr().columns()`, which enumerates
// leaves in the same order as `compute_leaves`, so the iterators stay aligned.
for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
for leaf in compute_leaves(field.as_ref(), column)? {
writers
.next()
.unwrap()
.write_with_chunker(&leaf, chunkers.next().unwrap())?;
}
}
Ok(())