Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor(parquet): hold the cdc chunkers in ArrowWriter
  • Loading branch information
kszucs committed Feb 22, 2026
commit a1d57247fb44842653dd7bf09f5f565bb5f9d3d3
155 changes: 77 additions & 78 deletions parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! Contains writer which writes arrow data into parquet data.

use crate::column::ContentDefinedChunker;
use crate::column::chunker::ContentDefinedChunker;

use bytes::Bytes;
use std::io::{Read, Write};
Expand Down Expand Up @@ -194,6 +194,10 @@ pub struct ArrowWriter<W: Write> {

/// The maximum size in bytes for a row group, or None for unlimited
max_row_group_bytes: Option<usize>,

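/// CDC chunkers persisted across row groups (one per leaf column).
/// `None` when CDC is not enabled, and also while a row group is in progress:
/// the chunkers are moved into the `ArrowRowGroupWriter` when it is created
/// and handed back to this field when that row group is closed.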
cdc_chunkers: Option<Vec<ContentDefinedChunker>>,
}

impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
Expand Down Expand Up @@ -263,13 +267,27 @@ impl<W: Write + Send> ArrowWriter<W> {
let row_group_writer_factory =
ArrowRowGroupWriterFactory::new(&file_writer, arrow_schema.clone());

let cdc_chunkers = match props_ptr.cdc_options() {
Some(opts) => {
let chunkers = file_writer
.schema_descr()
.columns()
.iter()
.map(|desc| ContentDefinedChunker::new(desc, opts))
.collect::<Result<Vec<_>>>()?;
Some(chunkers)
}
None => None,
};

Ok(Self {
writer: file_writer,
in_progress: None,
arrow_schema,
row_group_writer_factory,
max_row_group_row_count,
max_row_group_bytes,
cdc_chunkers,
})
}

Expand Down Expand Up @@ -338,9 +356,10 @@ impl<W: Write + Send> ArrowWriter<W> {
let in_progress = match &mut self.in_progress {
Some(in_progress) => in_progress,
x => {
let rg = self
.row_group_writer_factory
.create_row_group_writer(self.writer.flushed_row_groups().len())?;
let rg = self.row_group_writer_factory.create_row_group_writer(
self.writer.flushed_row_groups().len(),
self.cdc_chunkers.take(),
)?;
x.insert(rg)
}
};
Expand Down Expand Up @@ -426,7 +445,7 @@ impl<W: Write + Send> ArrowWriter<W> {
};

let (chunks, chunkers) = in_progress.close()?;
self.row_group_writer_factory.cdc_chunkers = chunkers;
self.cdc_chunkers = chunkers;

let mut row_group_writer = self.writer.next_row_group()?;
for chunk in chunks {
Expand Down Expand Up @@ -488,9 +507,10 @@ impl<W: Write + Send> ArrowWriter<W> {
)]
pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
self.flush()?;
let in_progress = self
.row_group_writer_factory
.create_row_group_writer(self.writer.flushed_row_groups().len())?;
let in_progress = self.row_group_writer_factory.create_row_group_writer(
self.writer.flushed_row_groups().len(),
self.cdc_chunkers.take(),
)?;
Ok(in_progress.writers)
}

Expand Down Expand Up @@ -860,7 +880,6 @@ impl ArrowColumnChunk {
pub struct ArrowColumnWriter {
writer: ArrowColumnWriterImpl,
chunk: SharedColumnChunk,
chunker: Option<ContentDefinedChunker>,
}

impl std::fmt::Debug for ArrowColumnWriter {
Expand All @@ -877,24 +896,28 @@ enum ArrowColumnWriterImpl {
impl ArrowColumnWriter {
/// Write an [`ArrowLeafColumn`]
pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
let levels = &col.0;
self.write_internal(&col.0)
}

if let Some(chunker) = self.chunker.as_mut() {
let chunks =
chunker.get_arrow_chunks(levels.def_levels(), levels.rep_levels(), levels.array())?;
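/// Write an [`ArrowLeafColumn`] with content-defined chunking: the column is split into
/// the chunks reported by `chunker`, and the current page is flushed after every chunk
/// except the last.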
fn write_with_chunker(
&mut self,
col: &ArrowLeafColumn,
chunker: &mut ContentDefinedChunker,
) -> Result<()> {
let levels = &col.0;
let chunks =
chunker.get_arrow_chunks(levels.def_levels(), levels.rep_levels(), levels.array())?;

let num_chunks = chunks.len();
for (i, chunk) in chunks.iter().enumerate() {
let chunk_levels = levels.slice_for_chunk(chunk);
self.write_internal(&chunk_levels)?;
let num_chunks = chunks.len();
for (i, chunk) in chunks.iter().enumerate() {
let chunk_levels = levels.slice_for_chunk(chunk);
self.write_internal(&chunk_levels)?;

// Flush the page after each chunk except the last
if i + 1 < num_chunks {
self.flush_current_page()?;
}
// Flush the page after each chunk except the last
if i + 1 < num_chunks {
self.flush_current_page()?;
}
} else {
self.write_internal(levels)?;
}
Ok(())
}
Expand All @@ -907,7 +930,7 @@ impl ArrowColumnWriter {
Some(dictionary) => {
let materialized =
arrow_select::take::take(dictionary.values(), dictionary.keys(), None)?;
write_leaf(c, &materialized, levels)?;
write_leaf(c, &materialized, levels)?
}
None => write_leaf(c, leaf, levels)?,
};
Expand Down Expand Up @@ -980,23 +1003,34 @@ struct ArrowRowGroupWriter {
writers: Vec<ArrowColumnWriter>,
schema: SchemaRef,
buffered_rows: usize,
chunkers: Option<Vec<ContentDefinedChunker>>,
}

impl ArrowRowGroupWriter {
fn new(writers: Vec<ArrowColumnWriter>, arrow: &SchemaRef) -> Self {
fn new(
writers: Vec<ArrowColumnWriter>,
arrow: &SchemaRef,
chunkers: Option<Vec<ContentDefinedChunker>>,
) -> Self {
Self {
writers,
schema: arrow.clone(),
buffered_rows: 0,
chunkers,
}
}

fn write(&mut self, batch: &RecordBatch) -> Result<()> {
self.buffered_rows += batch.num_rows();
let mut writers = self.writers.iter_mut();
let mut chunkers = self.chunkers.as_mut().map(|c| c.iter_mut());
for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
for leaf in compute_leaves(field.as_ref(), column)? {
writers.next().unwrap().write(&leaf)?
let writer = writers.next().unwrap();
match chunkers.as_mut().and_then(|c| c.next()) {
Some(chunker) => writer.write_with_chunker(&leaf, chunker)?,
None => writer.write(&leaf)?,
}
}
}
Ok(())
Expand All @@ -1010,26 +1044,13 @@ impl ArrowRowGroupWriter {
.sum()
}

fn close(
self,
) -> Result<(
Vec<ArrowColumnChunk>,
Option<Vec<ContentDefinedChunker>>,
)> {
let mut chunks = Vec::with_capacity(self.writers.len());
let mut chunkers = Vec::new();
for mut writer in self.writers {
if let Some(chunker) = writer.chunker.take() {
chunkers.push(chunker);
}
chunks.push(writer.close()?);
}
let chunkers = if chunkers.is_empty() {
None
} else {
Some(chunkers)
};
Ok((chunks, chunkers))
fn close(self) -> Result<(Vec<ArrowColumnChunk>, Option<Vec<ContentDefinedChunker>>)> {
let chunks = self
.writers
.into_iter()
.map(|writer| writer.close())
.collect::<Result<Vec<_>>>()?;
Ok((chunks, self.chunkers))
}
}

Expand All @@ -1044,9 +1065,6 @@ pub struct ArrowRowGroupWriterFactory {
props: WriterPropertiesPtr,
#[cfg(feature = "encryption")]
file_encryptor: Option<Arc<FileEncryptor>>,
/// CDC chunkers persisted across row groups (one per leaf column).
/// `None` when CDC is not enabled.
cdc_chunkers: Option<Vec<ContentDefinedChunker>>,
}

impl ArrowRowGroupWriterFactory {
Expand All @@ -1063,13 +1081,20 @@ impl ArrowRowGroupWriterFactory {
props,
#[cfg(feature = "encryption")]
file_encryptor: file_writer.file_encryptor(),
cdc_chunkers: None,
}
}

fn create_row_group_writer(&mut self, row_group_index: usize) -> Result<ArrowRowGroupWriter> {
fn create_row_group_writer(
&mut self,
row_group_index: usize,
chunkers: Option<Vec<ContentDefinedChunker>>,
) -> Result<ArrowRowGroupWriter> {
let writers = self.create_column_writers(row_group_index)?;
Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
Ok(ArrowRowGroupWriter::new(
writers,
&self.arrow_schema,
chunkers,
))
}

/// Create column writers for a new row group, with the given row group index
Expand All @@ -1088,33 +1113,9 @@ impl ArrowRowGroupWriterFactory {
&mut writers,
)?;
}
let chunkers = match self.cdc_chunkers.take() {
Some(chunkers) => chunkers,
None => match self.create_cdc_chunkers()? {
Some(chunkers) => chunkers,
None => return Ok(writers),
},
};
for (writer, chunker) in writers.iter_mut().zip(chunkers) {
writer.chunker = Some(chunker);
}
Ok(writers)
}

/// Create CDC chunkers for all leaf columns, or `None` if CDC is not enabled.
fn create_cdc_chunkers(&self) -> Result<Option<Vec<ContentDefinedChunker>>> {
let opts = match self.props.cdc_options() {
Some(opts) => opts,
None => return Ok(None),
};
self.schema
.columns()
.iter()
.map(|desc| ContentDefinedChunker::new(desc, opts))
.collect::<Result<Vec<_>>>()
.map(Some)
}

#[cfg(feature = "encryption")]
fn column_writer_factory(&self, row_group_idx: usize) -> ArrowColumnWriterFactory {
ArrowColumnWriterFactory::new()
Expand Down Expand Up @@ -1221,7 +1222,6 @@ impl ArrowColumnWriterFactory {
Ok(ArrowColumnWriter {
chunk,
writer: ArrowColumnWriterImpl::Column(writer),
chunker: None,
})
};

Expand All @@ -1233,7 +1233,6 @@ impl ArrowColumnWriterFactory {
Ok(ArrowColumnWriter {
chunk,
writer: ArrowColumnWriterImpl::ByteArray(writer),
chunker: None,
})
};

Expand Down
Loading