Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion vortex-file/src/strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

//! This module defines the default layout strategy for a Vortex file.

use std::num::NonZeroUsize;
use std::sync::Arc;
use std::sync::LazyLock;

Expand Down Expand Up @@ -35,6 +36,7 @@ use vortex_btrblocks::schemes::integer::IntDictScheme;
use vortex_bytebool::ByteBool;
use vortex_datetime_parts::DateTimeParts;
use vortex_decimal_byte_parts::DecimalByteParts;
use vortex_error::VortexExpect;
use vortex_fastlanes::BitPacked;
use vortex_fastlanes::Delta;
use vortex_fastlanes::FoR;
Expand Down Expand Up @@ -297,7 +299,7 @@ impl WriteStrategyBuilder {
dict,
compress_then_flat.clone(),
ZonedLayoutOptions {
block_size: self.row_block_size,
block_size: NonZeroUsize::new(self.row_block_size).vortex_expect("must be non 0"),
..Default::default()
},
);
Expand Down
9 changes: 4 additions & 5 deletions vortex-layout/src/layouts/zoned/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mod schema;
pub mod writer;
pub mod zone_map;

use std::num::NonZeroUsize;
use std::sync::Arc;

pub(crate) use builder::AggregateStatsAccumulator;
Expand Down Expand Up @@ -319,11 +320,9 @@ impl ZonedLayout {
pub fn try_new(
data: LayoutRef,
zones: LayoutRef,
zone_len: usize,
zone_len: NonZeroUsize,
aggregate_fns: Arc<[AggregateFnRef]>,
) -> VortexResult<Self> {
vortex_ensure!(zone_len > 0, "Zone length must be greater than 0");

let expected_dtype = aggregate_stats_table_dtype(data.dtype(), &aggregate_fns);
if zones.dtype() != &expected_dtype {
vortex_bail!("Invalid zone map layout: zones dtype does not match expected dtype");
Expand All @@ -333,7 +332,7 @@ impl ZonedLayout {
Ok(Self {
dtype: data.dtype().clone(),
children: OwnedLayoutChildren::layout_children(vec![data, zones]),
zone_len,
zone_len: zone_len.get(),
zone_map_schema: ZoneMapSchema::AggregateFns(aggregate_fns),
stats_table_dtype: expected_dtype,
})
Expand Down Expand Up @@ -540,7 +539,7 @@ mod tests {
fn test_metadata_serialization_preserves_aggregate_options() -> VortexResult<()> {
let aggregate_fn = BoundedMax.bind(BoundedMaxOptions {
// SAFETY: 128 is non-zero.
max_bytes: unsafe { std::num::NonZeroUsize::new_unchecked(128) },
max_bytes: unsafe { NonZeroUsize::new_unchecked(128) },
});
let metadata = ZonedMetadata {
zone_len: 314,
Expand Down
35 changes: 4 additions & 31 deletions vortex-layout/src/layouts/zoned/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ impl LayoutReader for ZonedReader {

#[cfg(test)]
mod test {
use std::num::NonZeroUsize;
use std::sync::Arc;

use rstest::fixture;
Expand All @@ -237,6 +238,7 @@ mod test {
use vortex_array::expr::root;
use vortex_array::validity::Validity;
use vortex_buffer::buffer;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_io::runtime::Handle;
use vortex_io::runtime::single::block_on;
Expand Down Expand Up @@ -283,7 +285,7 @@ mod test {
ChunkedLayoutStrategy::new(FlatLayoutStrategy::default()),
FlatLayoutStrategy::default(),
ZonedLayoutOptions {
block_size: 3,
block_size: NonZeroUsize::new(3).vortex_expect("non zero"),
..Default::default()
},
);
Expand Down Expand Up @@ -370,7 +372,7 @@ mod test {
ChunkedLayoutStrategy::new(FlatLayoutStrategy::default()),
FlatLayoutStrategy::default(),
ZonedLayoutOptions {
block_size: 3,
block_size: NonZeroUsize::new(3).vortex_expect("non zero"),
..Default::default()
},
);
Expand Down Expand Up @@ -470,33 +472,4 @@ mod test {
Ok(())
})
}

#[test]
fn test_writer_rejects_zero_block_size() {
let ctx = ArrayContext::empty();
let segments = Arc::new(TestSegments::default());
let (ptr, eof) = SequenceId::root().split();
let strategy = ZonedStrategy::new(
ChunkedLayoutStrategy::new(FlatLayoutStrategy::default()),
FlatLayoutStrategy::default(),
ZonedLayoutOptions {
block_size: 0,
..Default::default()
},
);
let array_stream = ChunkedArray::from_iter([buffer![1, 2, 3].into_array()])
.into_array()
.to_array_stream()
.sequenced(ptr);
let segments2 = Arc::<TestSegments>::clone(&segments);

let result = block_on(|handle| async move {
let session = session_with_handle(handle);
strategy
.write_stream(ctx, segments2, array_stream, eof, &session)
.await
});

assert!(result.is_err());
}
}
54 changes: 34 additions & 20 deletions vortex-layout/src/layouts/zoned/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::num::NonZeroUsize;
use std::sync::Arc;

use async_trait::async_trait;
Expand All @@ -26,9 +27,11 @@ use vortex_array::aggregate_fn::fns::nan_count::NanCount;
use vortex_array::aggregate_fn::fns::null_count::NullCount;
use vortex_array::aggregate_fn::fns::sum::Sum;
use vortex_array::dtype::DType;
use vortex_error::VortexError;
use vortex_error::VortexResult;
use vortex_error::vortex_ensure;
use vortex_io::session::RuntimeSessionExt;
use vortex_session::VortexSession;
use vortex_utils::parallelism::get_available_parallelism;

use crate::IntoLayout;
use crate::LayoutRef;
Expand All @@ -50,18 +53,23 @@ use crate::sequence::SequentialStreamExt;
/// possibly the final partial zone.
pub struct ZonedLayoutOptions {
/// The size of a statistics block
pub block_size: usize,
pub block_size: NonZeroUsize,
/// The aggregate partials to collect for each block.
///
/// If unset, the writer chooses pruning aggregates from the input dtype.
pub aggregate_fns: Option<Arc<[AggregateFnRef]>>,
/// Number of chunks to compute aggregate partials in parallel.
pub concurrency: NonZeroUsize,
}

impl Default for ZonedLayoutOptions {
fn default() -> Self {
Self {
block_size: 8192,
block_size: unsafe { NonZeroUsize::new_unchecked(8192) },
aggregate_fns: None,
concurrency: unsafe {
NonZeroUsize::new_unchecked(get_available_parallelism().unwrap_or(1))
},
}
}
}
Expand Down Expand Up @@ -97,38 +105,44 @@ impl LayoutStrategy for ZonedStrategy {
mut eof: SequencePointer,
session: &VortexSession,
) -> VortexResult<LayoutRef> {
vortex_ensure!(
self.options.block_size > 0,
"ZonedStrategy requires block_size > 0 when writing"
);

let aggregate_fns = self
.options
.aggregate_fns
.clone()
.unwrap_or_else(|| default_zoned_aggregate_fns(stream.dtype()));
let session = session.clone();
let compute_session = session.clone();

let stats_accumulator = Arc::new(Mutex::new(AggregateStatsAccumulator::new(
stream.dtype(),
&aggregate_fns,
)));
let aggregate_fns = stats_accumulator.lock().aggregate_fns();

let stream_dtype = stream.dtype().clone();
let concurrency = self.options.concurrency.get();
let stream = stream
.map(move |item| {
let aggregate_fns = Arc::clone(&aggregate_fns);
let session = compute_session.clone();
session.handle().spawn_cpu(move || {
let (sequence_id, chunk) = item?;
let partials = aggregate_partials(
&chunk,
&aggregate_fns,
&mut session.create_execution_ctx(),
)?;
Ok::<_, VortexError>((sequence_id, chunk, partials))
})
})
.buffered(concurrency);

// Accumulate zone stats in stream order so the auxiliary table stays aligned with the
// data child.
let stats_accumulator2 = Arc::clone(&stats_accumulator);
let aggregate_fns2 = Arc::clone(&aggregate_fns);
let compute_session = session.clone();
let stream = SequentialStreamAdapter::new(
stream.dtype().clone(),
stream_dtype,
stream.map(move |item| {
let (sequence_id, chunk) = item?;
let partials = aggregate_partials(
&chunk,
&aggregate_fns2,
&mut compute_session.create_execution_ctx(),
)?;
let (sequence_id, chunk, partials) = item?;
stats_accumulator2.lock().push_partials(partials)?;
Ok((sequence_id, chunk))
}),
Expand All @@ -146,7 +160,7 @@ impl LayoutStrategy for ZonedStrategy {
Arc::clone(&segment_sink),
stream,
data_eof,
&session,
session,
)
.await?;

Expand All @@ -164,7 +178,7 @@ impl LayoutStrategy for ZonedStrategy {
.sequenced(eof.split_off());
let zones_layout = self
.stats
.write_stream(ctx, Arc::clone(&segment_sink), stats_stream, eof, &session)
.write_stream(ctx, Arc::clone(&segment_sink), stats_stream, eof, session)
.await?;

Ok(
Expand Down
Loading