diff --git a/vortex-array/src/aggregate_fn/fns/all_nan/mod.rs b/vortex-array/src/aggregate_fn/fns/all_nan/mod.rs index c33443dd02b..cd703af7529 100644 --- a/vortex-array/src/aggregate_fn/fns/all_nan/mod.rs +++ b/vortex-array/src/aggregate_fn/fns/all_nan/mod.rs @@ -2,6 +2,7 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use vortex_error::VortexResult; +use vortex_session::VortexSession; use crate::ArrayRef; use crate::Columnar; @@ -37,7 +38,15 @@ impl AggregateFnVTable for AllNan { } fn serialize(&self, _options: &Self::Options) -> VortexResult>> { - Ok(None) + Ok(Some(vec![])) + } + + fn deserialize( + &self, + _metadata: &[u8], + _session: &VortexSession, + ) -> VortexResult { + Ok(EmptyOptions) } fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option { diff --git a/vortex-array/src/aggregate_fn/fns/all_non_nan/mod.rs b/vortex-array/src/aggregate_fn/fns/all_non_nan/mod.rs index f8d371cb776..a1786eee500 100644 --- a/vortex-array/src/aggregate_fn/fns/all_non_nan/mod.rs +++ b/vortex-array/src/aggregate_fn/fns/all_non_nan/mod.rs @@ -2,6 +2,7 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use vortex_error::VortexResult; +use vortex_session::VortexSession; use crate::ArrayRef; use crate::Columnar; @@ -37,7 +38,15 @@ impl AggregateFnVTable for AllNonNan { } fn serialize(&self, _options: &Self::Options) -> VortexResult>> { - Ok(None) + Ok(Some(vec![])) + } + + fn deserialize( + &self, + _metadata: &[u8], + _session: &VortexSession, + ) -> VortexResult { + Ok(EmptyOptions) } fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option { diff --git a/vortex-array/src/aggregate_fn/fns/all_non_null/mod.rs b/vortex-array/src/aggregate_fn/fns/all_non_null/mod.rs index a297c318fc9..f318c2586a4 100644 --- a/vortex-array/src/aggregate_fn/fns/all_non_null/mod.rs +++ b/vortex-array/src/aggregate_fn/fns/all_non_null/mod.rs @@ -2,6 +2,7 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use vortex_error::VortexResult; +use vortex_session::VortexSession; use crate::ArrayRef; use crate::Columnar; @@ -29,7 +30,15 @@ impl AggregateFnVTable for AllNonNull { } fn serialize(&self, _options: &Self::Options) -> VortexResult>> { - Ok(None) + Ok(Some(vec![])) + } + + fn deserialize( + &self, + _metadata: &[u8], + _session: &VortexSession, + ) -> VortexResult { + Ok(EmptyOptions) } fn return_dtype(&self, _options: &Self::Options, _input_dtype: &DType) -> Option { diff --git a/vortex-array/src/aggregate_fn/fns/all_null/mod.rs b/vortex-array/src/aggregate_fn/fns/all_null/mod.rs index 5476c7d534b..838f0502e4f 100644 --- a/vortex-array/src/aggregate_fn/fns/all_null/mod.rs +++ b/vortex-array/src/aggregate_fn/fns/all_null/mod.rs @@ -2,6 +2,7 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use vortex_error::VortexResult; +use vortex_session::VortexSession; use crate::ArrayRef; use crate::Columnar; @@ -29,7 +30,15 @@ impl AggregateFnVTable for AllNull { } fn serialize(&self, _options: &Self::Options) -> VortexResult>> { - Ok(None) + Ok(Some(vec![])) + } + + fn deserialize( + &self, + _metadata: &[u8], + _session: &VortexSession, + ) -> VortexResult { + Ok(EmptyOptions) } fn return_dtype(&self, _options: &Self::Options, _input_dtype: &DType) -> Option { diff --git a/vortex-array/src/aggregate_fn/fns/max/mod.rs b/vortex-array/src/aggregate_fn/fns/max/mod.rs index 755e1a4ed35..89a98cc54c0 100644 --- a/vortex-array/src/aggregate_fn/fns/max/mod.rs +++ b/vortex-array/src/aggregate_fn/fns/max/mod.rs @@ -3,6 +3,7 @@ use vortex_error::VortexExpect; use vortex_error::VortexResult; +use vortex_session::VortexSession; use crate::ArrayRef; use crate::Columnar; @@ -52,7 +53,15 @@ impl AggregateFnVTable for Max { } fn serialize(&self, _options: &Self::Options) -> VortexResult>> { - Ok(None) + Ok(Some(vec![])) + } + + fn deserialize( + &self, + _metadata: &[u8], + _session: &VortexSession, + ) -> VortexResult { + Ok(EmptyOptions) } fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option { diff --git a/vortex-array/src/aggregate_fn/fns/min/mod.rs b/vortex-array/src/aggregate_fn/fns/min/mod.rs index b176bbaf742..79548b44b14 100644 --- a/vortex-array/src/aggregate_fn/fns/min/mod.rs +++ b/vortex-array/src/aggregate_fn/fns/min/mod.rs @@ -3,6 +3,7 @@ use vortex_error::VortexExpect; use vortex_error::VortexResult; +use vortex_session::VortexSession; use crate::ArrayRef; use crate::Columnar; @@ -52,7 +53,15 @@ impl AggregateFnVTable for Min { } fn serialize(&self, _options: &Self::Options) -> VortexResult>> { - Ok(None) + Ok(Some(vec![])) + } + + fn deserialize( + &self, + _metadata: &[u8], + _session: &VortexSession, + ) -> VortexResult { + Ok(EmptyOptions) } fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option { diff --git a/vortex-array/src/aggregate_fn/fns/nan_count/mod.rs b/vortex-array/src/aggregate_fn/fns/nan_count/mod.rs index 51087f3bb3e..20ac7843065 100644 --- a/vortex-array/src/aggregate_fn/fns/nan_count/mod.rs +++ b/vortex-array/src/aggregate_fn/fns/nan_count/mod.rs @@ -7,6 +7,7 @@ use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_error::vortex_err; +use vortex_session::VortexSession; use self::primitive::accumulate_primitive; use crate::ArrayRef; @@ -87,7 +88,15 @@ impl AggregateFnVTable for NanCount { } fn serialize(&self, _options: &Self::Options) -> VortexResult>> { - unimplemented!("NanCount is not yet serializable"); + Ok(Some(vec![])) + } + + fn deserialize( + &self, + _metadata: &[u8], + _session: &VortexSession, + ) -> VortexResult { + Ok(EmptyOptions) } fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option { diff --git a/vortex-array/src/aggregate_fn/fns/null_count/mod.rs b/vortex-array/src/aggregate_fn/fns/null_count/mod.rs index 47faabc82ef..6e3f0264e26 100644 --- a/vortex-array/src/aggregate_fn/fns/null_count/mod.rs +++ b/vortex-array/src/aggregate_fn/fns/null_count/mod.rs @@ -4,6 +4,7 @@ use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_err; +use vortex_session::VortexSession; use crate::ArrayRef; use crate::Columnar; @@ -62,7 +63,15 @@ impl AggregateFnVTable for NullCount { } fn serialize(&self, _options: &Self::Options) -> VortexResult>> { - Ok(None) + Ok(Some(vec![])) + } + + fn deserialize( + &self, + _metadata: &[u8], + _session: &VortexSession, + ) -> VortexResult { + Ok(EmptyOptions) } fn return_dtype(&self, _options: &Self::Options, _input_dtype: &DType) -> Option { diff --git a/vortex-array/src/aggregate_fn/fns/sum/mod.rs b/vortex-array/src/aggregate_fn/fns/sum/mod.rs index f48397798d9..12935144531 100644 --- a/vortex-array/src/aggregate_fn/fns/sum/mod.rs +++ b/vortex-array/src/aggregate_fn/fns/sum/mod.rs @@ -12,6 +12,7 @@ use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_error::vortex_err; use vortex_error::vortex_panic; +use vortex_session::VortexSession; use self::bool::accumulate_bool; use self::constant::multiply_constant; @@ -76,7 +77,15 @@ impl AggregateFnVTable for Sum { } fn serialize(&self, _options: &Self::Options) -> VortexResult>> { - unimplemented!("Sum is not yet serializable"); + Ok(Some(vec![])) + } + + fn deserialize( + &self, + _metadata: &[u8], + _session: &VortexSession, + ) -> VortexResult { + Ok(EmptyOptions) } fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option { diff --git a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs index 7a77ed9555c..7a08054ef86 100644 --- a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs +++ b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs @@ -27,6 +27,7 @@ use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_error::vortex_err; use vortex_mask::Mask; +use vortex_session::VortexSession; use crate::ArrayRef; use crate::Canonical; @@ -104,7 +105,15 @@ impl AggregateFnVTable for UncompressedSizeInBytes { } fn serialize(&self, _options: &Self::Options) -> VortexResult>> { - unimplemented!("UncompressedSizeInBytes is not yet serializable"); + Ok(Some(vec![])) + } + + fn deserialize( + &self, + _metadata: &[u8], + _session: &VortexSession, + ) -> VortexResult { + Ok(EmptyOptions) } fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option { diff --git a/vortex-cuda/src/layout.rs b/vortex-cuda/src/layout.rs index ac16d858d2d..2ccb2a344ec 100644 --- a/vortex-cuda/src/layout.rs +++ b/vortex-cuda/src/layout.rs @@ -40,6 +40,7 @@ use vortex::error::VortexResult; use vortex::error::vortex_bail; use vortex::error::vortex_panic; use vortex::layout::IntoLayout; +use vortex::layout::LayoutBuildContext; use vortex::layout::LayoutChildType; use vortex::layout::LayoutChildren; use vortex::layout::LayoutEncodingRef; @@ -198,7 +199,7 @@ impl VTable for CudaFlat { metadata: &::Output, segment_ids: Vec, _children: &dyn LayoutChildren, - ctx: &ReadContext, + build_ctx: &LayoutBuildContext<'_>, ) -> VortexResult { if segment_ids.len() != 1 { vortex_bail!("CudaFlatLayout must have exactly one segment ID"); @@ -212,7 +213,7 @@ impl VTable for CudaFlat { row_count, dtype: dtype.clone(), segment_id: segment_ids[0], - ctx: ctx.clone(), + ctx: build_ctx.array_read_ctx.clone(), array_tree: ByteBuffer::from(metadata.array_encoding_tree.clone()), host_buffers: Arc::new(host_buffers), }) diff --git a/vortex-file/src/footer/mod.rs b/vortex-file/src/footer/mod.rs index ac2153f7cd1..f7e71d31d1d 100644 --- a/vortex-file/src/footer/mod.rs +++ b/vortex-file/src/footer/mod.rs @@ -35,7 +35,6 @@ use vortex_flatbuffers::footer as fb; use vortex_layout::LayoutEncodingId; use vortex_layout::LayoutRef; use vortex_layout::layout_from_flatbuffer_with_options; -use vortex_layout::session::LayoutSessionExt; use vortex_session::VortexSession; use vortex_session::registry::ReadContext; @@ -106,7 +105,7 @@ impl Footer { &dtype, &layout_read_ctx, &array_read_ctx, - session.layouts().registry(), + session, session.allows_unknown(), )?; diff --git a/vortex-file/src/tests.rs b/vortex-file/src/tests.rs index cd7e372c25d..3663c1fadc4 100644 --- a/vortex-file/src/tests.rs +++ b/vortex-file/src/tests.rs @@ -63,6 +63,8 @@ use vortex_buffer::buffer; use vortex_error::VortexResult; use vortex_io::session::RuntimeSession; use vortex_layout::Layout; +use vortex_layout::layouts::zoned::LegacyStats; +use vortex_layout::layouts::zoned::Zoned; use vortex_layout::scan::scan_builder::ScanBuilder; use vortex_layout::session::LayoutSession; use vortex_session::VortexSession; @@ -1886,7 +1888,7 @@ async fn test_segment_ordering_zonemaps_after_data() -> VortexResult<()> { // Find all zoned layouts and verify data segments come before zone map segments. fn check_zoned_ordering(layout: &dyn Layout, segment_specs: &[SegmentSpec]) { - if layout.encoding_id().as_ref() == "vortex.stats" { + if layout.is::() || layout.is::() { // child 0 = data, child 1 = zones let data_offsets = collect_segment_offsets(layout.child(0).unwrap().as_ref(), segment_specs); @@ -1918,7 +1920,7 @@ async fn test_segment_ordering_zonemaps_after_data() -> VortexResult<()> { all_data: &mut Vec, all_zones: &mut Vec, ) { - if layout.encoding_id().as_ref() == "vortex.stats" { + if layout.is::() || layout.is::() { // child 0 = data, child 1 = zones all_data.extend(collect_segment_offsets( layout.child(0).unwrap().as_ref(), diff --git a/vortex-layout/src/children.rs b/vortex-layout/src/children.rs index 9dbc993c602..dade2cbc4a4 100644 --- a/vortex-layout/src/children.rs +++ b/vortex-layout/src/children.rs @@ -14,8 +14,10 @@ use vortex_error::vortex_bail; use vortex_error::vortex_err; use vortex_flatbuffers::FlatBuffer; use vortex_flatbuffers::layout as fbl; +use vortex_session::VortexSession; use vortex_session::registry::ReadContext; +use crate::LayoutBuildContext; use crate::LayoutRef; use crate::layouts::foreign::new_foreign_layout; use crate::segments::SegmentId; @@ -105,6 +107,7 @@ pub(crate) struct ViewedLayoutChildren { layout_read_ctx: ReadContext, layouts: LayoutRegistry, allow_unknown: bool, + session: VortexSession, cache: Arc<[OnceCell]>, } @@ -121,6 +124,7 @@ impl ViewedLayoutChildren { layout_read_ctx: ReadContext, layouts: LayoutRegistry, allow_unknown: bool, + session: VortexSession, ) -> Self { // SAFETY: guaranteed by caller let nchildren = unsafe { fbl::Layout::follow(flatbuffer.as_ref(), flatbuffer_loc) } @@ -135,6 +139,7 @@ impl ViewedLayoutChildren { layout_read_ctx, layouts, allow_unknown, + session, cache, } } @@ -206,6 +211,7 @@ impl LayoutChildren for ViewedLayoutChildren { self.layout_read_ctx.clone(), self.layouts.clone(), self.allow_unknown, + self.session.clone(), ) }; @@ -223,6 +229,10 @@ impl LayoutChildren for ViewedLayoutChildren { )); }; + let build_ctx = LayoutBuildContext { + session: &self.session, + array_read_ctx: &self.array_read_ctx, + }; encoding.build( dtype, fb_child.row_count(), @@ -237,7 +247,7 @@ impl LayoutChildren for ViewedLayoutChildren { .map(SegmentId::from) .collect_vec(), &viewed_children, - &self.array_read_ctx, + &build_ctx, ) })?; Ok(Arc::clone(layout_ref)) diff --git a/vortex-layout/src/encoding.rs b/vortex-layout/src/encoding.rs index 828634a8902..c2529b86e82 100644 --- a/vortex-layout/src/encoding.rs +++ b/vortex-layout/src/encoding.rs @@ -13,9 +13,9 @@ use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_panic; use vortex_session::registry::Id; -use vortex_session::registry::ReadContext; use crate::IntoLayout; +use crate::LayoutBuildContext; use crate::LayoutChildren; use crate::LayoutRef; use crate::VTable; @@ -37,7 +37,7 @@ pub trait LayoutEncoding: 'static + Send + Sync + Debug + private::Sealed { metadata: &[u8], segment_ids: Vec, children: &dyn LayoutChildren, - ctx: &ReadContext, + build_ctx: &LayoutBuildContext<'_>, ) -> VortexResult; } @@ -60,7 +60,7 @@ impl LayoutEncoding for LayoutEncodingAdapter { metadata: &[u8], segment_ids: Vec, children: &dyn LayoutChildren, - ctx: &ReadContext, + build_ctx: &LayoutBuildContext<'_>, ) -> VortexResult { let metadata = ::deserialize(metadata)?; let layout = V::build( @@ -70,7 +70,7 @@ impl LayoutEncoding for LayoutEncodingAdapter { &metadata, segment_ids, children, - ctx, + build_ctx, )?; // Validate that the builder function returned the expected values. diff --git a/vortex-layout/src/flatbuffers.rs b/vortex-layout/src/flatbuffers.rs index 8d5c978bd14..92d34cd5de3 100644 --- a/vortex-layout/src/flatbuffers.rs +++ b/vortex-layout/src/flatbuffers.rs @@ -15,15 +15,17 @@ use vortex_flatbuffers::FlatBuffer; use vortex_flatbuffers::FlatBufferRoot; use vortex_flatbuffers::WriteFlatBuffer; use vortex_flatbuffers::layout; +use vortex_session::VortexSession; use vortex_session::registry::ReadContext; use crate::Layout; +use crate::LayoutBuildContext; use crate::LayoutContext; use crate::LayoutRef; use crate::children::ViewedLayoutChildren; use crate::layouts::foreign::new_foreign_layout; use crate::segments::SegmentId; -use crate::session::LayoutRegistry; +use crate::session::LayoutSessionExt; static LAYOUT_VERIFIER: LazyLock = LazyLock::new(|| { VerifierOptions { @@ -48,9 +50,9 @@ pub fn layout_from_flatbuffer( dtype: &DType, layout_ctx: &ReadContext, ctx: &ReadContext, - layouts: &LayoutRegistry, + session: &VortexSession, ) -> VortexResult { - layout_from_flatbuffer_with_options(flatbuffer, dtype, layout_ctx, ctx, layouts, false) + layout_from_flatbuffer_with_options(flatbuffer, dtype, layout_ctx, ctx, session, false) } /// Parse a [`LayoutRef`] from a layout flatbuffer with unknown-encoding behavior control. @@ -59,9 +61,10 @@ pub fn layout_from_flatbuffer_with_options( dtype: &DType, layout_ctx: &ReadContext, ctx: &ReadContext, - layouts: &LayoutRegistry, + session: &VortexSession, allow_unknown: bool, ) -> VortexResult { + let layouts = session.layouts().registry(); let fb_layout = root_with_opts::(&LAYOUT_VERIFIER, &flatbuffer)?; let encoding_id = layout_ctx .resolve(fb_layout.encoding()) @@ -83,9 +86,14 @@ pub fn layout_from_flatbuffer_with_options( layout_ctx.clone(), layouts.clone(), allow_unknown, + session.clone(), ) }; + let build_ctx = LayoutBuildContext { + session, + array_read_ctx: ctx, + }; let layout = encoding.build( dtype, fb_layout.row_count(), @@ -100,7 +108,7 @@ pub fn layout_from_flatbuffer_with_options( .map(SegmentId::from) .collect(), &viewed_children, - ctx, + &build_ctx, )?; Ok(layout) @@ -264,14 +272,14 @@ mod tests { LayoutEncodingId::new("vortex.test.foreign_child_layout"), ]); let array_ctx = ReadContext::new([]); - let layouts = LayoutSession::default().registry().clone(); + let session = vortex_array::array_session().with::(); let layout = layout_from_flatbuffer_with_options( layout_buffer, &DType::Variant(Nullability::Nullable), &layout_ctx, &array_ctx, - &layouts, + &session, true, ) .unwrap(); diff --git a/vortex-layout/src/layouts/chunked/mod.rs b/vortex-layout/src/layouts/chunked/mod.rs index ee37cf87411..b2d8826db77 100644 --- a/vortex-layout/src/layouts/chunked/mod.rs +++ b/vortex-layout/src/layouts/chunked/mod.rs @@ -11,8 +11,8 @@ use vortex_array::EmptyMetadata; use vortex_array::dtype::DType; use vortex_error::VortexResult; use vortex_session::VortexSession; -use vortex_session::registry::ReadContext; +use crate::LayoutBuildContext; use crate::LayoutChildType; use crate::LayoutEncodingRef; use crate::LayoutId; @@ -93,7 +93,7 @@ impl VTable for Chunked { _metadata: &::Output, _segment_ids: Vec, children: &dyn LayoutChildren, - _ctx: &ReadContext, + _build_ctx: &LayoutBuildContext<'_>, ) -> VortexResult { Ok(ChunkedLayout::new( row_count, diff --git a/vortex-layout/src/layouts/dict/mod.rs b/vortex-layout/src/layouts/dict/mod.rs index 5fb4ea080b4..8e119534946 100644 --- a/vortex-layout/src/layouts/dict/mod.rs +++ b/vortex-layout/src/layouts/dict/mod.rs @@ -19,8 +19,8 @@ use vortex_error::vortex_ensure; use vortex_error::vortex_err; use vortex_error::vortex_panic; use vortex_session::VortexSession; -use vortex_session::registry::ReadContext; +use crate::LayoutBuildContext; use crate::LayoutChildType; use crate::LayoutEncodingRef; use crate::LayoutId; @@ -110,7 +110,7 @@ impl VTable for Dict { metadata: &::Output, _segment_ids: Vec, children: &dyn LayoutChildren, - _ctx: &ReadContext, + _build_ctx: &LayoutBuildContext<'_>, ) -> VortexResult { let values = children.child(0, dtype)?; let codes_nullable = metadata diff --git a/vortex-layout/src/layouts/file_stats.rs b/vortex-layout/src/layouts/file_stats.rs index 0b3205e216f..eeaf8520670 100644 --- a/vortex-layout/src/layouts/file_stats.rs +++ b/vortex-layout/src/layouts/file_stats.rs @@ -2,6 +2,7 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use std::future; +use std::marker::PhantomData; use std::sync::Arc; use futures::StreamExt; @@ -9,19 +10,37 @@ use itertools::Itertools; use parking_lot::Mutex; use vortex_array::ArrayRef; use vortex_array::ExecutionCtx; +use vortex_array::IntoArray; +use vortex_array::LEGACY_SESSION; use vortex_array::VortexSessionExecute; +use vortex_array::aggregate_fn::fns::sum::sum; +use vortex_array::arrays::ConstantArray; use vortex_array::arrays::StructArray; use vortex_array::arrays::struct_::StructArrayExt; +use vortex_array::builders::ArrayBuilder; +use vortex_array::builders::BoolBuilder; +use vortex_array::builders::builder_with_capacity; use vortex_array::dtype::DType; +use vortex_array::dtype::FieldName; use vortex_array::dtype::Nullability; +use vortex_array::dtype::PType; +use vortex_array::expr::stats::Precision; use vortex_array::expr::stats::Stat; +use vortex_array::scalar::Scalar; +use vortex_array::scalar::ScalarTruncation; +use vortex_array::scalar::lower_bound; +use vortex_array::scalar::upper_bound; use vortex_array::stats::StatsSet; +use vortex_array::validity::Validity; +use vortex_buffer::BufferString; +use vortex_buffer::ByteBuffer; use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_panic; use vortex_session::VortexSession; -use crate::layouts::zoned::StatsAccumulator; +use crate::layouts::zoned::MAX_IS_TRUNCATED; +use crate::layouts::zoned::MIN_IS_TRUNCATED; use crate::sequence::SendableSequentialStream; use crate::sequence::SequenceId; use crate::sequence::SequentialStreamAdapter; @@ -49,6 +68,336 @@ pub fn accumulate_stats( (accumulator, stream) } +/// Accumulates write-time statistics for a single file column. +struct StatsAccumulator { + builders: Vec>, + length: usize, +} + +impl StatsAccumulator { + fn new(dtype: &DType, stats: &[Stat], max_variable_length_statistics_size: usize) -> Self { + if !supports_file_stats(dtype) { + return Self { + builders: Vec::new(), + length: 0, + }; + } + + let builders = stats + .iter() + .filter_map(|&stat| { + stat.dtype(dtype).map(|stat_dtype| { + stats_builder_with_capacity( + stat, + &stat_dtype.as_nullable(), + 1024, + max_variable_length_statistics_size, + ) + }) + }) + .collect::>(); + + Self { + builders, + length: 0, + } + } + + fn push_chunk(&mut self, array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<()> { + for builder in &mut self.builders { + if let Some(value) = array.statistics().compute_stat(builder.stat(), ctx)? { + builder.append_scalar(value.cast(&value.dtype().as_nullable())?)?; + } else { + builder.append_null(); + } + } + self.length += 1; + Ok(()) + } + + fn as_array(&mut self) -> VortexResult> { + let mut names = Vec::new(); + let mut fields = Vec::new(); + + for builder in self + .builders + .iter_mut() + // We sort the stats so the DType is deterministic based on which stats are present. + .sorted_unstable_by_key(|builder| builder.stat()) + { + let values = builder.finish(); + + // We drop any all-null stats columns. + if values.all_invalid()? { + continue; + } + + names.extend(values.names); + fields.extend(values.arrays); + } + + if names.is_empty() { + return Ok(None); + } + + StructArray::try_new(names.into(), fields, self.length, Validity::NonNullable).map(Some) + } + + /// Returns an aggregated stats set for the table. + fn as_stats_set(&mut self, stats: &[Stat], ctx: &mut ExecutionCtx) -> VortexResult { + let mut stats_set = StatsSet::default(); + let Some(array) = self.as_array()? else { + return Ok(stats_set); + }; + + for &stat in stats { + let Some(array) = array.unmasked_field_by_name_opt(stat.name()) else { + continue; + }; + + match stat { + Stat::Min | Stat::Max | Stat::Sum => { + if let Some(s) = array.statistics().compute_stat(stat, ctx)? + && let Some(v) = s.into_value() + { + stats_set.set(stat, Precision::exact(v)) + } + } + Stat::NullCount | Stat::NaNCount | Stat::UncompressedSizeInBytes => { + if let Some(sum_value) = sum(array, ctx)? + .cast(&DType::Primitive(PType::U64, Nullability::Nullable))? + .into_value() + { + stats_set.set(stat, Precision::exact(sum_value)); + } + } + Stat::IsConstant | Stat::IsSorted | Stat::IsStrictSorted => {} + } + } + Ok(stats_set) + } +} + +fn supports_file_stats(dtype: &DType) -> bool { + !matches!(dtype, DType::Variant(_)) +} + +fn stats_builder_with_capacity( + stat: Stat, + dtype: &DType, + capacity: usize, + max_length: usize, +) -> Box { + let values_builder = builder_with_capacity(dtype, capacity); + match stat { + Stat::Max => match dtype { + DType::Utf8(_) => Box::new(TruncatedMaxBinaryStatsBuilder::::new( + values_builder, + BoolBuilder::with_capacity(Nullability::NonNullable, capacity), + max_length, + )), + DType::Binary(_) => Box::new(TruncatedMaxBinaryStatsBuilder::::new( + values_builder, + BoolBuilder::with_capacity(Nullability::NonNullable, capacity), + max_length, + )), + _ => Box::new(StatNameArrayBuilder::new(stat, values_builder)), + }, + Stat::Min => match dtype { + DType::Utf8(_) => Box::new(TruncatedMinBinaryStatsBuilder::::new( + values_builder, + BoolBuilder::with_capacity(Nullability::NonNullable, capacity), + max_length, + )), + DType::Binary(_) => Box::new(TruncatedMinBinaryStatsBuilder::::new( + values_builder, + BoolBuilder::with_capacity(Nullability::NonNullable, capacity), + max_length, + )), + _ => Box::new(StatNameArrayBuilder::new(stat, values_builder)), + }, + _ => Box::new(StatNameArrayBuilder::new(stat, values_builder)), + } +} + +/// Arrays with their associated names, reduced version of a `StructArray`. +struct NamedArrays { + names: Vec, + arrays: Vec, +} + +impl NamedArrays { + fn all_invalid(&self) -> VortexResult { + self.arrays[0].all_invalid(&mut LEGACY_SESSION.create_execution_ctx()) + } +} + +trait StatsArrayBuilder: Send { + fn stat(&self) -> Stat; + + fn append_scalar(&mut self, value: Scalar) -> VortexResult<()>; + + fn append_null(&mut self); + + fn finish(&mut self) -> NamedArrays; +} + +struct StatNameArrayBuilder { + stat: Stat, + builder: Box, +} + +impl StatNameArrayBuilder { + fn new(stat: Stat, builder: Box) -> Self { + Self { stat, builder } + } +} + +impl StatsArrayBuilder for StatNameArrayBuilder { + fn stat(&self) -> Stat { + self.stat + } + + fn append_scalar(&mut self, value: Scalar) -> VortexResult<()> { + self.builder.append_scalar(&value) + } + + fn append_null(&mut self) { + self.builder.append_null() + } + + fn finish(&mut self) -> NamedArrays { + let array = self.builder.finish(); + let len = array.len(); + match self.stat { + Stat::Max => NamedArrays { + names: vec![self.stat.name().into(), MAX_IS_TRUNCATED.into()], + arrays: vec![array, ConstantArray::new(false, len).into_array()], + }, + Stat::Min => NamedArrays { + names: vec![self.stat.name().into(), MIN_IS_TRUNCATED.into()], + arrays: vec![array, ConstantArray::new(false, len).into_array()], + }, + _ => NamedArrays { + names: vec![self.stat.name().into()], + arrays: vec![array], + }, + } + } +} + +struct TruncatedMaxBinaryStatsBuilder { + values: Box, + is_truncated: BoolBuilder, + max_value_length: usize, + _marker: PhantomData, +} + +impl TruncatedMaxBinaryStatsBuilder { + fn new( + values: Box, + is_truncated: BoolBuilder, + max_value_length: usize, + ) -> Self { + Self { + values, + is_truncated, + max_value_length, + _marker: PhantomData, + } + } +} + +struct TruncatedMinBinaryStatsBuilder { + values: Box, + is_truncated: BoolBuilder, + max_value_length: usize, + _marker: PhantomData, +} + +impl TruncatedMinBinaryStatsBuilder { + fn new( + values: Box, + is_truncated: BoolBuilder, + max_value_length: usize, + ) -> Self { + Self { + values, + is_truncated, + max_value_length, + _marker: PhantomData, + } + } +} + +impl StatsArrayBuilder for TruncatedMaxBinaryStatsBuilder { + fn stat(&self) -> Stat { + Stat::Max + } + + fn append_scalar(&mut self, value: Scalar) -> VortexResult<()> { + let nullability = value.dtype().nullability(); + if let Some((upper_bound, truncated)) = + upper_bound(T::from_scalar(value)?, self.max_value_length, nullability) + { + self.values.append_scalar(&upper_bound)?; + self.is_truncated.append_value(truncated); + } else { + self.append_null() + } + Ok(()) + } + + fn append_null(&mut self) { + ArrayBuilder::append_null(self.values.as_mut()); + self.is_truncated.append_value(false); + } + + fn finish(&mut self) -> NamedArrays { + NamedArrays { + names: vec![Stat::Max.name().into(), MAX_IS_TRUNCATED.into()], + arrays: vec![ + ArrayBuilder::finish(self.values.as_mut()), + ArrayBuilder::finish(&mut self.is_truncated), + ], + } + } +} + +impl StatsArrayBuilder for TruncatedMinBinaryStatsBuilder { + fn stat(&self) -> Stat { + Stat::Min + } + + fn append_scalar(&mut self, value: Scalar) -> VortexResult<()> { + let nullability = value.dtype().nullability(); + if let Some((lower_bound, truncated)) = + lower_bound(T::from_scalar(value)?, self.max_value_length, nullability) + { + self.values.append_scalar(&lower_bound)?; + self.is_truncated.append_value(truncated); + } else { + self.append_null() + } + Ok(()) + } + + fn append_null(&mut self) { + ArrayBuilder::append_null(self.values.as_mut()); + self.is_truncated.append_value(false); + } + + fn finish(&mut self) -> NamedArrays { + NamedArrays { + names: vec![Stat::Min.name().into(), MIN_IS_TRUNCATED.into()], + arrays: vec![ + ArrayBuilder::finish(self.values.as_mut()), + ArrayBuilder::finish(&mut self.is_truncated), + ], + } + } +} + /// An array stream processor that computes aggregate statistics for all fields. /// /// Note: for now this only collects top-level struct fields. @@ -136,3 +485,94 @@ impl FileStatsAccumulator { .collect() } } + +#[cfg(test)] +mod tests { + use rstest::rstest; + use vortex_array::arrays::BoolArray; + use vortex_array::arrays::bool::BoolArrayExt; + use vortex_array::builders::VarBinViewBuilder; + use vortex_buffer::BitBuffer; + use vortex_buffer::buffer; + + use super::*; + + #[rstest] + #[case(DType::Utf8(Nullability::NonNullable))] + #[case(DType::Binary(Nullability::NonNullable))] + fn truncates_accumulated_stats(#[case] dtype: DType) { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let mut builder = VarBinViewBuilder::with_capacity(dtype.clone(), 2); + builder.append_value("Value to be truncated"); + builder.append_value("untruncated"); + let mut builder2 = VarBinViewBuilder::with_capacity(dtype, 2); + builder2.append_value("Another"); + builder2.append_value("wait a minute"); + let mut acc = + StatsAccumulator::new(builder.dtype(), &[Stat::Max, Stat::Min, Stat::Sum], 12); + acc.push_chunk(&builder.finish(), &mut ctx) + .vortex_expect("push_chunk should succeed for test data"); + acc.push_chunk(&builder2.finish(), &mut ctx) + .vortex_expect("push_chunk should succeed for test data"); + let stats_table = acc.as_array().unwrap().expect("Must have stats table"); + assert_eq!( + stats_table.names().as_ref(), + &[ + Stat::Max.name(), + MAX_IS_TRUNCATED, + Stat::Min.name(), + MIN_IS_TRUNCATED, + ] + ); + let field1_bool = stats_table + .unmasked_field(1) + .clone() + .execute::(&mut ctx) + .unwrap(); + assert_eq!( + field1_bool.to_bit_buffer(), + BitBuffer::from(vec![false, true]) + ); + let field3_bool = stats_table + .unmasked_field(3) + .clone() + .execute::(&mut ctx) + .unwrap(); + assert_eq!( + field3_bool.to_bit_buffer(), + BitBuffer::from(vec![true, false]) + ); + } + + #[test] + fn always_adds_is_truncated_column() { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let array = buffer![0, 1, 2].into_array(); + let mut acc = StatsAccumulator::new(array.dtype(), &[Stat::Max, Stat::Min, Stat::Sum], 12); + acc.push_chunk(&array, &mut ctx) + .vortex_expect("push_chunk should succeed for test array"); + let stats_table = acc.as_array().unwrap().expect("Must have stats table"); + assert_eq!( + stats_table.names().as_ref(), + &[ + Stat::Max.name(), + MAX_IS_TRUNCATED, + Stat::Min.name(), + MIN_IS_TRUNCATED, + Stat::Sum.name(), + ] + ); + let field1_bool = stats_table + .unmasked_field(1) + .clone() + .execute::(&mut ctx) + .unwrap(); + assert_eq!(field1_bool.to_bit_buffer(), BitBuffer::from(vec![false])); + let field3_bool = stats_table + .unmasked_field(3) + .clone() + .execute::(&mut ctx) + .unwrap(); + assert_eq!(field3_bool.to_bit_buffer(), BitBuffer::from(vec![false])); + } +} diff --git a/vortex-layout/src/layouts/flat/mod.rs b/vortex-layout/src/layouts/flat/mod.rs index a652f82fe61..472e822038f 100644 --- a/vortex-layout/src/layouts/flat/mod.rs +++ b/vortex-layout/src/layouts/flat/mod.rs @@ -18,6 +18,7 @@ use vortex_error::vortex_panic; use vortex_session::VortexSession; use vortex_session::registry::ReadContext; +use crate::LayoutBuildContext; use crate::LayoutChildType; use crate::LayoutEncodingRef; use crate::LayoutId; @@ -104,7 +105,7 @@ impl VTable for Flat { metadata: &::Output, segment_ids: Vec, _children: &dyn LayoutChildren, - ctx: &ReadContext, + build_ctx: &LayoutBuildContext<'_>, ) -> VortexResult { if segment_ids.len() != 1 { vortex_bail!("Flat layout must have exactly one segment ID"); @@ -113,7 +114,7 @@ impl VTable for Flat { row_count, dtype.clone(), segment_ids[0], - ctx.clone(), + build_ctx.array_read_ctx.clone(), metadata .array_encoding_tree .as_ref() diff --git a/vortex-layout/src/layouts/foreign/mod.rs b/vortex-layout/src/layouts/foreign/mod.rs index 6e0a351b41b..491b8744262 100644 --- a/vortex-layout/src/layouts/foreign/mod.rs +++ b/vortex-layout/src/layouts/foreign/mod.rs @@ -9,9 +9,9 @@ use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_error::vortex_err; use vortex_session::VortexSession; -use vortex_session::registry::ReadContext; use crate::Layout; +use crate::LayoutBuildContext; use crate::LayoutChildType; use crate::LayoutChildren; use crate::LayoutEncoding; @@ -50,7 +50,7 @@ impl LayoutEncoding for ForeignLayoutEncoding { metadata: &[u8], segment_ids: Vec, children: &dyn LayoutChildren, - _ctx: &ReadContext, + _build_ctx: &LayoutBuildContext<'_>, ) -> VortexResult { let child_layouts = (0..children.nchildren()) .map(|idx| children.child(idx, dtype)) diff --git a/vortex-layout/src/layouts/struct_/mod.rs b/vortex-layout/src/layouts/struct_/mod.rs index 39ada8ccaec..97a14fb5d97 100644 --- a/vortex-layout/src/layouts/struct_/mod.rs +++ b/vortex-layout/src/layouts/struct_/mod.rs @@ -20,8 +20,8 @@ use vortex_error::vortex_ensure; use vortex_error::vortex_err; use vortex_session::SessionExt; use vortex_session::VortexSession; -use vortex_session::registry::ReadContext; +use crate::LayoutBuildContext; use crate::LayoutChildType; use crate::LayoutEncodingRef; use crate::LayoutId; @@ -132,7 +132,7 @@ impl VTable for Struct { _metadata: &::Output, _segment_ids: Vec, children: &dyn LayoutChildren, - _ctx: &ReadContext, + _build_ctx: &LayoutBuildContext<'_>, ) -> VortexResult { let struct_dt = dtype .as_struct_fields_opt() diff --git a/vortex-layout/src/layouts/zoned/builder.rs b/vortex-layout/src/layouts/zoned/builder.rs index 7d19a8bb666..bbe359f7485 100644 --- a/vortex-layout/src/layouts/zoned/builder.rs +++ b/vortex-layout/src/layouts/zoned/builder.rs @@ -3,66 +3,42 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -use std::marker::PhantomData; use std::sync::Arc; use itertools::Itertools; use vortex_array::ArrayRef; use vortex_array::ExecutionCtx; -use vortex_array::IntoArray; use vortex_array::LEGACY_SESSION; use vortex_array::VortexSessionExecute; -use vortex_array::aggregate_fn::fns::sum::sum; -use vortex_array::arrays::ConstantArray; +use vortex_array::aggregate_fn::AggregateFnRef; use vortex_array::arrays::StructArray; -use vortex_array::arrays::struct_::StructArrayExt; use vortex_array::builders::ArrayBuilder; -use vortex_array::builders::BoolBuilder; use vortex_array::builders::builder_with_capacity; use vortex_array::dtype::DType; use vortex_array::dtype::FieldName; -use vortex_array::dtype::Nullability; -use vortex_array::dtype::PType; -use vortex_array::expr::stats::Precision; -use vortex_array::expr::stats::Stat; -use vortex_array::expr::stats::StatsProvider; use vortex_array::scalar::Scalar; -use vortex_array::scalar::ScalarTruncation; -use vortex_array::scalar::lower_bound; -use vortex_array::scalar::upper_bound; -use vortex_array::stats::StatsSet; use vortex_array::validity::Validity; -use vortex_buffer::BufferString; -use vortex_buffer::ByteBuffer; use vortex_error::VortexResult; +use vortex_error::vortex_ensure_eq; -use crate::layouts::zoned::schema::MAX_IS_TRUNCATED; -use crate::layouts::zoned::schema::MIN_IS_TRUNCATED; +use crate::layouts::zoned::schema::aggregate_state_dtype; -/// Accumulates write-time statistics for each logical zone. -pub struct StatsAccumulator { - builders: Vec>, +/// Accumulates aggregate-function partials for each logical zone. +pub(crate) struct AggregateStatsAccumulator { + builders: Vec, length: usize, } -impl StatsAccumulator { - pub fn new(dtype: &DType, stats: &[Stat], max_variable_length_statistics_size: usize) -> Self { - if !supports_file_stats(dtype) { - return Self { - builders: Vec::new(), - length: 0, - }; - } - - let builders = stats +impl AggregateStatsAccumulator { + pub(crate) fn new(dtype: &DType, aggregate_fns: &[AggregateFnRef]) -> Self { + let builders = aggregate_fns .iter() - .filter_map(|&stat| { - stat.dtype(dtype).map(|stat_dtype| { - stats_builder_with_capacity( - stat, - &stat_dtype.as_nullable(), + .filter_map(|aggregate_fn| { + aggregate_state_dtype(dtype, aggregate_fn).map(|partial_dtype| { + AggregateStatsArrayBuilder::new( + aggregate_fn.clone(), + &partial_dtype.as_nullable(), 1024, - max_variable_length_statistics_size, ) }) }) @@ -74,49 +50,47 @@ impl StatsAccumulator { } } - pub fn push_chunk_without_compute(&mut self, array: &ArrayRef) -> VortexResult<()> { - for builder in &mut self.builders { - if let Precision::Exact(value) = array.statistics().get(builder.stat()) { - builder.append_scalar(value.cast(&value.dtype().as_nullable())?)?; - } else { - builder.append_null(); - } - } - self.length += 1; - Ok(()) + pub(crate) fn aggregate_fns(&self) -> Arc<[AggregateFnRef]> { + self.builders + .iter() + .map(|builder| builder.aggregate_fn.clone()) + .collect::>() + .into() } - pub fn push_chunk(&mut self, array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<()> { - for builder in &mut self.builders { - if let Some(value) = array.statistics().compute_stat(builder.stat(), ctx)? { - builder.append_scalar(value.cast(&value.dtype().as_nullable())?)?; - } else { - builder.append_null(); - } + pub(crate) fn push_partials(&mut self, partials: Vec) -> VortexResult<()> { + vortex_ensure_eq!( + partials.len(), + self.builders.len(), + "aggregate partial count must match zone stats builder count" + ); + + for (builder, value) in self.builders.iter_mut().zip_eq(partials) { + builder.append_scalar(value)?; } self.length += 1; Ok(()) } - pub fn as_array(&mut self) -> VortexResult)>> { + pub(crate) fn as_array( + &mut self, + ) -> VortexResult)>> { let mut names = Vec::new(); let mut fields = Vec::new(); - let mut stats = Vec::new(); + let mut aggregate_fns = Vec::new(); for builder in self .builders .iter_mut() - // We sort the stats so the DType is deterministic based on which stats are present. - .sorted_unstable_by_key(|builder| builder.stat()) + .sorted_unstable_by_key(|builder| builder.aggregate_fn.to_string()) { let values = builder.finish(); - // We drop any all-null stats columns. if values.all_invalid()? { continue; } - stats.push(builder.stat()); + aggregate_fns.push(builder.aggregate_fn.clone()); names.extend(values.names); fields.extend(values.arrays); } @@ -126,369 +100,61 @@ impl StatsAccumulator { } let array = StructArray::try_new(names.into(), fields, self.length, Validity::NonNullable)?; - Ok(Some((array, stats.into()))) - } - - /// Returns an aggregated stats set for the table. - pub fn as_stats_set( - &mut self, - stats: &[Stat], - ctx: &mut ExecutionCtx, - ) -> VortexResult { - let mut stats_set = StatsSet::default(); - let Some((array, _)) = self.as_array()? else { - return Ok(stats_set); - }; - - for &stat in stats { - let Some(array) = array.unmasked_field_by_name_opt(stat.name()) else { - continue; - }; - - // Different stats need different aggregations - match stat { - // For stats that are associative, we can just compute them over the stat column - Stat::Min | Stat::Max | Stat::Sum => { - if let Some(s) = array.statistics().compute_stat(stat, ctx)? - && let Some(v) = s.into_value() - { - stats_set.set(stat, Precision::exact(v)) - } - } - // These stats sum up - Stat::NullCount | Stat::NaNCount | Stat::UncompressedSizeInBytes => { - if let Some(sum_value) = sum(array, ctx)? - .cast(&DType::Primitive(PType::U64, Nullability::Nullable))? - .into_value() - { - stats_set.set(stat, Precision::exact(sum_value)); - } - } - // We could implement these aggregations in the future, but for now they're unused - Stat::IsConstant | Stat::IsSorted | Stat::IsStrictSorted => {} - } - } - Ok(stats_set) + Ok(Some((array, aggregate_fns.into()))) } } -fn supports_file_stats(dtype: &DType) -> bool { - !matches!(dtype, DType::Variant(_)) +pub(crate) fn aggregate_partials( + array: &ArrayRef, + aggregate_fns: &[AggregateFnRef], + ctx: &mut ExecutionCtx, +) -> VortexResult> { + aggregate_fns + .iter() + .map(|aggregate_fn| { + let mut accumulator = aggregate_fn.accumulator(array.dtype())?; + accumulator.accumulate(array, ctx)?; + accumulator.partial_scalar() + }) + .collect() } -fn stats_builder_with_capacity( - stat: Stat, - dtype: &DType, - capacity: usize, - max_length: usize, -) -> Box { - let values_builder = builder_with_capacity(dtype, capacity); - match stat { - Stat::Max => match dtype { - DType::Utf8(_) => Box::new(TruncatedMaxBinaryStatsBuilder::::new( - values_builder, - BoolBuilder::with_capacity(Nullability::NonNullable, capacity), - max_length, - )), - DType::Binary(_) => Box::new(TruncatedMaxBinaryStatsBuilder::::new( - values_builder, - BoolBuilder::with_capacity(Nullability::NonNullable, capacity), - max_length, - )), - _ => Box::new(StatNameArrayBuilder::new(stat, values_builder)), - }, - Stat::Min => match dtype { - DType::Utf8(_) => Box::new(TruncatedMinBinaryStatsBuilder::::new( - values_builder, - BoolBuilder::with_capacity(Nullability::NonNullable, capacity), - max_length, - )), - DType::Binary(_) => Box::new(TruncatedMinBinaryStatsBuilder::::new( - values_builder, - BoolBuilder::with_capacity(Nullability::NonNullable, capacity), - max_length, - )), - _ => Box::new(StatNameArrayBuilder::new(stat, values_builder)), - }, - _ => Box::new(StatNameArrayBuilder::new(stat, values_builder)), - } -} - -/// Arrays with their associated names, reduced version of a `StructArray`. -struct NamedArrays { - names: Vec, - arrays: Vec, -} - -impl NamedArrays { - fn all_invalid(&self) -> VortexResult { - // By convention the first array is the logical validity signal for the stat column. - self.arrays[0].all_invalid(&mut LEGACY_SESSION.create_execution_ctx()) - } -} - -trait StatsArrayBuilder: Send { - fn stat(&self) -> Stat; - - fn append_scalar(&mut self, value: Scalar) -> VortexResult<()>; - - fn append_null(&mut self); - - fn finish(&mut self) -> NamedArrays; -} - -struct StatNameArrayBuilder { - stat: Stat, +struct AggregateStatsArrayBuilder { + aggregate_fn: AggregateFnRef, + dtype: DType, builder: Box, } -impl StatNameArrayBuilder { - fn new(stat: Stat, builder: Box) -> Self { - Self { stat, builder } - } -} - -impl StatsArrayBuilder for StatNameArrayBuilder { - fn stat(&self) -> Stat { - self.stat - } - - fn append_scalar(&mut self, value: Scalar) -> VortexResult<()> { - self.builder.append_scalar(&value) - } - - fn append_null(&mut self) { - self.builder.append_null() - } - - fn finish(&mut self) -> NamedArrays { - let array = self.builder.finish(); - let len = array.len(); - match self.stat { - Stat::Max => NamedArrays { - names: vec![self.stat.name().into(), MAX_IS_TRUNCATED.into()], - arrays: vec![array, ConstantArray::new(false, len).into_array()], - }, - Stat::Min => NamedArrays { - names: vec![self.stat.name().into(), MIN_IS_TRUNCATED.into()], - arrays: vec![array, ConstantArray::new(false, len).into_array()], - }, - _ => NamedArrays { - names: vec![self.stat.name().into()], - arrays: vec![array], - }, - } - } -} - -struct TruncatedMaxBinaryStatsBuilder { - values: Box, - is_truncated: BoolBuilder, - max_value_length: usize, - _marker: PhantomData, -} - -impl TruncatedMaxBinaryStatsBuilder { - fn new( - values: Box, - is_truncated: BoolBuilder, - max_value_length: usize, - ) -> Self { - Self { - values, - is_truncated, - max_value_length, - _marker: PhantomData, - } - } -} - -struct TruncatedMinBinaryStatsBuilder { - values: Box, - is_truncated: BoolBuilder, - max_value_length: usize, - _marker: PhantomData, -} - -impl TruncatedMinBinaryStatsBuilder { - fn new( - values: Box, - is_truncated: BoolBuilder, - max_value_length: usize, - ) -> Self { +impl AggregateStatsArrayBuilder { + fn new(aggregate_fn: AggregateFnRef, dtype: &DType, capacity: usize) -> Self { Self { - values, - is_truncated, - max_value_length, - _marker: PhantomData, + aggregate_fn, + dtype: dtype.clone(), + builder: builder_with_capacity(dtype, capacity), } } -} - -impl StatsArrayBuilder for TruncatedMaxBinaryStatsBuilder { - fn stat(&self) -> Stat { - Stat::Max - } fn append_scalar(&mut self, value: Scalar) -> VortexResult<()> { - let nullability = value.dtype().nullability(); - if let Some((upper_bound, truncated)) = - upper_bound(T::from_scalar(value)?, self.max_value_length, nullability) - { - self.values.append_scalar(&upper_bound)?; - self.is_truncated.append_value(truncated); - } else { - self.append_null() - } - Ok(()) - } - - fn append_null(&mut self) { - ArrayBuilder::append_null(self.values.as_mut()); - self.is_truncated.append_value(false); + self.builder.append_scalar(&value.cast(&self.dtype)?) } fn finish(&mut self) -> NamedArrays { NamedArrays { - names: vec![Stat::Max.name().into(), MAX_IS_TRUNCATED.into()], - arrays: vec![ - ArrayBuilder::finish(self.values.as_mut()), - ArrayBuilder::finish(&mut self.is_truncated), - ], + names: vec![self.aggregate_fn.to_string().into()], + arrays: vec![self.builder.finish()], } } } -impl StatsArrayBuilder for TruncatedMinBinaryStatsBuilder { - fn stat(&self) -> Stat { - Stat::Min - } - - fn append_scalar(&mut self, value: Scalar) -> VortexResult<()> { - let nullability = value.dtype().nullability(); - if let Some((lower_bound, truncated)) = - lower_bound(T::from_scalar(value)?, self.max_value_length, nullability) - { - self.values.append_scalar(&lower_bound)?; - self.is_truncated.append_value(truncated); - } else { - self.append_null() - } - Ok(()) - } - - fn append_null(&mut self) { - ArrayBuilder::append_null(self.values.as_mut()); - self.is_truncated.append_value(false); - } - - fn finish(&mut self) -> NamedArrays { - NamedArrays { - names: vec![Stat::Min.name().into(), MIN_IS_TRUNCATED.into()], - arrays: vec![ - ArrayBuilder::finish(self.values.as_mut()), - ArrayBuilder::finish(&mut self.is_truncated), - ], - } - } +/// Arrays with their associated names, reduced version of a `StructArray`. +struct NamedArrays { + names: Vec, + arrays: Vec, } -#[cfg(test)] -mod tests { - use rstest::rstest; - use vortex_array::IntoArray; - use vortex_array::LEGACY_SESSION; - use vortex_array::VortexSessionExecute; - use vortex_array::arrays::BoolArray; - use vortex_array::arrays::bool::BoolArrayExt; - use vortex_array::arrays::struct_::StructArrayExt; - use vortex_array::builders::ArrayBuilder; - use vortex_array::builders::VarBinViewBuilder; - use vortex_array::dtype::DType; - use vortex_array::dtype::Nullability; - use vortex_array::expr::stats::Stat; - use vortex_buffer::BitBuffer; - use vortex_buffer::buffer; - use vortex_error::VortexExpect; - - use super::*; - - #[rstest] - #[case(DType::Utf8(Nullability::NonNullable))] - #[case(DType::Binary(Nullability::NonNullable))] - fn truncates_accumulated_stats(#[case] dtype: DType) { - let mut ctx = LEGACY_SESSION.create_execution_ctx(); - let mut builder = VarBinViewBuilder::with_capacity(dtype.clone(), 2); - builder.append_value("Value to be truncated"); - builder.append_value("untruncated"); - let mut builder2 = VarBinViewBuilder::with_capacity(dtype, 2); - builder2.append_value("Another"); - builder2.append_value("wait a minute"); - let mut acc = - StatsAccumulator::new(builder.dtype(), &[Stat::Max, Stat::Min, Stat::Sum], 12); - acc.push_chunk(&builder.finish(), &mut ctx) - .vortex_expect("push_chunk should succeed for test data"); - acc.push_chunk(&builder2.finish(), &mut ctx) - .vortex_expect("push_chunk should succeed for test data"); - let (stats_table, _) = acc.as_array().unwrap().expect("Must have stats table"); - assert_eq!( - stats_table.names().as_ref(), - &[ - Stat::Max.name(), - MAX_IS_TRUNCATED, - Stat::Min.name(), - MIN_IS_TRUNCATED, - ] - ); - let field1_bool = stats_table - .unmasked_field(1) - .clone() - .execute::(&mut ctx) - .unwrap(); - assert_eq!( - field1_bool.to_bit_buffer(), - BitBuffer::from(vec![false, true]) - ); - let field3_bool = stats_table - .unmasked_field(3) - .clone() - .execute::(&mut ctx) - .unwrap(); - assert_eq!( - field3_bool.to_bit_buffer(), - BitBuffer::from(vec![true, false]) - ); - } - - #[test] - fn always_adds_is_truncated_column() { - let mut ctx = LEGACY_SESSION.create_execution_ctx(); - let array = buffer![0, 1, 2].into_array(); - let mut acc = StatsAccumulator::new(array.dtype(), &[Stat::Max, Stat::Min, Stat::Sum], 12); - acc.push_chunk(&array, &mut ctx) - .vortex_expect("push_chunk should succeed for test array"); - let (stats_table, _) = acc.as_array().unwrap().expect("Must have stats table"); - assert_eq!( - stats_table.names().as_ref(), - &[ - Stat::Max.name(), - MAX_IS_TRUNCATED, - Stat::Min.name(), - MIN_IS_TRUNCATED, - Stat::Sum.name(), - ] - ); - let field1_bool = stats_table - .unmasked_field(1) - .clone() - .execute::(&mut ctx) - .unwrap(); - assert_eq!(field1_bool.to_bit_buffer(), BitBuffer::from(vec![false])); - let field3_bool = stats_table - .unmasked_field(3) - .clone() - .execute::(&mut ctx) - .unwrap(); - assert_eq!(field3_bool.to_bit_buffer(), BitBuffer::from(vec![false])); +impl NamedArrays { + fn all_invalid(&self) -> VortexResult { + // By convention the first array is the logical validity signal for the stat column. + self.arrays[0].all_invalid(&mut LEGACY_SESSION.create_execution_ctx()) } } diff --git a/vortex-layout/src/layouts/zoned/mod.rs b/vortex-layout/src/layouts/zoned/mod.rs index 43f9410e831..fa5bd1a2b46 100644 --- a/vortex-layout/src/layouts/zoned/mod.rs +++ b/vortex-layout/src/layouts/zoned/mod.rs @@ -4,9 +4,9 @@ //! - a transparent `data` child containing the underlying column data //! - an auxiliary `zones` child containing one row of aggregate statistics per zone //! -//! Metadata stores the logical zone length in rows plus the sorted list of statistics present in -//! the auxiliary table. During scans, pruning first evaluates a falsification predicate against -//! the `zones` child and only forwards surviving rows to the underlying `data` child. +//! Metadata stores the logical zone length in rows plus the aggregate functions present in the +//! auxiliary table. During scans, pruning first evaluates a falsification predicate against the +//! `zones` child and only forwards surviving rows to the underlying `data` child. // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors @@ -20,11 +20,14 @@ pub mod zone_map; use std::sync::Arc; -pub(crate) use builder::StatsAccumulator; +pub(crate) use builder::AggregateStatsAccumulator; +pub(crate) use builder::aggregate_partials; +use prost::Message; pub use schema::MAX_IS_TRUNCATED; pub use schema::MIN_IS_TRUNCATED; use vortex_array::DeserializeMetadata; use vortex_array::SerializeMetadata; +use vortex_array::aggregate_fn::AggregateFnRef; use vortex_array::dtype::DType; use vortex_array::dtype::TryFromBytes; use vortex_array::expr::stats::Stat; @@ -37,8 +40,8 @@ use vortex_error::vortex_ensure; use vortex_error::vortex_ensure_eq; use vortex_error::vortex_panic; use vortex_session::VortexSession; -use vortex_session::registry::ReadContext; +use crate::LayoutBuildContext; use crate::LayoutChildType; use crate::LayoutEncodingRef; use crate::LayoutId; @@ -48,12 +51,17 @@ use crate::VTable; use crate::children::LayoutChildren; use crate::children::OwnedLayoutChildren; use crate::layouts::zoned::reader::ZonedReader; -use crate::layouts::zoned::schema::stats_table_dtype; +use crate::layouts::zoned::schema::AggregateSpecProto; +use crate::layouts::zoned::schema::aggregate_fns_from_specs; +use crate::layouts::zoned::schema::aggregate_specs_from_fns; +use crate::layouts::zoned::schema::aggregate_stats_table_dtype; +use crate::layouts::zoned::schema::legacy_stats_table_dtype; use crate::segments::SegmentId; use crate::segments::SegmentSource; use crate::vtable; vtable!(Zoned); +vtable!(LegacyStats); impl VTable for Zoned { type Layout = ZonedLayout; @@ -61,8 +69,7 @@ impl VTable for Zoned { type Metadata = ZonedMetadata; fn id(_encoding: &Self::Encoding) -> LayoutId { - // For legacy reasons the serialized layout encoding ID is still `vortex.stats`. - LayoutId::new("vortex.stats") + LayoutId::new("vortex.zoned") } fn encoding(_layout: &Self::Layout) -> LayoutEncodingRef { @@ -80,7 +87,16 @@ impl VTable for Zoned { fn metadata(layout: &Self::Layout) -> Self::Metadata { ZonedMetadata { zone_len: u32::try_from(layout.zone_len).vortex_expect("Invalid zone length"), - present_stats: Arc::clone(&layout.present_stats), + aggregate_specs: match &layout.zone_map_schema { + ZoneMapSchema::AggregateFns(aggregate_fns) => { + aggregate_specs_from_fns(aggregate_fns).vortex_expect( + "aggregate functions should be validated as serializable during build", + ) + } + ZoneMapSchema::LegacyStats(_) => { + vortex_panic!("Cannot serialize legacy stats schema as vortex.zoned") + } + }, } } @@ -95,9 +111,7 @@ impl VTable for Zoned { fn child(layout: &Self::Layout, idx: usize) -> VortexResult { match idx { 0 => layout.children.child(0, layout.dtype()), - 1 => layout - .children - .child(1, &stats_table_dtype(layout.dtype(), &layout.present_stats)), + 1 => layout.children.child(1, &layout.stats_table_dtype), _ => vortex_bail!("Invalid child index: {}", idx), } } @@ -133,18 +147,22 @@ impl VTable for Zoned { metadata: &ZonedMetadata, _segment_ids: Vec, children: &dyn LayoutChildren, - _ctx: &ReadContext, + build_ctx: &LayoutBuildContext<'_>, ) -> VortexResult { vortex_ensure_eq!( children.nchildren(), 2, "ZonedLayout expects exactly 2 children (data, zones)" ); + let aggregate_fns = aggregate_fns_from_specs(&metadata.aggregate_specs, build_ctx.session)?; + aggregate_specs_from_fns(&aggregate_fns)?; + let stats_table_dtype = aggregate_stats_table_dtype(dtype, &aggregate_fns); Ok(ZonedLayout { dtype: dtype.clone(), children: children.to_arc(), zone_len: metadata.zone_len as usize, - present_stats: Arc::clone(&metadata.present_stats), + zone_map_schema: ZoneMapSchema::AggregateFns(aggregate_fns), + stats_table_dtype, }) } @@ -160,10 +178,110 @@ impl VTable for Zoned { } } +// TODO: This legacy vtable is only needed until layouts move onto the new vtable structure, where +// a LayoutPlugin can deserialize legacy `vortex.stats` metadata directly into `vortex.zoned`. +impl VTable for LegacyStats { + type Layout = LegacyStatsLayout; + type Encoding = LegacyStatsLayoutEncoding; + type Metadata = LegacyStatsMetadata; + + fn id(_encoding: &Self::Encoding) -> LayoutId { + LayoutId::new("vortex.stats") + } + + fn encoding(_layout: &Self::Layout) -> LayoutEncodingRef { + LayoutEncodingRef::new_ref(LegacyStatsLayoutEncoding.as_ref()) + } + + fn row_count(layout: &Self::Layout) -> u64 { + ::row_count(&layout.0) + } + + fn dtype(layout: &Self::Layout) -> &DType { + ::dtype(&layout.0) + } + + fn metadata(layout: &Self::Layout) -> Self::Metadata { + LegacyStatsMetadata { + zone_len: u32::try_from(layout.0.zone_len).vortex_expect("Invalid zone length"), + zone_map_schema: layout.0.zone_map_schema.clone(), + } + } + + fn segment_ids(layout: &Self::Layout) -> Vec { + ::segment_ids(&layout.0) + } + + fn nchildren(layout: &Self::Layout) -> usize { + ::nchildren(&layout.0) + } + + fn child(layout: &Self::Layout, idx: usize) -> VortexResult { + ::child(&layout.0, idx) + } + + fn child_type(layout: &Self::Layout, idx: usize) -> LayoutChildType { + ::child_type(&layout.0, idx) + } + + fn new_reader( + layout: &Self::Layout, + name: Arc, + segment_source: Arc, + session: &VortexSession, + ctx: &crate::LayoutReaderContext, + ) -> VortexResult { + Ok(Arc::new(ZonedReader::try_new( + layout.0.clone(), + name, + segment_source, + session.clone(), + ctx.clone(), + )?)) + } + + fn build( + _encoding: &Self::Encoding, + dtype: &DType, + _row_count: u64, + metadata: &LegacyStatsMetadata, + _segment_ids: Vec, + children: &dyn LayoutChildren, + _build_ctx: &LayoutBuildContext<'_>, + ) -> VortexResult { + vortex_ensure_eq!( + children.nchildren(), + 2, + "LegacyStatsLayout expects exactly 2 children (data, zones)" + ); + let stats_table_dtype = match &metadata.zone_map_schema { + ZoneMapSchema::LegacyStats(stats) => legacy_stats_table_dtype(dtype, stats), + ZoneMapSchema::AggregateFns(aggregate_fns) => { + aggregate_stats_table_dtype(dtype, aggregate_fns) + } + }; + Ok(LegacyStatsLayout(ZonedLayout { + dtype: dtype.clone(), + children: children.to_arc(), + zone_len: metadata.zone_len as usize, + zone_map_schema: metadata.zone_map_schema.clone(), + stats_table_dtype, + })) + } + + fn with_children(layout: &mut Self::Layout, children: Vec) -> VortexResult<()> { + ::with_children(&mut layout.0, children) + } +} + /// Encoding marker for the zoned layout. #[derive(Debug)] pub struct ZonedLayoutEncoding; +/// Encoding marker for the legacy `vortex.stats` zoned layout. +#[derive(Debug)] +pub struct LegacyStatsLayoutEncoding; + /// A layout that annotates a data child with one row of aggregate statistics per zone. /// /// The first child is the underlying data layout. The second child is an auxiliary stats table @@ -174,29 +292,51 @@ pub struct ZonedLayout { dtype: DType, children: Arc, zone_len: usize, - present_stats: Arc<[Stat]>, + zone_map_schema: ZoneMapSchema, + stats_table_dtype: DType, +} + +/// A legacy `vortex.stats` layout backed by the shared zoned runtime implementation. +#[derive(Clone, Debug)] +pub struct LegacyStatsLayout(ZonedLayout); + +impl LegacyStatsLayout { + /// Returns display names for the zone-map aggregates stored by this layout. + pub fn present_aggregates(&self) -> Arc<[String]> { + self.0.present_aggregates() + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) enum ZoneMapSchema { + LegacyStats(Arc<[Stat]>), + AggregateFns(Arc<[AggregateFnRef]>), } impl ZonedLayout { - pub fn new( + /// Create a zoned layout from a data child, a zone-map child, a zone length, and the aggregate + /// functions stored in the zone map. + pub fn try_new( data: LayoutRef, zones: LayoutRef, zone_len: usize, - present_stats: Arc<[Stat]>, - ) -> Self { - if zone_len == 0 { - vortex_panic!("Zone length must be greater than 0"); - } - let expected_dtype = stats_table_dtype(data.dtype(), &present_stats); + aggregate_fns: Arc<[AggregateFnRef]>, + ) -> VortexResult { + vortex_ensure!(zone_len > 0, "Zone length must be greater than 0"); + + let expected_dtype = aggregate_stats_table_dtype(data.dtype(), &aggregate_fns); if zones.dtype() != &expected_dtype { - vortex_panic!("Invalid zone map layout: zones dtype does not match expected dtype"); + vortex_bail!("Invalid zone map layout: zones dtype does not match expected dtype"); } - Self { + aggregate_specs_from_fns(&aggregate_fns)?; + + Ok(Self { dtype: data.dtype().clone(), children: OwnedLayoutChildren::layout_children(vec![data, zones]), zone_len, - present_stats, - } + zone_map_schema: ZoneMapSchema::AggregateFns(aggregate_fns), + stats_table_dtype: expected_dtype, + }) } pub fn nzones(&self) -> usize { @@ -207,29 +347,115 @@ impl ZonedLayout { self.zone_len } - /// Returns an array of stats that exist in the layout's data, must be sorted. - pub fn present_stats(&self) -> &Arc<[Stat]> { - &self.present_stats + /// Returns display names for the zone-map aggregates stored by this layout. + pub fn present_aggregates(&self) -> Arc<[String]> { + match &self.zone_map_schema { + ZoneMapSchema::LegacyStats(stats) => stats + .iter() + .filter_map(Stat::aggregate_fn) + .map(|aggregate_fn| aggregate_fn.to_string()) + .collect::>() + .into(), + ZoneMapSchema::AggregateFns(aggregate_fns) => aggregate_fns + .iter() + .map(ToString::to_string) + .collect::>() + .into(), + } + } + + pub(super) fn aggregate_fns( + &self, + _session: &VortexSession, + ) -> VortexResult> { + match &self.zone_map_schema { + ZoneMapSchema::LegacyStats(stats) => Ok(stats + .iter() + .filter_map(Stat::aggregate_fn) + .collect::>() + .into()), + ZoneMapSchema::AggregateFns(aggregate_fns) => Ok(Arc::clone(aggregate_fns)), + } + } + + pub(super) fn stats_table_dtype_for(&self, aggregate_fns: &[AggregateFnRef]) -> DType { + if let ZoneMapSchema::LegacyStats(stats) = &self.zone_map_schema { + return legacy_stats_table_dtype(&self.dtype, stats); + } + + aggregate_stats_table_dtype(&self.dtype, aggregate_fns) } } /// Serialized zoned-layout metadata. /// -/// `zone_len` is the logical row length of each zone. `present_stats` is the sorted list of -/// statistics stored in the auxiliary stats-table child. +/// `zone_len` is the logical row length of each zone. `aggregate_specs` is the ordered list of +/// aggregate functions stored in the auxiliary stats-table child. #[derive(Debug, PartialEq, Eq, Clone)] pub struct ZonedMetadata { pub(super) zone_len: u32, - pub(super) present_stats: Arc<[Stat]>, + pub(super) aggregate_specs: Arc<[AggregateSpecProto]>, +} + +/// Serialized metadata for legacy `vortex.stats` layouts. +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct LegacyStatsMetadata { + pub(super) zone_len: u32, + pub(crate) zone_map_schema: ZoneMapSchema, +} + +const ZONED_METADATA_PROTO_VERSION: u8 = 1; + +#[derive(Clone, PartialEq, Message)] +struct ZonedMetadataProto { + #[prost(uint32, tag = "1")] + zone_len: u32, + #[prost(message, repeated, tag = "2")] + aggregate_specs: Vec, } impl DeserializeMetadata for ZonedMetadata { type Output = Self; + fn deserialize(metadata: &[u8]) -> VortexResult { + let Some((&version, proto_bytes)) = metadata.split_first() else { + vortex_bail!("Zoned metadata missing protobuf version"); + }; + + vortex_ensure!( + version == ZONED_METADATA_PROTO_VERSION, + "Unsupported zoned metadata version: {}", + version + ); + vortex_ensure!(!proto_bytes.is_empty(), "Zoned metadata missing protobuf"); + + let proto = ZonedMetadataProto::decode(proto_bytes)?; + Ok(Self { + zone_len: proto.zone_len, + aggregate_specs: proto.aggregate_specs.into(), + }) + } +} + +impl SerializeMetadata for ZonedMetadata { + fn serialize(self) -> Vec { + let proto = ZonedMetadataProto { + zone_len: self.zone_len, + aggregate_specs: self.aggregate_specs.to_vec(), + }; + let mut metadata = vec![ZONED_METADATA_PROTO_VERSION]; + metadata.extend(proto.encode_to_vec()); + metadata + } +} + +impl DeserializeMetadata for LegacyStatsMetadata { + type Output = Self; + fn deserialize(metadata: &[u8]) -> VortexResult { vortex_ensure!( metadata.len() >= 4, - "Zoned metadata must contain at least 4 bytes for zone length, got {}", + "Legacy zoned metadata must contain at least 4 bytes for zone length, got {}", metadata.len() ); @@ -241,19 +467,23 @@ impl DeserializeMetadata for ZonedMetadata { Ok(Self { zone_len, - present_stats, + zone_map_schema: ZoneMapSchema::LegacyStats(present_stats), }) } } -impl SerializeMetadata for ZonedMetadata { +impl SerializeMetadata for LegacyStatsMetadata { fn serialize(self) -> Vec { - let mut metadata = vec![]; - // First, write the block size to the metadata. - metadata.extend_from_slice(&self.zone_len.to_le_bytes()); - // Then write the bit-set of statistics. - metadata.extend_from_slice(&as_stat_bitset_bytes(&self.present_stats)); - metadata + match self.zone_map_schema { + ZoneMapSchema::LegacyStats(stats) => { + let mut metadata = self.zone_len.to_le_bytes().to_vec(); + metadata.extend(as_stat_bitset_bytes(&stats)); + metadata + } + ZoneMapSchema::AggregateFns(_) => { + vortex_panic!("Cannot serialize aggregate specs as legacy stats metadata") + } + } } } @@ -262,9 +492,19 @@ mod tests { use std::panic; use rstest::rstest; + use vortex_array::aggregate_fn::AggregateFnRef; + use vortex_array::aggregate_fn::AggregateFnVTableExt; + use vortex_array::aggregate_fn::EmptyOptions; + use vortex_array::aggregate_fn::fns::bounded_max::BoundedMax; + use vortex_array::aggregate_fn::fns::bounded_max::BoundedMaxOptions; + use vortex_array::aggregate_fn::fns::max::Max; + use vortex_array::aggregate_fn::fns::min::Min; + use vortex_array::aggregate_fn::session::AggregateFnSession; use vortex_array::dtype::DType; use vortex_array::dtype::Nullability; use vortex_array::dtype::PType; + use vortex_array::stats::as_stat_bitset_bytes; + use vortex_session::VortexSession; use vortex_session::registry::ReadContext; use super::*; @@ -273,46 +513,73 @@ mod tests { use crate::layouts::flat::FlatLayout; use crate::segments::SegmentId; + fn aggregate_spec(aggregate_fn: AggregateFnRef) -> AggregateSpecProto { + AggregateSpecProto::try_from_aggregate_fn(&aggregate_fn).unwrap() + } + #[rstest] #[case(ZonedMetadata { zone_len: u32::MAX, - present_stats: Arc::new([]), - })] - #[case::all_sorted(ZonedMetadata { - zone_len: 314, - present_stats: Arc::new([Stat::IsConstant, Stat::IsSorted, Stat::IsStrictSorted, Stat::Max, Stat::Min, Stat::Sum, Stat::NullCount, Stat::UncompressedSizeInBytes, Stat::NaNCount]), + aggregate_specs: Arc::new([]), })] - #[case::some_sorted(ZonedMetadata { + #[case::min_max(ZonedMetadata { zone_len: 314, - present_stats: Arc::new([Stat::IsSorted, Stat::IsStrictSorted, Stat::Max, Stat::Min, Stat::Sum, Stat::NullCount, Stat::UncompressedSizeInBytes, Stat::NaNCount]), + aggregate_specs: Arc::new([ + aggregate_spec(Max.bind(EmptyOptions)), + aggregate_spec(Min.bind(EmptyOptions)), + ]), })] fn test_metadata_serialization(#[case] metadata: ZonedMetadata) { let serialized = metadata.clone().serialize(); + assert_eq!(serialized[0], ZONED_METADATA_PROTO_VERSION); let deserialized = ZonedMetadata::deserialize(&serialized).unwrap(); assert_eq!(deserialized, metadata); } #[test] - fn test_deserialize_unsorted_stats() { + fn test_metadata_serialization_preserves_aggregate_options() -> VortexResult<()> { + let aggregate_fn = BoundedMax.bind(BoundedMaxOptions { + // SAFETY: 128 is non-zero. + max_bytes: unsafe { std::num::NonZeroUsize::new_unchecked(128) }, + }); let metadata = ZonedMetadata { - zone_len: u32::MAX, - present_stats: Arc::new([Stat::IsStrictSorted, Stat::IsSorted]), + zone_len: 314, + aggregate_specs: Arc::new([AggregateSpecProto::try_from_aggregate_fn(&aggregate_fn)?]), }; - let serialized = metadata.clone().serialize(); - let deserialized = ZonedMetadata::deserialize(&serialized).unwrap(); - assert!(deserialized.present_stats.is_sorted()); + + let deserialized = ZonedMetadata::deserialize(&metadata.serialize())?; + let session = VortexSession::empty().with::(); + let aggregate_fns = aggregate_fns_from_specs(&deserialized.aggregate_specs, &session)?; + + assert_eq!(aggregate_fns.as_ref(), std::slice::from_ref(&aggregate_fn)); + Ok(()) + } + + #[test] + fn test_deserialize_legacy_stat_bitset_as_legacy_stats() { + let mut serialized = u32::MAX.to_le_bytes().to_vec(); + serialized.extend(as_stat_bitset_bytes(&[ + Stat::IsStrictSorted, + Stat::IsSorted, + Stat::Max, + ])); + let deserialized = LegacyStatsMetadata::deserialize(&serialized).unwrap(); + let ZoneMapSchema::LegacyStats(legacy_stats) = deserialized.zone_map_schema else { + panic!("legacy bitset metadata should deserialize as legacy stats"); + }; + + assert!(legacy_stats.is_sorted()); assert_eq!( - deserialized.present_stats.len(), - metadata.present_stats.len() + legacy_stats.as_ref(), + &[Stat::IsSorted, Stat::IsStrictSorted, Stat::Max] ); - assert_ne!(deserialized.present_stats, metadata.present_stats); } #[rstest] - #[case(vec![])] - #[case(vec![0])] - #[case(vec![0, 0])] - #[case(vec![0, 0, 0])] + #[case::empty(vec![])] + #[case::unsupported_version(vec![0])] + #[case::missing_proto(vec![ZONED_METADATA_PROTO_VERSION])] + #[case::malformed_proto(vec![ZONED_METADATA_PROTO_VERSION, 0])] fn test_deserialize_short_metadata_errors(#[case] metadata: Vec) { assert!(ZonedMetadata::deserialize(&metadata).is_err()); } @@ -330,9 +597,12 @@ mod tests { #[test] fn test_deserialize_zero_zone_len_is_allowed_for_backcompat() { let metadata = 0u32.to_le_bytes(); - let deserialized = ZonedMetadata::deserialize(&metadata).unwrap(); + let deserialized = LegacyStatsMetadata::deserialize(&metadata).unwrap(); assert_eq!(deserialized.zone_len, 0); - assert!(deserialized.present_stats.is_empty()); + let ZoneMapSchema::LegacyStats(legacy_stats) = deserialized.zone_map_schema else { + panic!("legacy bitset metadata should deserialize as legacy stats"); + }; + assert!(legacy_stats.is_empty()); } #[test] @@ -343,27 +613,33 @@ mod tests { FlatLayout::new(0, dtype.clone(), SegmentId::from(0), read_ctx.clone()).into_layout(), FlatLayout::new( 0, - stats_table_dtype(&dtype, &[]), + legacy_stats_table_dtype(&dtype, &[]), SegmentId::from(1), read_ctx, ) .into_layout(), ]); + let session = vortex_array::array_session(); + let build_read_ctx = ReadContext::new([]); + let build_ctx = LayoutBuildContext { + session: &session, + array_read_ctx: &build_read_ctx, + }; - let layout = ::build( - &ZonedLayoutEncoding, + let layout = ::build( + &LegacyStatsLayoutEncoding, &dtype, 0, - &ZonedMetadata { + &LegacyStatsMetadata { zone_len: 0, - present_stats: Arc::new([]), + zone_map_schema: ZoneMapSchema::LegacyStats(Arc::new([])), }, vec![], children.as_ref(), - &ReadContext::new([]), + &build_ctx, )?; - assert_eq!(layout.zone_len, 0); + assert_eq!(layout.0.zone_len, 0); Ok(()) } @@ -371,9 +647,15 @@ mod tests { fn test_build_rejects_invalid_child_count() { let metadata = ZonedMetadata { zone_len: 3, - present_stats: Arc::new([]), + aggregate_specs: Arc::new([]), }; let children = OwnedLayoutChildren::layout_children(vec![]); + let session = vortex_array::array_session(); + let build_read_ctx = ReadContext::new([]); + let build_ctx = LayoutBuildContext { + session: &session, + array_read_ctx: &build_read_ctx, + }; let result = ::build( &ZonedLayoutEncoding, @@ -382,7 +664,7 @@ mod tests { &metadata, vec![], children.as_ref(), - &ReadContext::new([]), + &build_ctx, ); assert!(result.is_err()); diff --git a/vortex-layout/src/layouts/zoned/pruning.rs b/vortex-layout/src/layouts/zoned/pruning.rs index 2f048865bf7..b517df985c9 100644 --- a/vortex-layout/src/layouts/zoned/pruning.rs +++ b/vortex-layout/src/layouts/zoned/pruning.rs @@ -15,6 +15,7 @@ use parking_lot::RwLock; use tracing::trace; use vortex_array::MaskFuture; use vortex_array::VortexSessionExecute; +use vortex_array::aggregate_fn::AggregateFnRef; use vortex_array::arrays::StructArray; use vortex_array::dtype::DType; use vortex_array::expr::Expression; @@ -41,6 +42,7 @@ pub(super) struct PruningState { row_count: u64, zone_len: u64, dtype: DType, + aggregate_fns: Arc<[AggregateFnRef]>, lazy_children: Arc, session: VortexSession, @@ -52,6 +54,7 @@ pub(super) struct PruningState { impl PruningState { pub(super) fn new( layout: &ZonedLayout, + aggregate_fns: Arc<[AggregateFnRef]>, lazy_children: Arc, session: VortexSession, ) -> Self { @@ -60,6 +63,7 @@ impl PruningState { row_count: layout.row_count(), zone_len: layout.zone_len() as u64, dtype: layout.dtype().clone(), + aggregate_fns, lazy_children, session, pruning_result: Default::default(), @@ -144,13 +148,22 @@ impl PruningState { let zone_len = self.zone_len; let row_count = self.row_count; let dtype = self.dtype.clone(); + let aggregate_fns = Arc::clone(&self.aggregate_fns); async move { let mut ctx = session.create_execution_ctx(); let zones_array = zones_eval.await?.execute::(&mut ctx)?; // SAFETY: zoned layout validation checked that this zones child was - // written from the same column dtype and stats-table schema. - Ok(unsafe { ZoneMap::new_unchecked(dtype, zones_array, zone_len, row_count) }) + // written from the same column dtype and aggregate stats-table schema. + Ok(unsafe { + ZoneMap::new_unchecked( + dtype, + zones_array, + aggregate_fns, + zone_len, + row_count, + ) + }) } .map_err(Arc::new) .boxed() diff --git a/vortex-layout/src/layouts/zoned/reader.rs b/vortex-layout/src/layouts/zoned/reader.rs index e0e42117aff..f6346d0760f 100644 --- a/vortex-layout/src/layouts/zoned/reader.rs +++ b/vortex-layout/src/layouts/zoned/reader.rs @@ -26,7 +26,6 @@ use crate::RowSplits; use crate::SplitRange; use crate::layouts::zoned::ZonedLayout; use crate::layouts::zoned::pruning::PruningState; -use crate::layouts::zoned::schema::stats_table_dtype; use crate::segments::SegmentSource; pub struct ZonedReader { @@ -44,9 +43,10 @@ impl ZonedReader { session: VortexSession, ctx: crate::LayoutReaderContext, ) -> VortexResult { + let aggregate_fns = layout.aggregate_fns(&session)?; let dtypes = vec![ layout.dtype.clone(), - stats_table_dtype(layout.dtype(), layout.present_stats()), + layout.stats_table_dtype_for(&aggregate_fns), ]; let names = vec![Arc::clone(&name), format!("{}.zones", name).into()]; let lazy_children = Arc::new(LazyReaderChildren::new( @@ -59,7 +59,7 @@ impl ZonedReader { )); Ok(Self { - pruning: PruningState::new(&layout, Arc::clone(&lazy_children), session), + pruning: PruningState::new(&layout, aggregate_fns, Arc::clone(&lazy_children), session), layout, name, lazy_children, @@ -227,10 +227,13 @@ mod test { use vortex_array::IntoArray; use vortex_array::MaskFuture; use vortex_array::arrays::ChunkedArray; + use vortex_array::arrays::PrimitiveArray; use vortex_array::assert_arrays_eq; use vortex_array::expr::gt; + use vortex_array::expr::is_not_null; use vortex_array::expr::lit; use vortex_array::expr::root; + use vortex_array::validity::Validity; use vortex_buffer::buffer; use vortex_error::VortexResult; use vortex_io::runtime::Handle; @@ -242,15 +245,17 @@ mod test { use vortex_session::registry::ReadContext; use crate::IntoLayout; + use crate::LayoutBuildContext; use crate::LayoutRef; use crate::LayoutStrategy; use crate::VTable; use crate::children::OwnedLayoutChildren; use crate::layouts::chunked::writer::ChunkedLayoutStrategy; use crate::layouts::flat::writer::FlatLayoutStrategy; + use crate::layouts::zoned::LegacyStats; + use crate::layouts::zoned::LegacyStatsLayoutEncoding; + use crate::layouts::zoned::LegacyStatsMetadata; use crate::layouts::zoned::Zoned; - use crate::layouts::zoned::ZonedLayoutEncoding; - use crate::layouts::zoned::ZonedMetadata; use crate::layouts::zoned::writer::ZonedLayoutOptions; use crate::layouts::zoned::writer::ZonedStrategy; use crate::segments::SegmentSource; @@ -353,6 +358,70 @@ mod test { }) } + #[test] + fn test_default_zoned_null_count_pruning_mask() { + let ctx = ArrayContext::empty(); + let segments = Arc::new(TestSegments::default()); + let (ptr, eof) = SequenceId::root().split(); + let strategy = ZonedStrategy::new( + ChunkedLayoutStrategy::new(FlatLayoutStrategy::default()), + FlatLayoutStrategy::default(), + ZonedLayoutOptions { + block_size: 3, + ..Default::default() + }, + ); + let array_stream = ChunkedArray::from_iter([ + PrimitiveArray::new( + buffer![0i32, 0, 0], + Validity::from_iter([false, false, false]), + ) + .into_array(), + PrimitiveArray::new(buffer![1i32, 2, 3], Validity::from_iter([true, true, true])) + .into_array(), + PrimitiveArray::new( + buffer![0i32, 0, 0], + Validity::from_iter([false, false, false]), + ) + .into_array(), + ]) + .into_array() + .to_array_stream() + .sequenced(ptr); + let segments2 = Arc::::clone(&segments); + + let layout = block_on(|handle| async move { + let session = session_with_handle(handle); + strategy + .write_stream(ctx, segments2, array_stream, eof, &session) + .await + }) + .unwrap(); + + block_on(|handle| async { + let row_count = layout.row_count(); + let session = session_with_handle(handle); + let reader = layout + .new_reader("".into(), segments, &session, &Default::default()) + .unwrap(); + + let result = reader + .pruning_evaluation( + &(0..row_count), + &is_not_null(root()), + Mask::new_true(row_count.try_into().unwrap()), + ) + .unwrap() + .await + .unwrap(); + + assert_eq!( + result, + Mask::from_iter([false, false, false, true, true, true, false, false, false]) + ); + }) + } + #[rstest] fn test_legacy_zero_zone_len_skips_zoned_pruning( #[from(stats_layout)] (segments, layout): (Arc, LayoutRef), @@ -360,17 +429,23 @@ mod test { let zoned_layout = layout.as_::(); let children = OwnedLayoutChildren::layout_children(vec![layout.child(0)?, layout.child(1)?]); - let legacy_layout = ::build( - &ZonedLayoutEncoding, + let session = vortex_array::array_session(); + let read_ctx = ReadContext::new([]); + let build_ctx = LayoutBuildContext { + session: &session, + array_read_ctx: &read_ctx, + }; + let legacy_layout = ::build( + &LegacyStatsLayoutEncoding, layout.dtype(), layout.row_count(), - &ZonedMetadata { + &LegacyStatsMetadata { zone_len: 0, - present_stats: Arc::clone(zoned_layout.present_stats()), + zone_map_schema: zoned_layout.zone_map_schema.clone(), }, vec![], children.as_ref(), - &ReadContext::new([]), + &build_ctx, )? .into_layout(); diff --git a/vortex-layout/src/layouts/zoned/schema.rs b/vortex-layout/src/layouts/zoned/schema.rs index e14388ba379..6d921e7de8f 100644 --- a/vortex-layout/src/layouts/zoned/schema.rs +++ b/vortex-layout/src/layouts/zoned/schema.rs @@ -3,16 +3,80 @@ //! Shared helpers for the zoned layout's auxiliary stats-table schema. +use std::sync::Arc; + +use vortex_array::aggregate_fn::AggregateFnId; +use vortex_array::aggregate_fn::AggregateFnRef; +use vortex_array::aggregate_fn::session::AggregateFnSessionExt; use vortex_array::dtype::DType; use vortex_array::dtype::Nullability; use vortex_array::dtype::StructFields; use vortex_array::expr::stats::Stat; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_err; +use vortex_session::VortexSession; pub const MAX_IS_TRUNCATED: &str = "max_is_truncated"; pub const MIN_IS_TRUNCATED: &str = "min_is_truncated"; +#[derive(Clone, PartialEq, Eq, ::prost::Message)] +pub(crate) struct AggregateSpecProto { + #[prost(string, tag = "1")] + id: String, + #[prost(bytes, tag = "2")] + options: Vec, +} + +impl AggregateSpecProto { + pub(crate) fn try_from_aggregate_fn(aggregate_fn: &AggregateFnRef) -> VortexResult { + let options = aggregate_fn.options().serialize()?.ok_or_else(|| { + vortex_err!( + "Aggregate function '{}' is not serializable", + aggregate_fn.id() + ) + })?; + + Ok(Self { + id: aggregate_fn.id().to_string(), + options, + }) + } + + pub(crate) fn to_aggregate_fn(&self, session: &VortexSession) -> VortexResult { + let aggregate_fn_id = AggregateFnId::new(self.id.as_str()); + let Some(plugin) = session.aggregate_fns().find_plugin(&aggregate_fn_id) else { + vortex_bail!("unknown aggregate function id: {}", self.id); + }; + + let aggregate_fn = plugin.deserialize(&self.options, session)?; + if aggregate_fn.id() != aggregate_fn_id { + vortex_bail!( + "Aggregate function ID mismatch: expected {}, got {}", + aggregate_fn_id, + aggregate_fn.id() + ); + } + + Ok(aggregate_fn) + } +} + /// Return the auxiliary stats-table schema for a zoned layout. -pub(crate) fn stats_table_dtype(column_dtype: &DType, present_stats: &[Stat]) -> DType { +pub(crate) fn aggregate_stats_table_dtype( + column_dtype: &DType, + aggregate_fns: &[AggregateFnRef], +) -> DType { + DType::Struct( + StructFields::from_iter(aggregate_fns.iter().filter_map(|aggregate_fn| { + aggregate_state_dtype(column_dtype, aggregate_fn) + .map(|dtype| (aggregate_fn.to_string(), dtype.as_nullable())) + })), + Nullability::NonNullable, + ) +} + +pub(crate) fn legacy_stats_table_dtype(column_dtype: &DType, present_stats: &[Stat]) -> DType { assert!(present_stats.is_sorted(), "Stats must be sorted"); DType::Struct( StructFields::from_iter( @@ -47,8 +111,52 @@ pub(crate) fn stats_table_dtype(column_dtype: &DType, present_stats: &[Stat]) -> ) } +pub(crate) fn aggregate_specs_from_fns( + aggregate_fns: &[AggregateFnRef], +) -> VortexResult> { + aggregate_fns + .iter() + .map(AggregateSpecProto::try_from_aggregate_fn) + .collect::>>() + .map(Into::into) +} + +pub(crate) fn aggregate_fns_from_specs( + aggregate_specs: &[AggregateSpecProto], + session: &VortexSession, +) -> VortexResult> { + aggregate_specs + .iter() + .map(|aggregate_spec| aggregate_spec.to_aggregate_fn(session)) + .collect::>>() + .map(Into::into) +} + +pub(crate) fn aggregate_state_dtype( + column_dtype: &DType, + aggregate_fn: &AggregateFnRef, +) -> Option { + aggregate_fn.state_dtype(column_dtype).or_else(|| { + if let DType::Extension(ext) = column_dtype { + aggregate_fn.state_dtype(ext.storage_dtype()) + } else { + None + } + }) +} + +pub(crate) fn default_bounded_stat_max_bytes() -> std::num::NonZeroUsize { + // SAFETY: 64 is non-zero. + unsafe { std::num::NonZeroUsize::new_unchecked(64) } +} + #[cfg(test)] mod tests { + use vortex_array::aggregate_fn::AggregateFnVTableExt; + use vortex_array::aggregate_fn::EmptyOptions; + use vortex_array::aggregate_fn::fns::max::Max; + use vortex_array::aggregate_fn::fns::min::Min; + use vortex_array::aggregate_fn::fns::sum::Sum; use vortex_array::dtype::DType; use vortex_array::dtype::Nullability; use vortex_array::dtype::PType; @@ -59,7 +167,7 @@ mod tests { #[test] fn stats_table_dtype_adds_truncation_flags() { - let dtype = stats_table_dtype( + let dtype = legacy_stats_table_dtype( &DType::Primitive(PType::I32, Nullability::NonNullable), &[Stat::Max, Stat::Min, Stat::Sum], ); @@ -79,7 +187,7 @@ mod tests { #[test] fn stats_table_dtype_uses_storage_dtype_for_extensions() { let dtype = DType::Extension(Date::new(TimeUnit::Days, Nullability::NonNullable).erased()); - let stats_dtype = stats_table_dtype(&dtype, &[Stat::Max, Stat::Min]); + let stats_dtype = legacy_stats_table_dtype(&dtype, &[Stat::Max, Stat::Min]); assert_eq!( stats_dtype.as_struct_fields().names().as_ref(), @@ -91,4 +199,25 @@ mod tests { ] ); } + + #[test] + fn aggregate_stats_table_dtype_uses_display_names() { + let dtype = aggregate_stats_table_dtype( + &DType::Primitive(PType::I32, Nullability::NonNullable), + &[ + Max.bind(EmptyOptions), + Min.bind(EmptyOptions), + Sum.bind(EmptyOptions), + ], + ); + + assert_eq!( + dtype.as_struct_fields().names().as_ref(), + &[ + Max.bind(EmptyOptions).to_string(), + Min.bind(EmptyOptions).to_string(), + Sum.bind(EmptyOptions).to_string(), + ] + ); + } } diff --git a/vortex-layout/src/layouts/zoned/writer.rs b/vortex-layout/src/layouts/zoned/writer.rs index 7308a62f044..19211b2dd57 100644 --- a/vortex-layout/src/layouts/zoned/writer.rs +++ b/vortex-layout/src/layouts/zoned/writer.rs @@ -11,19 +11,31 @@ use parking_lot::Mutex; use vortex_array::ArrayContext; use vortex_array::IntoArray; use vortex_array::VortexSessionExecute; -use vortex_array::expr::stats::Stat; -use vortex_array::stats::PRUNING_STATS; +use vortex_array::aggregate_fn::AggregateFnRef; +use vortex_array::aggregate_fn::AggregateFnVTable; +use vortex_array::aggregate_fn::AggregateFnVTableExt; +use vortex_array::aggregate_fn::EmptyOptions; +use vortex_array::aggregate_fn::fns::bounded_max::BoundedMax; +use vortex_array::aggregate_fn::fns::bounded_max::BoundedMaxOptions; +use vortex_array::aggregate_fn::fns::bounded_min::BoundedMin; +use vortex_array::aggregate_fn::fns::bounded_min::BoundedMinOptions; +use vortex_array::aggregate_fn::fns::max::Max; +use vortex_array::aggregate_fn::fns::min::Min; +use vortex_array::aggregate_fn::fns::nan_count::NanCount; +use vortex_array::aggregate_fn::fns::null_count::NullCount; +use vortex_array::aggregate_fn::fns::sum::Sum; +use vortex_array::dtype::DType; use vortex_error::VortexResult; use vortex_error::vortex_ensure; -use vortex_io::session::RuntimeSessionExt; use vortex_session::VortexSession; -use vortex_utils::parallelism::get_available_parallelism; use crate::IntoLayout; use crate::LayoutRef; use crate::LayoutStrategy; -use crate::layouts::zoned::StatsAccumulator; +use crate::layouts::zoned::AggregateStatsAccumulator; use crate::layouts::zoned::ZonedLayout; +use crate::layouts::zoned::aggregate_partials; +use crate::layouts::zoned::schema::default_bounded_stat_max_bytes; use crate::segments::SegmentSinkRef; use crate::sequence::SendableSequentialStream; use crate::sequence::SequencePointer; @@ -38,21 +50,17 @@ use crate::sequence::SequentialStreamExt; pub struct ZonedLayoutOptions { /// The size of a statistics block pub block_size: usize, - /// The statistics to collect for each block. - pub stats: Arc<[Stat]>, - /// Maximum length of a variable length statistics - pub max_variable_length_statistics_size: usize, - /// Number of chunks to compute in parallel. - pub concurrency: usize, + /// The aggregate partials to collect for each block. + /// + /// If unset, the writer chooses pruning aggregates from the input dtype. + pub aggregate_fns: Option>, } impl Default for ZonedLayoutOptions { fn default() -> Self { Self { block_size: 8192, - stats: PRUNING_STATS.into(), - max_variable_length_statistics_size: 64, - concurrency: get_available_parallelism().unwrap_or(1), + aggregate_fns: None, } } } @@ -93,48 +101,34 @@ impl LayoutStrategy for ZonedStrategy { "ZonedStrategy requires block_size > 0 when writing" ); - let stats = Arc::clone(&self.options.stats); + let aggregate_fns = self + .options + .aggregate_fns + .clone() + .unwrap_or_else(|| default_zoned_aggregate_fns(stream.dtype())); let session = session.clone(); - let compute_session = session.clone(); - let handle = session.handle(); - let handle2 = handle.clone(); - let stats_accumulator = Arc::new(Mutex::new(StatsAccumulator::new( + let stats_accumulator = Arc::new(Mutex::new(AggregateStatsAccumulator::new( stream.dtype(), - &stats, - self.options.max_variable_length_statistics_size, + &aggregate_fns, ))); + let aggregate_fns = stats_accumulator.lock().aggregate_fns(); - // We can compute per-chunk statistics in parallel, so we spawn tasks for each chunk - let stream = SequentialStreamAdapter::new( - stream.dtype().clone(), - stream - .map(move |chunk| { - let stats = Arc::clone(&stats); - let session = compute_session.clone(); - handle2.spawn_cpu(move || { - let (sequence_id, chunk) = chunk?; - chunk - .statistics() - .compute_all(&stats, &mut session.create_execution_ctx())?; - Ok((sequence_id, chunk)) - }) - }) - .buffered(self.options.concurrency), - ) - .sendable(); - - // Now we accumulate the stats we computed above, this time we cannot spawn because we - // need to feed the accumulator an ordered stream. + // Accumulate zone stats in stream order so the auxiliary table stays aligned with the + // data child. let stats_accumulator2 = Arc::clone(&stats_accumulator); + let aggregate_fns2 = Arc::clone(&aggregate_fns); + let compute_session = session.clone(); let stream = SequentialStreamAdapter::new( stream.dtype().clone(), stream.map(move |item| { let (sequence_id, chunk) = item?; - // We have already computed per-chunk statistics, so avoid trying again for any that failed. - stats_accumulator2 - .lock() - .push_chunk_without_compute(&chunk)?; + let partials = aggregate_partials( + &chunk, + &aggregate_fns2, + &mut compute_session.create_execution_ctx(), + )?; + stats_accumulator2.lock().push_partials(partials)?; Ok((sequence_id, chunk)) }), ) @@ -155,7 +149,7 @@ impl LayoutStrategy for ZonedStrategy { ) .await?; - let Some((stats_array, stats)) = stats_accumulator.lock().as_array()? else { + let Some((stats_array, aggregate_fns)) = stats_accumulator.lock().as_array()? else { // If we have no stats (e.g. the DType doesn't support them), then we just return the // child layout. return Ok(data_layout); @@ -172,10 +166,88 @@ impl LayoutStrategy for ZonedStrategy { .write_stream(ctx, Arc::clone(&segment_sink), stats_stream, eof, &session) .await?; - Ok(ZonedLayout::new(data_layout, zones_layout, block_size, stats).into_layout()) + Ok( + ZonedLayout::try_new(data_layout, zones_layout, block_size, aggregate_fns)? + .into_layout(), + ) } fn buffered_bytes(&self) -> u64 { self.child.buffered_bytes() + self.stats.buffered_bytes() } } + +fn default_zoned_aggregate_fns(dtype: &DType) -> Arc<[AggregateFnRef]> { + let (max, min) = match dtype { + DType::Utf8(_) | DType::Binary(_) => ( + BoundedMax.bind(BoundedMaxOptions { + max_bytes: default_bounded_stat_max_bytes(), + }), + BoundedMin.bind(BoundedMinOptions { + max_bytes: default_bounded_stat_max_bytes(), + }), + ), + _ => (Max.bind(EmptyOptions), Min.bind(EmptyOptions)), + }; + + let mut aggregate_fns = vec![max, min]; + if Sum.return_dtype(&EmptyOptions, dtype).is_some() { + aggregate_fns.push(Sum.bind(EmptyOptions)); + } + aggregate_fns.push(NanCount.bind(EmptyOptions)); + aggregate_fns.push(NullCount.bind(EmptyOptions)); + + aggregate_fns.into() +} + +#[cfg(test)] +mod tests { + use vortex_array::aggregate_fn::fns::bounded_max::BoundedMax; + use vortex_array::aggregate_fn::fns::bounded_min::BoundedMin; + use vortex_array::aggregate_fn::fns::max::Max; + use vortex_array::aggregate_fn::fns::min::Min; + use vortex_array::aggregate_fn::fns::sum::Sum; + use vortex_array::dtype::Nullability; + use vortex_array::dtype::PType; + use vortex_array::extension::datetime::TimeUnit; + use vortex_array::extension::datetime::Timestamp; + + use super::*; + + #[test] + fn default_aggregates_bound_variable_length_min_max() { + let aggregate_fns = default_zoned_aggregate_fns(&DType::Utf8(Nullability::NonNullable)); + + assert_eq!( + aggregate_fns[0].as_::().max_bytes, + default_bounded_stat_max_bytes() + ); + assert_eq!( + aggregate_fns[1].as_::().max_bytes, + default_bounded_stat_max_bytes() + ); + } + + #[test] + fn default_aggregates_keep_fixed_width_min_max_exact() { + let aggregate_fns = default_zoned_aggregate_fns(&PType::I32.into()); + + assert!(aggregate_fns[0].is::()); + assert!(aggregate_fns[1].is::()); + assert!(aggregate_fns[2].is::()); + } + + #[test] + fn default_aggregates_skip_sum_for_non_summable_dtype() { + let dtype = DType::Extension( + Timestamp::new(TimeUnit::Microseconds, Nullability::Nullable).erased(), + ); + let aggregate_fns = default_zoned_aggregate_fns(&dtype); + + assert!( + aggregate_fns + .iter() + .all(|aggregate_fn| !aggregate_fn.is::()) + ); + } +} diff --git a/vortex-layout/src/layouts/zoned/zone_map.rs b/vortex-layout/src/layouts/zoned/zone_map.rs index dbfaab93910..0fe79cbbc5c 100644 --- a/vortex-layout/src/layouts/zoned/zone_map.rs +++ b/vortex-layout/src/layouts/zoned/zone_map.rs @@ -9,16 +9,30 @@ use vortex_array::ArrayRef; use vortex_array::IntoArray; use vortex_array::VortexSessionExecute; use vortex_array::aggregate_fn::AggregateFnRef; +use vortex_array::aggregate_fn::AggregateFnSatisfaction; +use vortex_array::aggregate_fn::fns::all_nan::AllNan; +use vortex_array::aggregate_fn::fns::all_non_nan::AllNonNan; +use vortex_array::aggregate_fn::fns::all_non_null::AllNonNull; +use vortex_array::aggregate_fn::fns::all_null::AllNull; +use vortex_array::aggregate_fn::fns::nan_count::NanCount; use vortex_array::arrays::ConstantArray; use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::StructArray; use vortex_array::arrays::struct_::StructArrayExt; use vortex_array::dtype::DType; use vortex_array::expr::Expression; +use vortex_array::expr::eq; use vortex_array::expr::get_item; use vortex_array::expr::is_root; +use vortex_array::expr::lit; use vortex_array::expr::root; use vortex_array::expr::stats::Stat; +use vortex_array::expr::traversal::NodeExt; +use vortex_array::expr::traversal::Transformed; +use vortex_array::scalar_fn::EmptyOptions; +use vortex_array::scalar_fn::ScalarFnVTableExt; +use vortex_array::scalar_fn::fns::stat::StatFn; +use vortex_array::scalar_fn::internal::row_count::RowCount; use vortex_array::scalar_fn::internal::row_count::contains_row_count; use vortex_array::scalar_fn::internal::row_count::substitute_row_count; use vortex_array::stats::bind::StatBinder; @@ -31,7 +45,8 @@ use vortex_mask::Mask; use vortex_runend::RunEnd; use vortex_session::VortexSession; -use crate::layouts::zoned::schema::stats_table_dtype; +use crate::layouts::zoned::schema::aggregate_stats_table_dtype; +use crate::layouts::zoned::schema::legacy_stats_table_dtype; /// A zone map containing statistics for a column. /// Each row of the zone map corresponds to a chunk of the column. @@ -43,6 +58,8 @@ pub struct ZoneMap { column_dtype: DType, // The struct array backing the zone map array: StructArray, + // Aggregate functions stored in the zone map, ordered by their stats-table fields. + aggregate_fns: Arc<[AggregateFnRef]>, // The length of each zone in the zone map. zone_len: u64, // Number of rows that the zone map covers @@ -55,28 +72,30 @@ impl ZoneMap { pub fn try_new( column_dtype: DType, array: StructArray, - stats: Arc<[Stat]>, + aggregate_fns: Arc<[AggregateFnRef]>, zone_len: u64, row_count: u64, ) -> VortexResult { - let expected_dtype = stats_table_dtype(&column_dtype, &stats); + let expected_dtype = aggregate_stats_table_dtype(&column_dtype, &aggregate_fns); if &expected_dtype != array.dtype() { vortex_bail!("Array dtype does not match expected zone map dtype: {expected_dtype}"); } // SAFETY: We checked that the array matches the expected stats-table schema. - Ok(unsafe { Self::new_unchecked(column_dtype, array, zone_len, row_count) }) + Ok(unsafe { Self::new_unchecked(column_dtype, array, aggregate_fns, zone_len, row_count) }) } pub(super) unsafe fn new_unchecked( column_dtype: DType, array: StructArray, + aggregate_fns: Arc<[AggregateFnRef]>, zone_len: u64, row_count: u64, ) -> Self { Self { column_dtype, array, + aggregate_fns, zone_len, row_count, } @@ -85,9 +104,26 @@ impl ZoneMap { /// Returns the [`DType`] of the statistics table given a set of statistics and column [`DType`]. /// /// This remains as a compatibility wrapper around the zoned schema helper. - #[deprecated(note = "zone-map stats table dtypes are an internal layout detail")] + #[deprecated(note = "use aggregate-function zoned stats instead")] pub fn dtype_for_stats_table(column_dtype: &DType, present_stats: &[Stat]) -> DType { - stats_table_dtype(column_dtype, present_stats) + legacy_stats_table_dtype(column_dtype, present_stats) + } + + #[cfg(test)] + fn try_new_legacy( + column_dtype: DType, + array: StructArray, + stats: Arc<[Stat]>, + zone_len: u64, + row_count: u64, + ) -> VortexResult { + let expected_dtype = legacy_stats_table_dtype(&column_dtype, &stats); + if &expected_dtype != array.dtype() { + vortex_bail!("Array dtype does not match expected zone map dtype: {expected_dtype}"); + } + + // SAFETY: We checked that the array matches the expected legacy stats-table schema. + Ok(unsafe { Self::new_unchecked(column_dtype, array, Arc::new([]), zone_len, row_count) }) } /// Apply a pruning predicate to this zone map. @@ -120,9 +156,42 @@ impl ZoneMap { } fn lower_stats(&self, predicate: Expression) -> VortexResult { + let predicate = self.lower_non_float_nan_stats(predicate)?; let binder = ZoneMapStatsBinder { zone_map: self }; bind_stats(predicate, &binder)?.optimize_recursive(self.array.dtype()) } + + fn lower_non_float_nan_stats(&self, predicate: Expression) -> VortexResult { + predicate + .transform_down(|expr| { + if !expr.is::() { + return Ok(Transformed::no(expr)); + } + + let options = expr.as_::(); + let aggregate_fn = options.aggregate_fn(); + let input_dtype = expr.child(0).return_dtype(&self.column_dtype)?; + + if has_nans(&input_dtype) { + return Ok(Transformed::no(expr)); + } + + if aggregate_fn.is::() { + return Ok(Transformed::yes(lit(0u64))); + } + + if aggregate_fn.is::() { + return Ok(Transformed::yes(lit(false))); + } + + if aggregate_fn.is::() { + return Ok(Transformed::yes(lit(true))); + } + + Ok(Transformed::no(expr)) + }) + .map(Transformed::into_inner) + } } struct ZoneMapStatsBinder<'a> { @@ -140,24 +209,103 @@ impl StatBinder for ZoneMapStatsBinder<'_> { aggregate_fn: &AggregateFnRef, _stat_dtype: &DType, ) -> VortexResult> { - let Some(stat) = Stat::from_aggregate_fn(aggregate_fn) else { - return Ok(None); - }; if !is_root(input) { return Ok(None); } - if self - .zone_map - .array - .unmasked_field_by_name_opt(stat.name()) - .is_none() + + if let Some(stat_expr) = self.zone_map.aggregate_field_expr(aggregate_fn) { + return Ok(Some(stat_expr)); + } + + if aggregate_fn.is::() { + return Ok(self + .zone_map + .stat_field_expr(Stat::NullCount) + .map(|null_count| eq(null_count, row_count_expr()))); + } + + if aggregate_fn.is::() { + return Ok(self + .zone_map + .stat_field_expr(Stat::NullCount) + .map(|null_count| eq(null_count, lit(0u64)))); + } + + if aggregate_fn.is::() { + return Ok(self + .zone_map + .stat_field_expr(Stat::NaNCount) + .map(|nan_count| eq(nan_count, row_count_expr()))); + } + + if aggregate_fn.is::() { + return Ok(self + .zone_map + .stat_field_expr(Stat::NaNCount) + .map(|nan_count| eq(nan_count, lit(0u64)))); + } + + if let Some(stat) = Stat::from_aggregate_fn(aggregate_fn) { + return Ok(self.zone_map.stat_field_expr(stat)); + } + + Ok(None) + } +} + +impl ZoneMap { + fn aggregate_field_expr(&self, requested: &AggregateFnRef) -> Option { + let field_name = requested.to_string(); + if self.array.unmasked_field_by_name_opt(&field_name).is_some() { + return Some(get_item(field_name, root())); + } + + let mut approximate = None; + for stored in self.aggregate_fns.iter() { + let field_name = stored.to_string(); + if self.array.unmasked_field_by_name_opt(&field_name).is_none() { + continue; + } + + match stored.can_satisfy(requested) { + AggregateFnSatisfaction::Exact => return Some(get_item(field_name, root())), + AggregateFnSatisfaction::Approximate => { + approximate = Some(get_item(field_name, root())); + } + AggregateFnSatisfaction::No => {} + } + } + + approximate + } + + fn stat_field_expr(&self, stat: Stat) -> Option { + if let Some(aggregate_fn) = stat.aggregate_fn() + && let Some(expr) = self.aggregate_field_expr(&aggregate_fn) { - return Ok(None); + return Some(expr); } - Ok(Some(get_item(stat.name(), root()))) + + self.legacy_stat_field_expr(stat) + } + + fn legacy_stat_field_expr(&self, stat: Stat) -> Option { + if self.array.unmasked_field_by_name_opt(stat.name()).is_some() { + return Some(get_item(stat.name(), root())); + } + + None } } +fn row_count_expr() -> Expression { + RowCount.new_expr(EmptyOptions, []) +} + +fn has_nans(dtype: &DType) -> bool { + matches!(dtype, DType::Primitive(ptype, _) if ptype.is_float()) +} + /// Build per-zone row counts for a zone map. /// /// `zone_len` is the nominal zone size; only the final zone may be shorter. The @@ -192,9 +340,22 @@ fn row_count_array(zone_len: u64, row_count: u64, num_zones: usize) -> VortexRes #[cfg(test)] mod tests { + use std::num::NonZeroUsize; use std::sync::Arc; use vortex_array::IntoArray; + use vortex_array::aggregate_fn::AggregateFnVTableExt; + use vortex_array::aggregate_fn::EmptyOptions; + use vortex_array::aggregate_fn::fns::all_non_null::AllNonNull; + use vortex_array::aggregate_fn::fns::all_null::AllNull; + use vortex_array::aggregate_fn::fns::bounded_max::BoundedMax; + use vortex_array::aggregate_fn::fns::bounded_max::BoundedMaxOptions; + use vortex_array::aggregate_fn::fns::bounded_min::BoundedMin; + use vortex_array::aggregate_fn::fns::bounded_min::BoundedMinOptions; + use vortex_array::aggregate_fn::fns::max::Max; + use vortex_array::aggregate_fn::fns::min::Min; + use vortex_array::aggregate_fn::fns::nan_count::NanCount; + use vortex_array::aggregate_fn::fns::null_count::NullCount; use vortex_array::arrays::BoolArray; use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::StructArray; @@ -229,6 +390,11 @@ mod tests { expr.falsify(&dtype, &SESSION).unwrap().unwrap() } + fn default_bounded_stat_max_bytes() -> NonZeroUsize { + // SAFETY: 64 is non-zero. + unsafe { NonZeroUsize::new_unchecked(64) } + } + #[test] fn test_zone_map_prunes() { // Construct a zone map with 3 zones: @@ -242,28 +408,22 @@ mod tests { // +----------+----------+ // | 3 | 7 | // +----------+----------+ + let max = Max.bind(EmptyOptions); + let min = Min.bind(EmptyOptions); let zone_map = ZoneMap::try_new( PType::I32.into(), StructArray::from_fields(&[ ( - "max", + max.to_string(), PrimitiveArray::new(buffer![5i32, 6i32, 7i32], Validity::AllValid).into_array(), ), ( - "max_is_truncated", - BoolArray::from_iter([false, false, false]).into_array(), - ), - ( - "min", + min.to_string(), PrimitiveArray::new(buffer![1i32, 2i32, 3i32], Validity::AllValid).into_array(), ), - ( - "min_is_truncated", - BoolArray::from_iter([false, false, false]).into_array(), - ), ]) .unwrap(), - Arc::new([Stat::Max, Stat::Min]), + Arc::new([max, min]), 3, 10, ) @@ -298,8 +458,49 @@ mod tests { } #[test] - fn row_count_prunes_short_trailing_zone() { + fn bounded_display_names_satisfy_min_max_rewrites() { + let bounded_max = BoundedMax.bind(BoundedMaxOptions { + max_bytes: default_bounded_stat_max_bytes(), + }); + let bounded_min = BoundedMin.bind(BoundedMinOptions { + max_bytes: default_bounded_stat_max_bytes(), + }); let zone_map = ZoneMap::try_new( + PType::I32.into(), + StructArray::from_fields(&[ + ( + bounded_max.to_string(), + PrimitiveArray::new(buffer![5i32, 6i32, 7i32], Validity::AllValid).into_array(), + ), + ( + bounded_min.to_string(), + PrimitiveArray::new(buffer![1i32, 2i32, 3i32], Validity::AllValid).into_array(), + ), + ]) + .unwrap(), + Arc::new([bounded_max, bounded_min]), + 3, + 10, + ) + .unwrap(); + + let expr = gt(root(), lit(5i32)); + let pruning_expr = falsify(&expr, PType::I32.into()); + let mask = zone_map.prune(&pruning_expr, &SESSION).unwrap(); + assert_arrays_eq!( + mask.into_array(), + BoolArray::from_iter([true, false, false]) + ); + + let expr = lt(root(), lit(2i32)); + let pruning_expr = falsify(&expr, PType::I32.into()); + let mask = zone_map.prune(&pruning_expr, &SESSION).unwrap(); + assert_arrays_eq!(mask.into_array(), BoolArray::from_iter([false, true, true])); + } + + #[test] + fn row_count_prunes_short_trailing_zone() { + let zone_map = ZoneMap::try_new_legacy( PType::U64.into(), StructArray::from_fields(&[( "null_count", @@ -324,7 +525,7 @@ mod tests { #[test] fn row_count_substitution_handles_empty_zone_map() { - let zone_map = ZoneMap::try_new( + let zone_map = ZoneMap::try_new_legacy( PType::U64.into(), StructArray::from_fields(&[( "null_count", @@ -346,7 +547,7 @@ mod tests { #[test] fn is_null_falsification_uses_null_count() { - let zone_map = ZoneMap::try_new( + let zone_map = ZoneMap::try_new_legacy( PType::U64.into(), StructArray::from_fields(&[( "null_count", @@ -370,8 +571,8 @@ mod tests { } #[test] - fn abstract_null_stats_do_not_derive_from_null_count() { - let zone_map = ZoneMap::try_new( + fn all_null_stat_fn_lowers_to_null_count_and_row_count() { + let zone_map = ZoneMap::try_new_legacy( PType::U64.into(), StructArray::from_fields(&[( "null_count", @@ -385,20 +586,85 @@ mod tests { .unwrap(); let mask = zone_map.prune(&all_null(root()), &SESSION).unwrap(); + assert_arrays_eq!(mask.into_array(), BoolArray::from_iter([false, true, true])); + } + + #[test] + fn all_non_null_stat_fn_lowers_to_null_count() { + let zone_map = ZoneMap::try_new_legacy( + PType::U64.into(), + StructArray::from_fields(&[( + "null_count", + PrimitiveArray::new(buffer![0u64, 4, 2], Validity::AllValid).into_array(), + )]) + .unwrap(), + Arc::new([Stat::NullCount]), + 4, + 10, + ) + .unwrap(); + + let mask = zone_map.prune(&all_non_null(root()), &SESSION).unwrap(); assert_arrays_eq!( mask.into_array(), - BoolArray::from_iter([false, false, false]) + BoolArray::from_iter([true, false, false]) ); + } + + #[test] + fn all_null_stat_fn_lowers_to_null_count_field() { + let null_count = NullCount.bind(EmptyOptions); + let zone_map = ZoneMap::try_new( + PType::U64.into(), + StructArray::from_fields(&[( + null_count.to_string(), + PrimitiveArray::new(buffer![4u64, 0, 2], Validity::AllValid).into_array(), + )]) + .unwrap(), + Arc::new([null_count]), + 4, + 10, + ) + .unwrap(); + + let mask = zone_map.prune(&all_null(root()), &SESSION).unwrap(); + assert_arrays_eq!(mask.into_array(), BoolArray::from_iter([true, false, true])); let mask = zone_map.prune(&all_non_null(root()), &SESSION).unwrap(); assert_arrays_eq!( mask.into_array(), - BoolArray::from_iter([false, false, false]) + BoolArray::from_iter([false, true, false]) ); } #[test] - fn non_float_nan_stat_fns_error() { + fn all_nan_stat_fn_lowers_to_nan_count_field() { + let nan_count = NanCount.bind(EmptyOptions); + let zone_map = ZoneMap::try_new( + PType::F32.into(), + StructArray::from_fields(&[( + nan_count.to_string(), + PrimitiveArray::new(buffer![4u64, 0, 2], Validity::AllValid).into_array(), + )]) + .unwrap(), + Arc::new([nan_count]), + 4, + 10, + ) + .unwrap(); + + let mask = zone_map.prune(&all_nan(root()), &SESSION).unwrap(); + assert_arrays_eq!(mask.into_array(), BoolArray::from_iter([true, false, true])); + + let mask = zone_map.prune(&all_non_nan(root()), &SESSION).unwrap(); + assert_arrays_eq!( + mask.into_array(), + BoolArray::from_iter([false, true, false]) + ); + } + + #[test] + fn non_float_nan_stat_fns_lower_to_constants() { let zone_map = ZoneMap::try_new( PType::I32.into(), StructArray::try_new(FieldNames::empty(), vec![], 2, Validity::NonNullable).unwrap(), @@ -408,8 +674,11 @@ mod tests { ) .unwrap(); - assert!(zone_map.prune(&all_nan(root()), &SESSION).is_err()); - assert!(zone_map.prune(&all_non_nan(root()), &SESSION).is_err()); + let mask = zone_map.prune(&all_nan(root()), &SESSION).unwrap(); + assert_arrays_eq!(mask.into_array(), BoolArray::from_iter([false, false])); + + let mask = zone_map.prune(&all_non_nan(root()), &SESSION).unwrap(); + assert_arrays_eq!(mask.into_array(), BoolArray::from_iter([true, true])); } #[test] @@ -439,21 +708,16 @@ mod tests { } #[test] - fn float_min_max_prunes_only_with_all_non_nan_proof() { + fn float_min_max_stat_fn_requires_nan_count() { + let max = Max.bind(EmptyOptions); let zone_map = ZoneMap::try_new( PType::F32.into(), - StructArray::from_fields(&[ - ( - "max", - PrimitiveArray::new(buffer![5.0f32, 6.0, 7.0], Validity::AllValid).into_array(), - ), - ( - "max_is_truncated", - BoolArray::from_iter([false, false, false]).into_array(), - ), - ]) + StructArray::from_fields(&[( + max.to_string(), + PrimitiveArray::new(buffer![5.0f32, 6.0, 7.0], Validity::AllValid).into_array(), + )]) .unwrap(), - Arc::new([Stat::Max]), + Arc::new([max.clone()]), 4, 12, ) @@ -467,24 +731,21 @@ mod tests { BoolArray::from_iter([false, false, false]) ); + let nan_count = NanCount.bind(EmptyOptions); let zone_map = ZoneMap::try_new( PType::F32.into(), StructArray::from_fields(&[ ( - "max", + max.to_string(), PrimitiveArray::new(buffer![5.0f32, 6.0, 7.0], Validity::AllValid).into_array(), ), ( - "max_is_truncated", - BoolArray::from_iter([false, false, false]).into_array(), - ), - ( - "nan_count", + nan_count.to_string(), PrimitiveArray::new(buffer![0u64, 0, 0], Validity::AllValid).into_array(), ), ]) .unwrap(), - Arc::new([Stat::Max, Stat::NaNCount]), + Arc::new([max, nan_count]), 4, 12, ) @@ -498,8 +759,8 @@ mod tests { } #[test] - fn float_cast_min_max_stat_fn_requires_all_non_nan() { - let zone_map = ZoneMap::try_new( + fn float_cast_min_max_stat_fn_uses_source_nan_count() { + let zone_map = ZoneMap::try_new_legacy( PType::F32.into(), StructArray::from_fields(&[ ( @@ -595,7 +856,7 @@ mod tests { #[test] fn row_count_prunes_all_null_uniform_zones() { - let zone_map = ZoneMap::try_new( + let zone_map = ZoneMap::try_new_legacy( PType::U64.into(), StructArray::from_fields(&[( "null_count", @@ -618,4 +879,47 @@ mod tests { BoolArray::from_iter([false, true, false]) ); } + + #[test] + fn all_null_stat_fn_lowers_to_aggregate_field() { + let all_null_agg = AllNull.bind(EmptyOptions); + let zone_map = ZoneMap::try_new( + PType::U64.into(), + StructArray::from_fields(&[( + all_null_agg.to_string(), + BoolArray::from_iter([Some(false), Some(true), Some(true)]).into_array(), + )]) + .unwrap(), + Arc::new([all_null_agg]), + 4, + 10, + ) + .unwrap(); + + let mask = zone_map.prune(&all_null(root()), &SESSION).unwrap(); + assert_arrays_eq!(mask.into_array(), BoolArray::from_iter([false, true, true])); + } + + #[test] + fn all_non_null_stat_fn_lowers_to_aggregate_field() { + let all_non_null_agg = AllNonNull.bind(EmptyOptions); + let zone_map = ZoneMap::try_new( + PType::U64.into(), + StructArray::from_fields(&[( + all_non_null_agg.to_string(), + BoolArray::from_iter([Some(true), Some(false), Some(false)]).into_array(), + )]) + .unwrap(), + Arc::new([all_non_null_agg]), + 4, + 10, + ) + .unwrap(); + + let mask = zone_map.prune(&all_non_null(root()), &SESSION).unwrap(); + assert_arrays_eq!( + mask.into_array(), + BoolArray::from_iter([true, false, false]) + ); + } } diff --git a/vortex-layout/src/session.rs b/vortex-layout/src/session.rs index f46f1d92086..42c8906d107 100644 --- a/vortex-layout/src/session.rs +++ b/vortex-layout/src/session.rs @@ -12,6 +12,7 @@ use crate::layouts::chunked::ChunkedLayoutEncoding; use crate::layouts::dict::DictLayoutEncoding; use crate::layouts::flat::FlatLayoutEncoding; use crate::layouts::struct_::StructLayoutEncoding; +use crate::layouts::zoned::LegacyStatsLayoutEncoding; use crate::layouts::zoned::ZonedLayoutEncoding; pub type LayoutRegistry = Registry; @@ -50,6 +51,10 @@ impl Default for LayoutSession { layouts.register(FlatLayoutEncoding.id(), FlatLayoutEncoding.as_ref()); layouts.register(StructLayoutEncoding.id(), StructLayoutEncoding.as_ref()); layouts.register(ZonedLayoutEncoding.id(), ZonedLayoutEncoding.as_ref()); + layouts.register( + LegacyStatsLayoutEncoding.id(), + LegacyStatsLayoutEncoding.as_ref(), + ); layouts.register(DictLayoutEncoding.id(), DictLayoutEncoding.as_ref()); Self { registry: layouts } diff --git a/vortex-layout/src/vtable.rs b/vortex-layout/src/vtable.rs index c48a9d33d1f..fc7b87445d1 100644 --- a/vortex-layout/src/vtable.rs +++ b/vortex-layout/src/vtable.rs @@ -26,6 +26,14 @@ use crate::children::LayoutChildren; use crate::segments::SegmentId; use crate::segments::SegmentSource; +/// Context available while constructing a layout from serialized metadata. +pub struct LayoutBuildContext<'a> { + /// The session used to resolve plugin-owned metadata such as aggregate function options. + pub session: &'a VortexSession, + /// The array read context referenced by serialized array metadata in descendant layouts. + pub array_read_ctx: &'a ReadContext, +} + pub trait VTable: 'static + Sized + Send + Sync + Debug { type Layout: 'static + Send + Sync + Clone + Debug + Deref + IntoLayout; type Encoding: 'static + Send + Sync + Deref; @@ -83,7 +91,7 @@ pub trait VTable: 'static + Sized + Send + Sync + Debug { metadata: &::Output, segment_ids: Vec, children: &dyn LayoutChildren, - ctx: &ReadContext, + build_ctx: &LayoutBuildContext<'_>, ) -> VortexResult; /// Replaces the children of the layout with the given layout references. diff --git a/vortex-python/src/io.rs b/vortex-python/src/io.rs index a793b16eb82..ac51e0862a1 100644 --- a/vortex-python/src/io.rs +++ b/vortex-python/src/io.rs @@ -257,7 +257,7 @@ impl PyVortexWriteOptions { /// >>> vx.io.VortexWriteOptions.default().write(sprl, "chonky.vortex") /// >>> import os /// >>> os.path.getsize('chonky.vortex') - /// 216004 + /// 215932 /// /// Wow, Vortex manages to use about two bytes per integer! So advanced. So tiny. /// @@ -267,7 +267,7 @@ impl PyVortexWriteOptions { /// /// >>> vx.io.VortexWriteOptions.compact().write(sprl, "tiny.vortex") /// >>> os.path.getsize('tiny.vortex') - /// 55120 + /// 55060 /// /// Random numbers are not (usually) composed of random bytes! #[staticmethod] diff --git a/vortex-tui/src/browse/app.rs b/vortex-tui/src/browse/app.rs index 97e17bac205..d7179c32430 100644 --- a/vortex-tui/src/browse/app.rs +++ b/vortex-tui/src/browse/app.rs @@ -16,6 +16,7 @@ use vortex::file::VortexFile; use vortex::layout::LayoutRef; use vortex::layout::VTable; use vortex::layout::layouts::flat::Flat; +use vortex::layout::layouts::zoned::LegacyStats; use vortex::layout::layouts::zoned::Zoned; use vortex::layout::segments::SegmentId; use vortex::layout::segments::SegmentSource; @@ -151,10 +152,11 @@ impl LayoutCursor { /// Returns `true` if the cursor is currently pointing at a statistics table. /// - /// A statistics table is the second child of a [`Zoned`] layout. + /// A statistics table is the second child of a zoned layout. pub fn is_stats_table(&self) -> bool { let parent = self.parent(); - parent.layout().is::() && self.path.last().copied().unwrap_or_default() == 1 + (parent.layout().is::() || parent.layout().is::()) + && self.path.last().copied().unwrap_or_default() == 1 } /// Get the data type of the current layout. diff --git a/vortex-tui/src/browse/ui/layouts.rs b/vortex-tui/src/browse/ui/layouts.rs index 14c9531645c..93468642db8 100644 --- a/vortex-tui/src/browse/ui/layouts.rs +++ b/vortex-tui/src/browse/ui/layouts.rs @@ -34,6 +34,7 @@ use vortex::array::arrays::StructArray; use vortex::array::arrays::struct_::StructArrayExt; use vortex::error::VortexExpect; use vortex::layout::layouts::flat::Flat; +use vortex::layout::layouts::zoned::LegacyStats; use vortex::layout::layouts::zoned::Zoned; use crate::browse::app::AppState; @@ -89,12 +90,21 @@ fn render_layout_header(app: &AppState, area: Rect, buf: &mut Buffer) { rows.push(Text::from(metadata_info)); } - if let Some(layout) = cursor.layout().as_opt::() { - // Push any zone stats. + let present_aggregates = cursor + .layout() + .as_opt::() + .map(|layout| layout.present_aggregates()) + .or_else(|| { + cursor + .layout() + .as_opt::() + .map(|layout| layout.present_aggregates()) + }); + if let Some(present_aggregates) = present_aggregates { let mut line = String::new(); - line.push_str("Statistics: "); - for stat in layout.present_stats().as_ref() { - line.push_str(stat.to_string().as_str()); + line.push_str("Aggregates: "); + for aggregate in present_aggregates.as_ref() { + line.push_str(aggregate.as_str()); line.push(' '); }