From 08a1ef69d195aeeee4c4af9f84e8da5d428a79c5 Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Thu, 25 Jun 2026 15:31:12 +0100 Subject: [PATCH 1/2] Compute zone layout stats concurrently when writing Signed-off-by: Robert Kruszewski --- vortex-layout/src/layouts/zoned/mod.rs | 9 ++-- vortex-layout/src/layouts/zoned/reader.rs | 35 ++------------- vortex-layout/src/layouts/zoned/writer.rs | 54 ++++++++++++++--------- 3 files changed, 42 insertions(+), 56 deletions(-) diff --git a/vortex-layout/src/layouts/zoned/mod.rs b/vortex-layout/src/layouts/zoned/mod.rs index 3d82fa27b34..0871068a85b 100644 --- a/vortex-layout/src/layouts/zoned/mod.rs +++ b/vortex-layout/src/layouts/zoned/mod.rs @@ -18,6 +18,7 @@ mod schema; pub mod writer; pub mod zone_map; +use std::num::NonZeroUsize; use std::sync::Arc; pub(crate) use builder::AggregateStatsAccumulator; @@ -319,11 +320,9 @@ impl ZonedLayout { pub fn try_new( data: LayoutRef, zones: LayoutRef, - zone_len: usize, + zone_len: NonZeroUsize, aggregate_fns: Arc<[AggregateFnRef]>, ) -> VortexResult { - vortex_ensure!(zone_len > 0, "Zone length must be greater than 0"); - let expected_dtype = aggregate_stats_table_dtype(data.dtype(), &aggregate_fns); if zones.dtype() != &expected_dtype { vortex_bail!("Invalid zone map layout: zones dtype does not match expected dtype"); @@ -333,7 +332,7 @@ impl ZonedLayout { Ok(Self { dtype: data.dtype().clone(), children: OwnedLayoutChildren::layout_children(vec![data, zones]), - zone_len, + zone_len: zone_len.get(), zone_map_schema: ZoneMapSchema::AggregateFns(aggregate_fns), stats_table_dtype: expected_dtype, }) @@ -540,7 +539,7 @@ mod tests { fn test_metadata_serialization_preserves_aggregate_options() -> VortexResult<()> { let aggregate_fn = BoundedMax.bind(BoundedMaxOptions { // SAFETY: 128 is non-zero. - max_bytes: unsafe { std::num::NonZeroUsize::new_unchecked(128) }, + max_bytes: unsafe { NonZeroUsize::new_unchecked(128) }, }); let metadata = ZonedMetadata { zone_len: 314, diff --git a/vortex-layout/src/layouts/zoned/reader.rs b/vortex-layout/src/layouts/zoned/reader.rs index 8feaf46fa03..162742b6f22 100644 --- a/vortex-layout/src/layouts/zoned/reader.rs +++ b/vortex-layout/src/layouts/zoned/reader.rs @@ -219,6 +219,7 @@ impl LayoutReader for ZonedReader { #[cfg(test)] mod test { + use std::num::NonZeroUsize; use std::sync::Arc; use rstest::fixture; @@ -237,6 +238,7 @@ mod test { use vortex_array::expr::root; use vortex_array::validity::Validity; use vortex_buffer::buffer; + use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_io::runtime::Handle; use vortex_io::runtime::single::block_on; @@ -283,7 +285,7 @@ mod test { ChunkedLayoutStrategy::new(FlatLayoutStrategy::default()), FlatLayoutStrategy::default(), ZonedLayoutOptions { - block_size: 3, + block_size: NonZeroUsize::new(3).vortex_expect("non zero"), ..Default::default() }, ); @@ -370,7 +372,7 @@ mod test { ChunkedLayoutStrategy::new(FlatLayoutStrategy::default()), FlatLayoutStrategy::default(), ZonedLayoutOptions { - block_size: 3, + block_size: NonZeroUsize::new(3).vortex_expect("non zero"), ..Default::default() }, ); @@ -470,33 +472,4 @@ mod test { Ok(()) }) } - - #[test] - fn test_writer_rejects_zero_block_size() { - let ctx = ArrayContext::empty(); - let segments = Arc::new(TestSegments::default()); - let (ptr, eof) = SequenceId::root().split(); - let strategy = ZonedStrategy::new( - ChunkedLayoutStrategy::new(FlatLayoutStrategy::default()), - FlatLayoutStrategy::default(), - ZonedLayoutOptions { - block_size: 0, - ..Default::default() - }, - ); - let array_stream = ChunkedArray::from_iter([buffer![1, 2, 3].into_array()]) - .into_array() - .to_array_stream() - .sequenced(ptr); - let segments2 = Arc::::clone(&segments); - - let result = block_on(|handle| async move { - let session = session_with_handle(handle); - strategy - .write_stream(ctx, segments2, array_stream, eof, &session) - .await - }); - - assert!(result.is_err()); - } } diff --git a/vortex-layout/src/layouts/zoned/writer.rs b/vortex-layout/src/layouts/zoned/writer.rs index 7ebca0104e9..6ed7b541bcb 100644 --- a/vortex-layout/src/layouts/zoned/writer.rs +++ b/vortex-layout/src/layouts/zoned/writer.rs @@ -3,6 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use std::num::NonZeroUsize; use std::sync::Arc; use async_trait::async_trait; @@ -26,9 +27,11 @@ use vortex_array::aggregate_fn::fns::nan_count::NanCount; use vortex_array::aggregate_fn::fns::null_count::NullCount; use vortex_array::aggregate_fn::fns::sum::Sum; use vortex_array::dtype::DType; +use vortex_error::VortexError; use vortex_error::VortexResult; -use vortex_error::vortex_ensure; +use vortex_io::session::RuntimeSessionExt; use vortex_session::VortexSession; +use vortex_utils::parallelism::get_available_parallelism; use crate::IntoLayout; use crate::LayoutRef; @@ -50,18 +53,23 @@ use crate::sequence::SequentialStreamExt; /// possibly the final partial zone. pub struct ZonedLayoutOptions { /// The size of a statistics block - pub block_size: usize, + pub block_size: NonZeroUsize, /// The aggregate partials to collect for each block. /// /// If unset, the writer chooses pruning aggregates from the input dtype. pub aggregate_fns: Option>, + /// Number of chunks to compute aggregate partials in parallel. + pub concurrency: NonZeroUsize, } impl Default for ZonedLayoutOptions { fn default() -> Self { Self { - block_size: 8192, + block_size: unsafe { NonZeroUsize::new_unchecked(8192) }, aggregate_fns: None, + concurrency: unsafe { + NonZeroUsize::new_unchecked(get_available_parallelism().unwrap_or(1)) + }, } } } @@ -97,17 +105,12 @@ impl LayoutStrategy for ZonedStrategy { mut eof: SequencePointer, session: &VortexSession, ) -> VortexResult { - vortex_ensure!( - self.options.block_size > 0, - "ZonedStrategy requires block_size > 0 when writing" - ); - let aggregate_fns = self .options .aggregate_fns .clone() .unwrap_or_else(|| default_zoned_aggregate_fns(stream.dtype())); - let session = session.clone(); + let compute_session = session.clone(); let stats_accumulator = Arc::new(Mutex::new(AggregateStatsAccumulator::new( stream.dtype(), @@ -115,20 +118,31 @@ impl LayoutStrategy for ZonedStrategy { ))); let aggregate_fns = stats_accumulator.lock().aggregate_fns(); + let stream_dtype = stream.dtype().clone(); + let concurrency = self.options.concurrency.get(); + let stream = stream + .map(move |item| { + let aggregate_fns = Arc::clone(&aggregate_fns); + let session = compute_session.clone(); + session.handle().spawn_cpu(move || { + let (sequence_id, chunk) = item?; + let partials = aggregate_partials( + &chunk, + &aggregate_fns, + &mut session.create_execution_ctx(), + )?; + Ok::<_, VortexError>((sequence_id, chunk, partials)) + }) + }) + .buffered(concurrency); + // Accumulate zone stats in stream order so the auxiliary table stays aligned with the // data child. let stats_accumulator2 = Arc::clone(&stats_accumulator); - let aggregate_fns2 = Arc::clone(&aggregate_fns); - let compute_session = session.clone(); let stream = SequentialStreamAdapter::new( - stream.dtype().clone(), + stream_dtype, stream.map(move |item| { - let (sequence_id, chunk) = item?; - let partials = aggregate_partials( - &chunk, - &aggregate_fns2, - &mut compute_session.create_execution_ctx(), - )?; + let (sequence_id, chunk, partials) = item?; stats_accumulator2.lock().push_partials(partials)?; Ok((sequence_id, chunk)) }), @@ -146,7 +160,7 @@ impl LayoutStrategy for ZonedStrategy { Arc::clone(&segment_sink), stream, data_eof, - &session, + session, ) .await?; @@ -164,7 +178,7 @@ impl LayoutStrategy for ZonedStrategy { .sequenced(eof.split_off()); let zones_layout = self .stats - .write_stream(ctx, Arc::clone(&segment_sink), stats_stream, eof, &session) + .write_stream(ctx, Arc::clone(&segment_sink), stats_stream, eof, session) .await?; Ok( From ab858d46f52a69f448c1c56a59a943de83081dc8 Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Thu, 25 Jun 2026 16:52:38 +0100 Subject: [PATCH 2/2] more Signed-off-by: Robert Kruszewski --- vortex-file/src/strategy.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/vortex-file/src/strategy.rs b/vortex-file/src/strategy.rs index 804218779c8..980082b5802 100644 --- a/vortex-file/src/strategy.rs +++ b/vortex-file/src/strategy.rs @@ -3,6 +3,7 @@ //! This module defines the default layout strategy for a Vortex file. +use std::num::NonZeroUsize; use std::sync::Arc; use std::sync::LazyLock; @@ -35,6 +36,7 @@ use vortex_btrblocks::schemes::integer::IntDictScheme; use vortex_bytebool::ByteBool; use vortex_datetime_parts::DateTimeParts; use vortex_decimal_byte_parts::DecimalByteParts; +use vortex_error::VortexExpect; use vortex_fastlanes::BitPacked; use vortex_fastlanes::Delta; use vortex_fastlanes::FoR; @@ -297,7 +299,7 @@ impl WriteStrategyBuilder { dict, compress_then_flat.clone(), ZonedLayoutOptions { - block_size: self.row_block_size, + block_size: NonZeroUsize::new(self.row_block_size).vortex_expect("must be non 0"), ..Default::default() }, );