From d8f0001ba14fbdf9389453ce54e4b49fcfeb1ef9 Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Fri, 26 Jun 2026 14:33:05 +0100 Subject: [PATCH 1/4] Remove ArrayAccessor Signed-off-by: Robert Kruszewski --- .../experimental/onpair/benches/decode.rs | 25 +- encodings/experimental/onpair/src/compress.rs | 155 +++++------ .../onpair/src/compute/compare.rs | 11 +- encodings/experimental/onpair/src/tests.rs | 252 ++++++++---------- .../experimental/onpair/tests/big_data.rs | 41 +-- .../fastlanes/src/for/array/for_compress.rs | 64 ++--- .../fastlanes/src/for/compute/is_sorted.rs | 14 +- encodings/fastlanes/src/for/vtable/mod.rs | 4 +- .../fastlanes/src/for/vtable/operations.rs | 9 +- encodings/fsst/benches/fsst_url_compare.rs | 19 +- encodings/fsst/src/array.rs | 31 +-- encodings/fsst/src/canonical.rs | 13 +- encodings/pco/src/array.rs | 1 + encodings/zstd/src/array.rs | 33 ++- fuzz/src/array/compare.rs | 55 ++-- fuzz/src/array/filter.rs | 13 +- fuzz/src/array/search_sorted.rs | 7 +- fuzz/src/array/slice.rs | 7 +- fuzz/src/array/sort.rs | 7 +- fuzz/src/array/take.rs | 7 +- vortex-array/benches/dict_compare.rs | 50 ++-- vortex-array/src/accessor.rs | 14 - .../src/aggregate_fn/fns/is_sorted/mod.rs | 2 +- .../src/aggregate_fn/fns/is_sorted/varbin.rs | 14 +- .../src/aggregate_fn/fns/min_max/mod.rs | 2 +- .../src/aggregate_fn/fns/min_max/varbin.rs | 25 +- vortex-array/src/arrays/chunked/tests.rs | 39 ++- .../src/arrays/chunked/vtable/canonical.rs | 29 +- vortex-array/src/arrays/dict/compute/mod.rs | 46 ++-- .../src/arrays/primitive/array/accessor.rs | 41 --- .../src/arrays/primitive/array/mod.rs | 32 ++- vortex-array/src/arrays/primitive/mod.rs | 3 +- vortex-array/src/arrays/varbin/accessor.rs | 63 ----- vortex-array/src/arrays/varbin/mod.rs | 2 - .../src/arrays/varbinview/accessor.rs | 72 ----- .../src/arrays/varbinview/compute/mod.rs | 20 +- .../src/arrays/varbinview/compute/take.rs | 44 +-- .../src/arrays/varbinview/compute/zip.rs | 23 +- vortex-array/src/arrays/varbinview/mod.rs | 1 - vortex-array/src/builders/dict/bytes.rs | 108 ++++---- vortex-array/src/lib.rs | 1 - vortex-btrblocks/src/schemes/integer/for_.rs | 2 +- vortex-btrblocks/src/schemes/string/onpair.rs | 2 +- vortex-btrblocks/tests/onpair_roundtrip.rs | 92 ++++--- vortex-cuda/benches/dynamic_dispatch_cuda.rs | 5 +- vortex-cuda/src/dynamic_dispatch/mod.rs | 10 +- vortex-file/src/tests.rs | 17 +- vortex-row/src/codec.rs | 2 +- .../arrays/synthetic/encodings/for_.rs | 22 +- .../common_encoding_tree_throughput.rs | 12 +- vortex/benches/single_encoding_throughput.rs | 6 +- 51 files changed, 728 insertions(+), 841 deletions(-) delete mode 100644 vortex-array/src/accessor.rs delete mode 100644 vortex-array/src/arrays/primitive/array/accessor.rs delete mode 100644 vortex-array/src/arrays/varbin/accessor.rs delete mode 100644 vortex-array/src/arrays/varbinview/accessor.rs diff --git a/encodings/experimental/onpair/benches/decode.rs b/encodings/experimental/onpair/benches/decode.rs index 1ee96652891..ff4158cda32 100644 --- a/encodings/experimental/onpair/benches/decode.rs +++ b/encodings/experimental/onpair/benches/decode.rs @@ -33,6 +33,7 @@ use vortex_array::ArrayRef; use vortex_array::ExecutionCtx; use vortex_array::IntoArray; use vortex_array::VortexSessionExecute; +use vortex_array::array_session; use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::VarBinArray; use vortex_array::arrays::VarBinViewArray; @@ -80,7 +81,7 @@ use vortex_onpair::onpair_compress; use vortex_session::VortexSession; static SESSION: LazyLock = LazyLock::new(|| { - let session = vortex_array::array_session(); + let session = array_session(); vortex_onpair::initialize(&session); session }); @@ -153,13 +154,13 @@ fn corpus(n: usize, shape: Shape) -> Vec { out } -fn compress(n: usize, shape: Shape) -> OnPairArray { +fn compress(n: usize, shape: Shape, ctx: &mut ExecutionCtx) -> OnPairArray { let strings = corpus(n, shape); let varbin = VarBinArray::from_iter( strings.iter().map(|s| Some(s.as_bytes())), DType::Utf8(Nullability::NonNullable), ); - onpair_compress(&varbin, varbin.len(), varbin.dtype(), DEFAULT_DICT12_CONFIG) + onpair_compress(varbin.as_array(), DEFAULT_DICT12_CONFIG, ctx) .unwrap_or_else(|e| panic!("onpair_compress failed: {e}")) } @@ -172,13 +173,12 @@ fn widen(arr: &ArrayRef, ctx: &mut ExecutionCtx) -> Buffer { .into_buffer::() } -fn materialise(arr: &OnPairArray) -> (DecodeInputs, usize) { - let mut ctx = SESSION.create_execution_ctx(); +fn materialise(arr: &OnPairArray, ctx: &mut ExecutionCtx) -> (DecodeInputs, usize) { let view = arr.as_view(); let inputs = DecodeInputs { dict_bytes: view.dict_bytes().clone(), - dict_offsets: widen::(view.dict_offsets(), &mut ctx), - codes: widen::(view.codes(), &mut ctx), + dict_offsets: widen::(view.dict_offsets(), ctx), + codes: widen::(view.codes(), ctx), bits: view.bits(), }; let total = inputs.decompressed_len(); @@ -197,9 +197,10 @@ const CASES: &[(Shape, usize)] = &[ /// Hits `onpair::decompress_into` directly. #[divan::bench(args = CASES)] fn decompress_into_bench(bencher: Bencher, case: (Shape, usize)) { + let mut ctx = SESSION.create_execution_ctx(); let (shape, n) = case; - let arr = compress(n, shape); - let (inputs, total) = materialise(&arr); + let arr = compress(n, shape, &mut ctx); + let (inputs, total) = materialise(&arr, &mut ctx); bencher.bench_local(|| { let mut out: Vec = Vec::with_capacity(total); let written = inputs.decompress_into(out.spare_capacity_mut()); @@ -212,8 +213,9 @@ fn decompress_into_bench(bencher: Bencher, case: (Shape, usize)) { /// building the view buffer + `BinaryView` list, etc. #[divan::bench(args = CASES)] fn canonicalize_to_varbinview(bencher: Bencher, case: (Shape, usize)) { + let mut ctx = SESSION.create_execution_ctx(); let (shape, n) = case; - let arr = compress(n, shape); + let arr = compress(n, shape, &mut ctx); bencher .with_inputs(|| (arr.clone().into_array(), SESSION.create_execution_ctx())) .bench_local_values(|(arr, mut ctx)| { @@ -232,8 +234,9 @@ const COMPUTE_CASES: &[(Shape, usize)] = &[(Shape::UrlLog, 100_000), (Shape::Url /// rows; the cost is dominated by the `codes` segment copy + offsets. #[divan::bench(args = COMPUTE_CASES)] fn filter_share_dict(bencher: Bencher, case: (Shape, usize)) { + let mut ctx = SESSION.create_execution_ctx(); let (shape, n) = case; - let arr = compress(n, shape); + let arr = compress(n, shape, &mut ctx); let mask = Mask::from_iter((0..n).map(|i| i % 7 == 0)); bencher .with_inputs(|| SESSION.create_execution_ctx()) diff --git a/encodings/experimental/onpair/src/compress.rs b/encodings/experimental/onpair/src/compress.rs index 226922cf593..ed0833995f6 100644 --- a/encodings/experimental/onpair/src/compress.rs +++ b/encodings/experimental/onpair/src/compress.rs @@ -8,20 +8,19 @@ use onpair::Offset; use vortex_array::ArrayRef; use vortex_array::ExecutionCtx; use vortex_array::IntoArray; -use vortex_array::LEGACY_SESSION; -use vortex_array::VortexSessionExecute; -use vortex_array::accessor::ArrayAccessor; +use vortex_array::arrays::ConstantArray; use vortex_array::arrays::VarBinViewArray; +use vortex_array::arrays::varbinview::BinaryView; use vortex_array::buffer::BufferHandle; -use vortex_array::dtype::DType; -use vortex_array::dtype::Nullability; use vortex_array::validity::Validity; +use vortex_buffer::Alignment; use vortex_buffer::Buffer; use vortex_buffer::BufferMut; use vortex_buffer::ByteBuffer; -use vortex_error::VortexExpect; +use vortex_buffer::ByteBufferMut; use vortex_error::VortexResult; use vortex_error::vortex_err; +use vortex_mask::AllOr; use crate::OnPair; use crate::OnPairArray; @@ -29,49 +28,64 @@ use crate::OnPairArray; /// Default OnPair training configuration: 12-bit codes ("dict-12"). pub const DEFAULT_DICT12_CONFIG: Config = onpair::DEFAULT_CONFIG; -/// Compress an iterable of optional byte strings via the OnPair encoder. -pub fn onpair_compress_iter<'a, I>( - iter: I, - len: usize, - dtype: DType, - config: Config, -) -> VortexResult -where - I: Iterator>, -{ - onpair_compress_iter_with_offsets::(iter, len, dtype, config) -} - -fn onpair_compress_iter_with_offsets<'a, O, I>( - iter: I, - len: usize, - dtype: DType, +fn onpair_compress_varbinview( + array: VarBinViewArray, config: Config, + ctx: &mut ExecutionCtx, ) -> VortexResult where O: Offset, - I: Iterator>, { + let len = array.len(); + let mask = array.validity()?.execute_mask(len, ctx)?; + if mask.all_false() { + return OnPair::try_new( + array.dtype().clone(), + BufferHandle::new_host(ByteBuffer::empty()), + ConstantArray::new(0, len).into_array(), + ConstantArray::new(0u16, len).into_array(), + ConstantArray::new(0u32, len + 1).into_array(), + ConstantArray::new(0i32, len).into_array(), + Validity::AllInvalid, + 9, + ); + } + let mut flat: Vec = Vec::with_capacity(len * 16); let mut offsets: Vec = Vec::with_capacity(len + 1); let mut uncompressed_lengths: BufferMut = BufferMut::with_capacity(len); - let mut validity_bits: Vec = Vec::with_capacity(len); - offsets.push(::from_usize(0)); + offsets.push(O::from_usize(0)); + let views = array.views(); + let buffers = array + .data_buffers() + .as_ref() + .iter() + .map(|b| b.as_host()) + .collect::>(); - for item in iter { - match item { - Some(bytes) => { + match mask.bit_buffer() { + AllOr::All => { + for view in views { + let bytes = view_bytes(view, &buffers); flat.extend_from_slice(bytes); - offsets.push(::from_usize(flat.len())); - uncompressed_lengths.push( - i32::try_from(bytes.len()).vortex_expect("string length must fit in i32"), - ); - validity_bits.push(true); + offsets.push(O::from_usize(flat.len())); + uncompressed_lengths.push(view.len() as i32); } - None => { - offsets.push(::from_usize(flat.len())); - uncompressed_lengths.push(0); - validity_bits.push(false); + } + AllOr::None => { + unreachable!("all_false() should have been caught earlier"); + } + AllOr::Some(validity) => { + for (view, valid) in views.iter().zip(validity.iter()) { + if valid { + let bytes = view_bytes(view, &buffers); + flat.extend_from_slice(bytes); + offsets.push(O::from_usize(flat.len())); + uncompressed_lengths.push(view.len() as i32); + } else { + offsets.push(O::from_usize(flat.len())); + uncompressed_lengths.push(0); + } } } } @@ -80,44 +94,53 @@ where .map_err(|e| vortex_err!("OnPair compress failed: {e}"))?; let bits = column.bits; let dict_bytes = dict_bytes_to_buffer(column.dict_bytes); - let codes_offsets = build_codes_offsets(&column.codes, &column.dict_offsets, &offsets)?; + let codes_offsets = + build_codes_offsets(&column.codes, &column.dict_offsets, &offsets)?.into_array(); let codes = Buffer::from(column.codes).into_array(); let dict_offsets = Buffer::from(column.dict_offsets).into_array(); - let codes_offsets = Buffer::from(codes_offsets).into_array(); let uncompressed_lengths = uncompressed_lengths.into_array(); - let validity = match dtype.nullability() { - Nullability::NonNullable => Validity::NonNullable, - Nullability::Nullable => Validity::from_iter(validity_bits), - }; OnPair::try_new( - dtype, + array.dtype().clone(), dict_bytes, dict_offsets, codes, codes_offsets, uncompressed_lengths, - validity, + array.validity()?, bits, ) } +fn view_bytes<'a>(view: &'a BinaryView, buffers: &'a [&ByteBuffer]) -> &'a [u8] { + if view.is_inlined() { + view.as_inlined().value() + } else { + let view_ref = view.as_view(); + &buffers[view_ref.buffer_index as usize][view_ref.as_range()] + } +} + /// Lift compressed dictionary bytes into the Vortex buffer slot. fn dict_bytes_to_buffer(dict_bytes: Vec) -> BufferHandle { // Pad the dictionary blob with MAX_TOKEN_SIZE zero bytes so the // over-copy decoder can issue a fixed 16-byte load for every token // without risking an OOB read on the last entry. - let mut padded = Vec::with_capacity(dict_bytes.len() + onpair::MAX_TOKEN_SIZE); - padded.extend_from_slice(&dict_bytes); - padded.resize(dict_bytes.len() + onpair::MAX_TOKEN_SIZE, 0); + // // Align dict_bytes to 8 bytes so the segment that ultimately holds the // OnPair tree starts at an 8-aligned in-memory address. Without this // anchor, the per-buffer padding the serializer inserts is only // *relative* to the segment start; if the segment lands at a u8-aligned // heap address, downstream `PrimitiveArray::deserialize` panics // with `Misaligned buffer cannot be used to build PrimitiveArray of u32`. - BufferHandle::new_host(ByteBuffer::from(padded).aligned(vortex_buffer::Alignment::new(8))) + let mut padded = ByteBufferMut::with_capacity_aligned( + dict_bytes.len() + onpair::MAX_TOKEN_SIZE, + Alignment::new(8), + ); + padded.extend_from_slice(&dict_bytes); + unsafe { padded.push_n_unchecked(0, dict_bytes.len() + onpair::MAX_TOKEN_SIZE - padded.len()) }; + BufferHandle::new_host(padded.freeze()) } /// Reconstruct the per-row `codes_offsets` from the flat `codes`, the @@ -128,9 +151,9 @@ fn build_codes_offsets( codes: &[u16], dict_offsets: &[u32], row_byte_offsets: &[O], -) -> VortexResult> { +) -> VortexResult> { let nrows = row_byte_offsets.len() - 1; - let mut codes_offsets = Vec::with_capacity(nrows + 1); + let mut codes_offsets = BufferMut::with_capacity(nrows + 1); codes_offsets.push(0u32); let mut decoded_bytes: u64 = 0; let mut code_idx: usize = 0; @@ -149,38 +172,16 @@ fn build_codes_offsets( .map_err(|_| vortex_err!("OnPair: code boundary {code_idx} does not fit u32"))?, ); } - Ok(codes_offsets) -} - -/// Compress a byte-string accessor (typically a `VarBinArray` or -/// `VarBinViewArray`). -pub fn onpair_compress>( - array: A, - len: usize, - dtype: &DType, - config: Config, -) -> VortexResult { - array.with_iterator(|iter| onpair_compress_iter(iter, len, dtype.clone(), config)) + Ok(codes_offsets.freeze()) } /// Compress any [`ArrayRef`] whose canonical form is a string array, by first /// canonicalising to `VarBinViewArray`. -pub fn onpair_compress_array( +pub fn onpair_compress( array: &ArrayRef, config: Config, ctx: &mut ExecutionCtx, ) -> VortexResult { let view = array.clone().execute::(ctx)?; - let len = view.len(); - let dtype = view.dtype().clone(); - onpair_compress(&view, len, &dtype, config) -} - -/// Convenience: build a default `ExecutionCtx` from `LEGACY_SESSION`. -pub fn onpair_compress_array_default( - array: &ArrayRef, - config: Config, -) -> VortexResult { - let mut ctx = LEGACY_SESSION.create_execution_ctx(); - onpair_compress_array(array, config, &mut ctx) + onpair_compress_varbinview::(view, config, ctx) } diff --git a/encodings/experimental/onpair/src/compute/compare.rs b/encodings/experimental/onpair/src/compute/compare.rs index 8c01a36977c..939a1a2fb42 100644 --- a/encodings/experimental/onpair/src/compute/compare.rs +++ b/encodings/experimental/onpair/src/compute/compare.rs @@ -69,6 +69,7 @@ mod tests { use rstest::rstest; use vortex_array::IntoArray; use vortex_array::VortexSessionExecute; + use vortex_array::array_session; use vortex_array::arrays::BoolArray; use vortex_array::arrays::ConstantArray; use vortex_array::arrays::VarBinArray; @@ -84,7 +85,7 @@ mod tests { use crate::compress::DEFAULT_DICT12_CONFIG; use crate::compress::onpair_compress; - static SESSION: LazyLock = LazyLock::new(vortex_array::array_session); + static SESSION: LazyLock = LazyLock::new(array_session); #[cfg_attr(miri, ignore)] #[rstest] @@ -99,10 +100,9 @@ mod tests { [Some(""), Some("a"), Some(""), Some("bbb")], DType::Utf8(Nullability::NonNullable), ); - let arr = onpair_compress(&input, input.len(), input.dtype(), DEFAULT_DICT12_CONFIG)? - .into_array(); - let mut ctx = SESSION.create_execution_ctx(); + let arr = onpair_compress(input.as_array(), DEFAULT_DICT12_CONFIG, &mut ctx)?.into_array(); + let result = arr .binary(ConstantArray::new("", input.len()).into_array(), op)? .execute::(&mut ctx)?; @@ -117,9 +117,8 @@ mod tests { [Some(""), None, Some("x")], DType::Utf8(Nullability::Nullable), ); - let arr = onpair_compress(&input, input.len(), input.dtype(), DEFAULT_DICT12_CONFIG)? - .into_array(); let mut ctx = SESSION.create_execution_ctx(); + let arr = onpair_compress(input.as_array(), DEFAULT_DICT12_CONFIG, &mut ctx)?.into_array(); let eq_empty = arr .clone() diff --git a/encodings/experimental/onpair/src/tests.rs b/encodings/experimental/onpair/src/tests.rs index 571240f3206..4f2124025f2 100644 --- a/encodings/experimental/onpair/src/tests.rs +++ b/encodings/experimental/onpair/src/tests.rs @@ -6,7 +6,6 @@ use std::sync::LazyLock; use prost::Message; use vortex_array::IntoArray; use vortex_array::VortexSessionExecute; -use vortex_array::accessor::ArrayAccessor; use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::VarBinArray; use vortex_array::arrays::VarBinViewArray; @@ -62,74 +61,65 @@ fn test_onpair_metadata_golden() { #[cfg_attr(miri, ignore)] #[test] -fn test_onpair_roundtrip() { +fn test_onpair_roundtrip() -> vortex_error::VortexResult<()> { let input = sample_input(); - let len = input.len(); - let dtype = input.dtype().clone(); - let compressed = onpair_compress(&input, len, &dtype, DEFAULT_DICT12_CONFIG).expect("compress"); + let mut ctx = SESSION.create_execution_ctx(); + let compressed = onpair_compress(&input.into_array(), DEFAULT_DICT12_CONFIG, &mut ctx)?; assert!(compressed.clone().into_array().is::()); - let mut ctx = SESSION.create_execution_ctx(); let decoded = compressed .into_array() - .execute::(&mut ctx) - .expect("canonicalize"); + .execute::(&mut ctx)?; - decoded - .with_iterator(|iter| { - let got: Vec>> = iter.map(|b| b.map(|s| s.to_vec())).collect(); - assert_eq!(got.len(), 5); - assert_eq!( - got[0].as_deref(), - Some(b"https://www.example.com/page".as_ref()) - ); - assert_eq!( - got[3].as_deref(), - Some(b"ftp://files.example.com/x".as_ref()) - ); - Ok::<_, vortex_error::VortexError>(()) - }) - .unwrap(); + let mask = decoded.validity()?.execute_mask(decoded.len(), &mut ctx)?; + let got: Vec>> = (0..decoded.len()) + .map(|i| mask.value(i).then(|| decoded.bytes_at(i).to_vec())) + .collect(); + assert_eq!(got.len(), 5); + assert_eq!( + got[0].as_deref(), + Some(b"https://www.example.com/page".as_ref()) + ); + assert_eq!( + got[3].as_deref(), + Some(b"ftp://files.example.com/x".as_ref()) + ); + Ok(()) } #[cfg_attr(miri, ignore)] #[test] -fn test_onpair_nullable_canonicalize() { +fn test_onpair_nullable_canonicalize() -> vortex_error::VortexResult<()> { let input = VarBinArray::from_iter( [Some("a"), None, Some("bbb"), None, Some("ccccc")], DType::Utf8(Nullability::Nullable), ); - let len = input.len(); - let dtype = input.dtype().clone(); - let arr = onpair_compress(&input, len, &dtype, DEFAULT_DICT12_CONFIG).unwrap(); let mut ctx = SESSION.create_execution_ctx(); - let canonical = arr - .into_array() - .execute::(&mut ctx) - .unwrap(); - canonical - .with_iterator(|iter| { - let got: Vec>> = iter.map(|b| b.map(|s| s.to_vec())).collect(); - assert_eq!(got[1], None); - assert_eq!(got[3], None); - assert_eq!(got[4].as_deref(), Some(b"ccccc".as_ref())); - Ok::<_, vortex_error::VortexError>(()) - }) - .unwrap(); + let arr = onpair_compress(&input.into_array(), DEFAULT_DICT12_CONFIG, &mut ctx)?; + let canonical = arr.into_array().execute::(&mut ctx)?; + let mask = canonical + .validity()? + .execute_mask(canonical.len(), &mut ctx)?; + let got: Vec>> = (0..canonical.len()) + .map(|i| mask.value(i).then(|| canonical.bytes_at(i).to_vec())) + .collect(); + assert_eq!(got[1], None); + assert_eq!(got[3], None); + assert_eq!(got[4].as_deref(), Some(b"ccccc".as_ref())); + Ok(()) } #[cfg_attr(miri, ignore)] #[test] -fn test_onpair_scalar_at() { +fn test_onpair_scalar_at() -> vortex_error::VortexResult<()> { let input = sample_input(); - let len = input.len(); - let dtype = input.dtype().clone(); - let arr = onpair_compress(&input, len, &dtype, DEFAULT_DICT12_CONFIG).unwrap(); let mut ctx = SESSION.create_execution_ctx(); - let s = arr.into_array().execute_scalar(2, &mut ctx).unwrap(); + let arr = onpair_compress(&input.into_array(), DEFAULT_DICT12_CONFIG, &mut ctx)?; + let s = arr.into_array().execute_scalar(2, &mut ctx)?; let v = s.as_utf8().value().unwrap(); assert_eq!(v.as_bytes(), b"https://www.test.org/page"); + Ok(()) } /// `scalar_at` must decode only the requested row's code window — fetching @@ -149,10 +139,9 @@ fn test_onpair_scalar_at_window() -> vortex_error::VortexResult<()> { strings.iter().map(|s| Some(s.as_bytes())), DType::Utf8(Nullability::NonNullable), ); - let arr = - onpair_compress(&varbin, varbin.len(), varbin.dtype(), DEFAULT_DICT12_CONFIG)?.into_array(); - let mut ctx = SESSION.create_execution_ctx(); + let arr = onpair_compress(&varbin.into_array(), DEFAULT_DICT12_CONFIG, &mut ctx)?.into_array(); + for &i in &[0usize, 1, 999, 1000, n - 1] { let got = arr.execute_scalar(i, &mut ctx)?; assert_eq!( @@ -192,51 +181,43 @@ fn test_onpair_scalar_at_window() -> vortex_error::VortexResult<()> { #[case::n_7(7)] #[case::n_8(8)] #[case::n_9(9)] -fn test_onpair_unroll_tail_boundaries(#[case] n: usize) { +fn test_onpair_unroll_tail_boundaries(#[case] n: usize) -> vortex_error::VortexResult<()> { let words: &[&str] = &["a", "bb", "ccc", "https://www.example.com/x"]; let strings: Vec<&str> = (0..n).map(|i| words[i % words.len()]).collect(); let input = VarBinArray::from_iter( strings.iter().map(|s| Some(*s)), DType::Utf8(Nullability::NonNullable), ); - let len = input.len(); - let dtype = input.dtype().clone(); - let arr = onpair_compress(&input, len, &dtype, DEFAULT_DICT12_CONFIG).unwrap(); let mut ctx = SESSION.create_execution_ctx(); - let canonical = arr - .into_array() - .execute::(&mut ctx) - .unwrap(); - canonical - .with_iterator(|iter| { - let got: Vec>> = iter.map(|b| b.map(|s| s.to_vec())).collect(); - assert_eq!(got.len(), n); - for (i, expected) in strings.iter().enumerate() { - assert_eq!(got[i].as_deref(), Some(expected.as_bytes()), "n={n}, i={i}"); - } - Ok::<_, vortex_error::VortexError>(()) - }) - .unwrap(); + let arr = onpair_compress(&input.into_array(), DEFAULT_DICT12_CONFIG, &mut ctx)?; + let canonical = arr.into_array().execute::(&mut ctx)?; + let mask = canonical + .validity()? + .execute_mask(canonical.len(), &mut ctx)?; + let got: Vec>> = (0..canonical.len()) + .map(|i| mask.value(i).then(|| canonical.bytes_at(i).to_vec())) + .collect(); + assert_eq!(got.len(), n); + for (i, expected) in strings.iter().enumerate() { + assert_eq!(got[i].as_deref(), Some(expected.as_bytes()), "n={n}, i={i}"); + } + Ok(()) } /// Empty array — the unroll path must short-circuit cleanly. #[cfg_attr(miri, ignore)] #[test] -fn test_onpair_empty() { +fn test_onpair_empty() -> vortex_error::VortexResult<()> { let input = VarBinArray::from_iter( std::iter::empty::>(), DType::Utf8(Nullability::NonNullable), ); - let len = input.len(); - let dtype = input.dtype().clone(); - let arr = onpair_compress(&input, len, &dtype, DEFAULT_DICT12_CONFIG).unwrap(); - assert_eq!(arr.len(), 0); let mut ctx = SESSION.create_execution_ctx(); - let canonical = arr - .into_array() - .execute::(&mut ctx) - .unwrap(); + let arr = onpair_compress(&input.into_array(), DEFAULT_DICT12_CONFIG, &mut ctx)?; + assert_eq!(arr.len(), 0); + let canonical = arr.into_array().execute::(&mut ctx)?; assert_eq!(canonical.len(), 0); + Ok(()) } /// Filter must share the dictionary — never recompress (this is the @@ -244,7 +225,7 @@ fn test_onpair_empty() { /// and check that the result is bit-exact and still an OnPairArray. #[cfg_attr(miri, ignore)] #[test] -fn test_onpair_filter_shares_dict() { +fn test_onpair_filter_shares_dict() -> vortex_error::VortexResult<()> { let n = 5_000usize; let strings: Vec = (0..n) .map(|i| format!("https://www.example.com/items/{i:08}")) @@ -253,8 +234,8 @@ fn test_onpair_filter_shares_dict() { strings.iter().map(|s| Some(s.as_bytes())), DType::Utf8(Nullability::NonNullable), ); - let arr = - onpair_compress(&varbin, varbin.len(), varbin.dtype(), DEFAULT_DICT12_CONFIG).unwrap(); + let mut ctx = SESSION.create_execution_ctx(); + let arr = onpair_compress(&varbin.into_array(), DEFAULT_DICT12_CONFIG, &mut ctx)?; let dict_bytes_before = arr.dict_bytes().clone(); let dict_offsets_len_before = arr.dict_offsets().len(); @@ -268,35 +249,33 @@ fn test_onpair_filter_shares_dict() { .collect(); let mut filter_ctx = SESSION.create_execution_ctx(); - let filtered = ::filter(arr.as_view(), &mask, &mut filter_ctx) - .unwrap() + let filtered = ::filter(arr.as_view(), &mask, &mut filter_ctx)? .expect("OnPair filter must return Some"); assert!( filtered.is::(), "filter dropped OnPair encoding: got {}", filtered.encoding_id() ); - let typed = filtered.try_downcast::().expect("OnPair"); + let typed = filtered + .try_downcast::() + .map_err(|_| vortex_error::vortex_err!("filter result was not OnPair"))?; // Dict must be byte-identical with the input — no retrain, no copy. assert_eq!(typed.dict_bytes().as_slice(), dict_bytes_before.as_slice()); assert_eq!(typed.dict_offsets().len(), dict_offsets_len_before); assert_eq!(typed.len(), expected.len()); - let mut ctx = SESSION.create_execution_ctx(); - let canonical = typed - .into_array() - .execute::(&mut ctx) - .unwrap(); - canonical - .with_iterator(|iter| { - let got: Vec>> = iter.map(|b| b.map(|s| s.to_vec())).collect(); - assert_eq!(got.len(), expected.len()); - for (i, want) in expected.iter().enumerate() { - assert_eq!(got[i].as_deref(), Some(want.as_bytes()), "row {i}"); - } - Ok::<_, vortex_error::VortexError>(()) - }) - .unwrap(); + let canonical = typed.into_array().execute::(&mut ctx)?; + let mask = canonical + .validity()? + .execute_mask(canonical.len(), &mut ctx)?; + let got: Vec>> = (0..canonical.len()) + .map(|i| mask.value(i).then(|| canonical.bytes_at(i).to_vec())) + .collect(); + assert_eq!(got.len(), expected.len()); + for (i, want) in expected.iter().enumerate() { + assert_eq!(got[i].as_deref(), Some(want.as_bytes()), "row {i}"); + } + Ok(()) } /// Rebuild an OnPair array, swapping `codes_offsets` for a narrowed @@ -348,7 +327,7 @@ fn narrow_codes_offsets(arr: &crate::OnPairArray, target: PType) -> crate::OnPai /// of type u16`. The fix dispatches via `match_each_integer_ptype!`. #[cfg_attr(miri, ignore)] #[test] -fn test_onpair_filter_with_narrowed_codes_offsets_u16() { +fn test_onpair_filter_with_narrowed_codes_offsets_u16() -> vortex_error::VortexResult<()> { let n = 200usize; // Short rows so per-row token counts stay small and codes_offsets // values fit in u16. (We narrow manually below regardless — this @@ -359,8 +338,8 @@ fn test_onpair_filter_with_narrowed_codes_offsets_u16() { strings.iter().map(|s| Some(s.as_bytes())), DType::Utf8(Nullability::NonNullable), ); - let arr = - onpair_compress(&varbin, varbin.len(), varbin.dtype(), DEFAULT_DICT12_CONFIG).unwrap(); + let mut ctx = SESSION.create_execution_ctx(); + let arr = onpair_compress(&varbin.into_array(), DEFAULT_DICT12_CONFIG, &mut ctx)?; // Force `codes_offsets` to u16 so the panicking pre-fix // `as_slice::()` would fire. @@ -382,27 +361,25 @@ fn test_onpair_filter_with_narrowed_codes_offsets_u16() { let mut filter_ctx = SESSION.create_execution_ctx(); // Pre-fix: this call panics with "Attempted to get slice of type // u32 from array of type u16". Post-fix: succeeds. - let filtered = ::filter(arr.as_view(), &mask, &mut filter_ctx) - .unwrap() + let filtered = ::filter(arr.as_view(), &mask, &mut filter_ctx)? .expect("OnPair filter must return Some"); - let typed = filtered.try_downcast::().expect("OnPair"); + let typed = filtered + .try_downcast::() + .map_err(|_| vortex_error::vortex_err!("filter result was not OnPair"))?; assert_eq!(typed.len(), expected.len()); - let mut ctx = SESSION.create_execution_ctx(); - let canonical = typed - .into_array() - .execute::(&mut ctx) - .unwrap(); - canonical - .with_iterator(|iter| { - let got: Vec>> = iter.map(|b| b.map(|s| s.to_vec())).collect(); - assert_eq!(got.len(), expected.len()); - for (i, want) in expected.iter().enumerate() { - assert_eq!(got[i].as_deref(), Some(want.as_bytes()), "row {i}"); - } - Ok::<_, vortex_error::VortexError>(()) - }) - .unwrap(); + let canonical = typed.into_array().execute::(&mut ctx)?; + let mask = canonical + .validity()? + .execute_mask(canonical.len(), &mut ctx)?; + let got: Vec>> = (0..canonical.len()) + .map(|i| mask.value(i).then(|| canonical.bytes_at(i).to_vec())) + .collect(); + assert_eq!(got.len(), expected.len()); + for (i, want) in expected.iter().enumerate() { + assert_eq!(got[i].as_deref(), Some(want.as_bytes()), "row {i}"); + } + Ok(()) } /// Same regression, narrowed to u8 (smallest possible ptype) — extra @@ -410,25 +387,25 @@ fn test_onpair_filter_with_narrowed_codes_offsets_u16() { /// cascading compressor might pick. #[cfg_attr(miri, ignore)] #[test] -fn test_onpair_filter_with_narrowed_codes_offsets_u8() { +fn test_onpair_filter_with_narrowed_codes_offsets_u8() -> vortex_error::VortexResult<()> { let n = 100usize; let strings: Vec = (0..n).map(|i| format!("{i}")).collect(); let varbin = VarBinArray::from_iter( strings.iter().map(|s| Some(s.as_bytes())), DType::Utf8(Nullability::NonNullable), ); - let arr = - onpair_compress(&varbin, varbin.len(), varbin.dtype(), DEFAULT_DICT12_CONFIG).unwrap(); + let mut ctx = SESSION.create_execution_ctx(); + let arr = onpair_compress(&varbin.into_array(), DEFAULT_DICT12_CONFIG, &mut ctx)?; let arr = narrow_codes_offsets(&arr, PType::U8); assert_eq!(arr.as_view().codes_offsets().dtype().as_ptype(), PType::U8); let mask = vortex_mask::Mask::from_iter((0..n).map(|i| i % 2 == 0)); let mut filter_ctx = SESSION.create_execution_ctx(); - let filtered = ::filter(arr.as_view(), &mask, &mut filter_ctx) - .unwrap() + let filtered = ::filter(arr.as_view(), &mask, &mut filter_ctx)? .expect("OnPair filter must return Some"); assert_eq!(filtered.len(), n / 2); + Ok(()) } /// Regression: canonicalising a *sliced* OnPair array. `slice` keeps the full @@ -450,8 +427,8 @@ fn test_onpair_slice_canonicalize() -> vortex_error::VortexResult<()> { strings.iter().map(|s| Some(s.as_bytes())), DType::Utf8(Nullability::NonNullable), ); - let arr = - onpair_compress(&varbin, varbin.len(), varbin.dtype(), DEFAULT_DICT12_CONFIG)?.into_array(); + let mut ctx = SESSION.create_execution_ctx(); + let arr = onpair_compress(&varbin.into_array(), DEFAULT_DICT12_CONFIG, &mut ctx)?.into_array(); // interior (start>0, end0, // end=n), and a near-full window. @@ -464,20 +441,21 @@ fn test_onpair_slice_canonicalize() -> vortex_error::VortexResult<()> { sliced.encoding_id() ); - let mut ctx = SESSION.create_execution_ctx(); let canonical = sliced.execute::(&mut ctx)?; - canonical.with_iterator(|iter| { - let got: Vec>> = iter.map(|b| b.map(|s| s.to_vec())).collect(); - assert_eq!(got.len(), end - start, "window {start}..{end} length"); - for (i, want) in strings[start..end].iter().enumerate() { - assert_eq!( - got[i].as_deref(), - Some(want.as_bytes()), - "window {start}..{end} row {i}" - ); - } - Ok::<_, vortex_error::VortexError>(()) - })?; + let mask = canonical + .validity()? + .execute_mask(canonical.len(), &mut ctx)?; + let got: Vec>> = (0..canonical.len()) + .map(|i| mask.value(i).then(|| canonical.bytes_at(i).to_vec())) + .collect(); + assert_eq!(got.len(), end - start, "window {start}..{end} length"); + for (i, want) in strings[start..end].iter().enumerate() { + assert_eq!( + got[i].as_deref(), + Some(want.as_bytes()), + "window {start}..{end} row {i}" + ); + } } Ok(()) } diff --git a/encodings/experimental/onpair/tests/big_data.rs b/encodings/experimental/onpair/tests/big_data.rs index 9a38788378c..e7f74fdd782 100644 --- a/encodings/experimental/onpair/tests/big_data.rs +++ b/encodings/experimental/onpair/tests/big_data.rs @@ -17,7 +17,6 @@ use std::time::Instant; use vortex_array::IntoArray; use vortex_array::VortexSessionExecute; -use vortex_array::accessor::ArrayAccessor; use vortex_array::aggregate_fn::fns::sum::sum; use vortex_array::arrays::ConstantArray; use vortex_array::arrays::VarBinArray; @@ -63,7 +62,7 @@ fn corpus(n: usize) -> Vec { #[test] #[cfg_attr(miri, ignore)] -fn smoke_100k_rows() { +fn smoke_100k_rows() -> vortex_error::VortexResult<()> { let n = 100_000; let strings = corpus(n); let raw_bytes: usize = strings.iter().map(|s| s.len()).sum(); @@ -73,9 +72,9 @@ fn smoke_100k_rows() { DType::Utf8(Nullability::NonNullable), ); + let mut ctx = SESSION.create_execution_ctx(); let t0 = Instant::now(); - let arr = onpair_compress(&varbin, varbin.len(), varbin.dtype(), DEFAULT_DICT12_CONFIG) - .expect("compress"); + let arr = onpair_compress(&varbin.into_array(), DEFAULT_DICT12_CONFIG, &mut ctx)?; let compress_elapsed = t0.elapsed(); let bits = arr.bits(); eprintln!( @@ -84,26 +83,19 @@ fn smoke_100k_rows() { ); let arr_ref = arr.into_array(); - let mut ctx = SESSION.create_execution_ctx(); // Full canonical round-trip via the pure-Rust decoder. let t0 = Instant::now(); - let decoded = arr_ref - .clone() - .execute::(&mut ctx) - .expect("canonicalize"); + let decoded = arr_ref.clone().execute::(&mut ctx)?; eprintln!("canonicalized in {:?}", t0.elapsed()); assert_eq!(decoded.len(), n); - decoded - .with_iterator(|iter| { - for (i, got) in iter.enumerate() { - let want = strings[i].as_bytes(); - assert_eq!(got, Some(want), "row {} mismatch", i); - } - Ok::<_, vortex_error::VortexError>(()) - }) - .unwrap(); + let mask = decoded.validity()?.execute_mask(decoded.len(), &mut ctx)?; + for i in 0..decoded.len() { + let got = mask.value(i).then(|| decoded.bytes_at(i)); + let want = strings[i].as_bytes(); + assert_eq!(got.as_deref(), Some(want), "row {} mismatch", i); + } eprintln!("roundtrip OK on all {} rows", n); // Equality pushdown: pick a specific row's value and ensure the kernel @@ -115,16 +107,11 @@ fn smoke_100k_rows() { .binary( ConstantArray::new(needle.as_str(), n).into_array(), Operator::Eq, - ) - .unwrap() - .execute::(&mut ctx) - .unwrap() + )? + .execute::(&mut ctx)? .into_array(); - let eq_count = sum(&eq, &mut ctx) - .unwrap() - .as_primitive() - .as_::() - .unwrap(); + let eq_count = sum(&eq, &mut ctx)?.as_primitive().as_::().unwrap(); assert_eq!(eq_count, want_eq); eprintln!("eq pushdown matches reference count ({})", want_eq); + Ok(()) } diff --git a/encodings/fastlanes/src/for/array/for_compress.rs b/encodings/fastlanes/src/for/array/for_compress.rs index e39b4924e78..bf849ec241b 100644 --- a/encodings/fastlanes/src/for/array/for_compress.rs +++ b/encodings/fastlanes/src/for/array/for_compress.rs @@ -3,9 +3,8 @@ use num_traits::PrimInt; use num_traits::WrappingSub; +use vortex_array::ExecutionCtx; use vortex_array::IntoArray; -use vortex_array::LEGACY_SESSION; -use vortex_array::VortexSessionExecute; use vortex_array::arrays::PrimitiveArray; use vortex_array::dtype::NativePType; use vortex_array::expr::stats::Stat; @@ -16,16 +15,17 @@ use vortex_error::vortex_err; use crate::FoR; use crate::FoRArray; use crate::FoRData; + impl FoRData { - pub fn encode(array: PrimitiveArray) -> VortexResult { + pub fn encode(array: PrimitiveArray, ctx: &mut ExecutionCtx) -> VortexResult { let array_ref = array.clone().into_array(); let min = array_ref .statistics() - .compute_stat(Stat::Min, &mut LEGACY_SESSION.create_execution_ctx())? + .compute_stat(Stat::Min, ctx)? .ok_or_else(|| vortex_err!("Min stat not found"))?; let encoded = match_each_integer_ptype!(array.ptype(), |T| { - compress_primitive::(array, T::try_from(&min)?)?.into_array() + compress_primitive::(array, T::try_from(&min)?, ctx)?.into_array() }); FoR::try_new(encoded, min) } @@ -34,16 +34,20 @@ impl FoRData { fn compress_primitive( parray: PrimitiveArray, min: T, + ctx: &mut ExecutionCtx, ) -> VortexResult { // Set null values to the min value, ensuring that decompress into a value in the primitive // range (and stop them wrapping around). - let encoded = parray.map_each_with_validity::(|(v, bool)| { - if bool { - v.wrapping_sub(&min) - } else { - T::zero() - } - })?; + let encoded = parray.map_each_with_validity::( + |(v, bool)| { + if bool { + v.wrapping_sub(&min) + } else { + T::zero() + } + }, + ctx, + )?; Ok(encoded) } @@ -53,12 +57,14 @@ mod test { use itertools::Itertools; use vortex_array::VortexSessionExecute; + use vortex_array::array_session; use vortex_array::arrays::primitive::PrimitiveArrayExt; use vortex_array::assert_arrays_eq; use vortex_array::dtype::PType; use vortex_array::expr::stats::StatsProvider; use vortex_array::scalar::Scalar; use vortex_array::validity::Validity; + use vortex_buffer::Buffer; use vortex_buffer::buffer; use vortex_session::VortexSession; @@ -69,33 +75,30 @@ mod test { use crate::r#for::array::for_decompress::fused_decompress; static SESSION: LazyLock = LazyLock::new(|| { - let session = vortex_array::array_session(); + let session = array_session(); crate::initialize(&session); session }); #[test] fn test_compress_round_trip_small() { - let array = PrimitiveArray::new( - (1i32..10).collect::>(), - Validity::NonNullable, - ); - let compressed = FoRData::encode(array.clone()).unwrap(); + let mut ctx = SESSION.create_execution_ctx(); + let array = PrimitiveArray::new((1i32..10).collect::>(), Validity::NonNullable); + let compressed = FoRData::encode(array.clone(), &mut ctx).unwrap(); assert_eq!(i32::try_from(compressed.reference_scalar()).unwrap(), 1); - assert_arrays_eq!(compressed, array, &mut SESSION.create_execution_ctx()); + assert_arrays_eq!(compressed, array, &mut ctx); } #[test] fn test_compress() { + let mut ctx = SESSION.create_execution_ctx(); // Create a range offset by a million. let array = PrimitiveArray::new( - (0u32..10_000) - .map(|v| v + 1_000_000) - .collect::>(), + (0u32..10_000).map(|v| v + 1_000_000).collect::>(), Validity::NonNullable, ); - let compressed = FoRData::encode(array).unwrap(); + let compressed = FoRData::encode(array, &mut ctx).unwrap(); assert_eq!( u32::try_from(compressed.reference_scalar()).unwrap(), 1_000_000u32 @@ -104,28 +107,27 @@ mod test { #[test] fn test_zeros() { + let mut ctx = SESSION.create_execution_ctx(); let array = PrimitiveArray::new(buffer![0i32; 100], Validity::NonNullable); assert_eq!(array.statistics().len(), 0); let dtype = array.dtype().clone(); - let compressed = FoRData::encode(array).unwrap(); + let compressed = FoRData::encode(array, &mut ctx).unwrap(); assert_eq!(compressed.reference_scalar().dtype(), &dtype); assert!(compressed.reference_scalar().dtype().is_signed_int()); assert!(compressed.encoded().dtype().is_signed_int()); - let encoded = compressed - .encoded() - .execute_scalar(0, &mut SESSION.create_execution_ctx()) - .unwrap(); + let encoded = compressed.encoded().execute_scalar(0, &mut ctx).unwrap(); assert_eq!(encoded, Scalar::from(0i32)); } #[test] fn test_decompress() { + let mut ctx = SESSION.create_execution_ctx(); // Create a range offset by a million. let array = PrimitiveArray::from_iter((0u32..100_000).step_by(1024).map(|v| v + 1_000_000)); - let compressed = FoRData::encode(array.clone()).unwrap(); - assert_arrays_eq!(compressed, array, &mut SESSION.create_execution_ctx()); + let compressed = FoRData::encode(array.clone(), &mut ctx).unwrap(); + assert_arrays_eq!(compressed, array, &mut ctx); } #[test] @@ -156,7 +158,7 @@ mod test { fn test_overflow() -> VortexResult<()> { let mut ctx = SESSION.create_execution_ctx(); let array = PrimitiveArray::from_iter(i8::MIN..=i8::MAX); - let compressed = FoRData::encode(array.clone())?; + let compressed = FoRData::encode(array.clone(), &mut ctx)?; assert_eq!( i8::MIN, compressed diff --git a/encodings/fastlanes/src/for/compute/is_sorted.rs b/encodings/fastlanes/src/for/compute/is_sorted.rs index a850f25583e..7cf43a93da4 100644 --- a/encodings/fastlanes/src/for/compute/is_sorted.rs +++ b/encodings/fastlanes/src/for/compute/is_sorted.rs @@ -75,7 +75,7 @@ mod test { let mut ctx = array_session().create_execution_ctx(); let a = PrimitiveArray::new(buffer![-1, 0, i8::MAX], Validity::NonNullable); - let b = FoRData::encode(a).unwrap(); + let b = FoRData::encode(a, &mut ctx).unwrap(); assert!( is_sorted(&b.clone().into_array(), &mut ctx).unwrap(), "{}", @@ -83,7 +83,7 @@ mod test { ); let a = PrimitiveArray::new(buffer![i8::MIN, 0, i8::MAX], Validity::NonNullable); - let b = FoRData::encode(a).unwrap(); + let b = FoRData::encode(a, &mut ctx).unwrap(); assert!( is_sorted(&b.clone().into_array(), &mut ctx).unwrap(), "{}", @@ -91,7 +91,7 @@ mod test { ); let a = PrimitiveArray::new(buffer![i8::MIN, 0, 30, 127], Validity::NonNullable); - let b = FoRData::encode(a).unwrap(); + let b = FoRData::encode(a, &mut ctx).unwrap(); assert!( is_sorted(&b.clone().into_array(), &mut ctx).unwrap(), "{}", @@ -99,7 +99,7 @@ mod test { ); let a = PrimitiveArray::new(buffer![i8::MIN, -3, -1], Validity::NonNullable); - let b = FoRData::encode(a).unwrap(); + let b = FoRData::encode(a, &mut ctx).unwrap(); assert!( is_sorted(&b.clone().into_array(), &mut ctx).unwrap(), "{}", @@ -107,7 +107,7 @@ mod test { ); let a = PrimitiveArray::new(buffer![-10, -3, -1], Validity::NonNullable); - let b = FoRData::encode(a).unwrap(); + let b = FoRData::encode(a, &mut ctx).unwrap(); assert!( is_sorted(&b.clone().into_array(), &mut ctx).unwrap(), "{}", @@ -115,7 +115,7 @@ mod test { ); let a = PrimitiveArray::new(buffer![-10, -11, -1], Validity::NonNullable); - let b = FoRData::encode(a).unwrap(); + let b = FoRData::encode(a, &mut ctx).unwrap(); assert!( !is_sorted(&b.clone().into_array(), &mut ctx).unwrap(), "{}", @@ -123,7 +123,7 @@ mod test { ); let a = PrimitiveArray::new(buffer![-10, i8::MIN, -1], Validity::NonNullable); - let b = FoRData::encode(a).unwrap(); + let b = FoRData::encode(a, &mut ctx).unwrap(); assert!( !is_sorted(&b.clone().into_array(), &mut ctx).unwrap(), "{}", diff --git a/encodings/fastlanes/src/for/vtable/mod.rs b/encodings/fastlanes/src/for/vtable/mod.rs index d3a0bb84811..c840c0e0af1 100644 --- a/encodings/fastlanes/src/for/vtable/mod.rs +++ b/encodings/fastlanes/src/for/vtable/mod.rs @@ -173,8 +173,8 @@ impl FoR { } /// Encode a primitive array using Frame of Reference encoding. - pub fn encode(array: PrimitiveArray) -> VortexResult { - FoRData::encode(array) + pub fn encode(array: PrimitiveArray, ctx: &mut ExecutionCtx) -> VortexResult { + FoRData::encode(array, ctx) } } diff --git a/encodings/fastlanes/src/for/vtable/operations.rs b/encodings/fastlanes/src/for/vtable/operations.rs index b6deefbe038..e549431f973 100644 --- a/encodings/fastlanes/src/for/vtable/operations.rs +++ b/encodings/fastlanes/src/for/vtable/operations.rs @@ -57,8 +57,13 @@ mod test { #[test] fn for_scalar_at() { - let for_arr = FoRData::encode(PrimitiveArray::from_iter([-100, 1100, 1500, 1900])).unwrap(); + let mut ctx = SESSION.create_execution_ctx(); + let for_arr = FoRData::encode( + PrimitiveArray::from_iter([-100, 1100, 1500, 1900]), + &mut ctx, + ) + .unwrap(); let expected = PrimitiveArray::from_iter([-100, 1100, 1500, 1900]); - assert_arrays_eq!(for_arr, expected, &mut SESSION.create_execution_ctx()); + assert_arrays_eq!(for_arr, expected, &mut ctx); } } diff --git a/encodings/fsst/benches/fsst_url_compare.rs b/encodings/fsst/benches/fsst_url_compare.rs index a62dea8aa9e..9ff4e3debd5 100644 --- a/encodings/fsst/benches/fsst_url_compare.rs +++ b/encodings/fsst/benches/fsst_url_compare.rs @@ -13,6 +13,7 @@ use vortex_array::VortexSessionExecute; use vortex_array::arrays::ConstantArray; use vortex_array::arrays::VarBinArray; use vortex_array::arrays::VarBinViewArray; +use vortex_array::arrays::varbin::VarBinArrayExt; use vortex_array::builtins::ArrayBuiltins; use vortex_array::expr::like; use vortex_array::expr::lit; @@ -57,13 +58,17 @@ static URL_VIEW_DATA: LazyLock = LazyLock::new(|| { /// Pick a concrete URL from the dataset that uses the given domain. fn pick_url_with_domain(data: &VarBinArray, domain: &str) -> String { - use vortex_array::accessor::ArrayAccessor; - data.with_iterator(|iter| { - iter.flatten() - .map(|b| std::str::from_utf8(b).unwrap().to_string()) - .find(|u| u.contains(domain)) - .unwrap_or_else(|| format!("http://{domain}/missing")) - }) + let mut ctx = SESSION.create_execution_ctx(); + let mask = data + .validity() + .unwrap() + .execute_mask(data.len(), &mut ctx) + .unwrap(); + (0..data.len()) + .filter(|&i| mask.value(i)) + .map(|i| unsafe { String::from_utf8_unchecked(data.bytes_at(i).to_vec()) }) + .find(|u| u.contains(domain)) + .unwrap_or_else(|| format!("http://{domain}/missing")) } #[divan::bench] diff --git a/encodings/fsst/src/array.rs b/encodings/fsst/src/array.rs index 2526f0a697f..532fea8bae5 100644 --- a/encodings/fsst/src/array.rs +++ b/encodings/fsst/src/array.rs @@ -840,7 +840,6 @@ mod test { use vortex_array::ArrayPlugin; use vortex_array::IntoArray; use vortex_array::VortexSessionExecute; - use vortex_array::accessor::ArrayAccessor; use vortex_array::array_session; use vortex_array::arrays::VarBinViewArray; use vortex_array::buffer::BufferHandle; @@ -849,7 +848,6 @@ mod test { use vortex_array::dtype::PType; use vortex_array::test_harness::check_metadata; use vortex_buffer::Buffer; - use vortex_error::VortexError; use vortex_error::VortexResult; use vortex_error::vortex_err; @@ -903,7 +901,7 @@ mod test { /// This test manually constructs an old-style FSST array and ensures that it can still be /// deserialized. #[test] - fn test_back_compat() { + fn test_back_compat() -> VortexResult<()> { let symbols = Buffer::::copy_from([ Symbol::from_slice(b"abc00000"), Symbol::from_slice(b"defghijk"), @@ -913,7 +911,7 @@ mod test { let compressor = Compressor::rebuild_from(symbols.as_slice(), symbol_lengths.as_slice()); let mut ctx = array_session().create_execution_ctx(); let input = VarBinViewArray::from_iter_str(["abcabcab", "defghijk"]); - let fsst_array = fsst_compress(&input.into_array(), &compressor, &mut ctx).unwrap(); + let fsst_array = fsst_compress(&input.into_array(), &compressor, &mut ctx)?; let compressed_codes = fsst_array.codes(); @@ -950,18 +948,17 @@ mod test { &buffers, &children.as_slice(), &array_session(), - ) - .unwrap(); - - let decompressed = fsst - .execute::(&mut array_session().create_execution_ctx()) - .unwrap(); - decompressed - .with_iterator(|it| { - assert_eq!(it.next().unwrap(), Some(b"abcabcab".as_ref())); - assert_eq!(it.next().unwrap(), Some(b"defghijk".as_ref())); - Ok::<_, VortexError>(()) - }) - .unwrap() + )?; + + let decompressed = + fsst.execute::(&mut array_session().create_execution_ctx())?; + let mask = decompressed + .validity()? + .execute_mask(decompressed.len(), &mut ctx)?; + assert!(mask.value(0)); + assert_eq!(decompressed.bytes_at(0).as_slice(), b"abcabcab".as_ref()); + assert!(mask.value(1)); + assert_eq!(decompressed.bytes_at(1).as_slice(), b"defghijk".as_ref()); + Ok(()) } } diff --git a/encodings/fsst/src/canonical.rs b/encodings/fsst/src/canonical.rs index 86caf98825e..43712aad7dd 100644 --- a/encodings/fsst/src/canonical.rs +++ b/encodings/fsst/src/canonical.rs @@ -96,7 +96,6 @@ mod tests { use vortex_array::ArrayRef; use vortex_array::IntoArray; use vortex_array::VortexSessionExecute; - use vortex_array::accessor::ArrayAccessor; use vortex_array::arrays::ChunkedArray; use vortex_array::arrays::VarBinArray; use vortex_array::arrays::VarBinViewArray; @@ -182,8 +181,10 @@ mod tests { { let arr = builder.finish_into_canonical().into_varbinview(); - let res1 = - arr.with_iterator(|iter| iter.map(|b| b.map(|v| v.to_vec())).collect::>()); + let mask = arr.validity()?.execute_mask(arr.len(), &mut ctx)?; + let res1 = (0..arr.len()) + .map(|i| mask.value(i).then(|| arr.bytes_at(i).to_vec())) + .collect::>(); assert_eq!(data, res1); }; @@ -192,8 +193,10 @@ mod tests { .as_array() .clone() .execute::(&mut ctx)?; - let res2 = - arr2.with_iterator(|iter| iter.map(|b| b.map(|v| v.to_vec())).collect::>()); + let mask = arr2.validity()?.execute_mask(arr2.len(), &mut ctx)?; + let res2 = (0..arr2.len()) + .map(|i| mask.value(i).then(|| arr2.bytes_at(i).to_vec())) + .collect::>(); assert_eq!(data, res2) }; Ok(()) diff --git a/encodings/pco/src/array.rs b/encodings/pco/src/array.rs index d0804cd80c1..231b73264ba 100644 --- a/encodings/pco/src/array.rs +++ b/encodings/pco/src/array.rs @@ -543,6 +543,7 @@ impl PcoData { self.ptype, unsliced_validity.slice(self.slice_start..self.slice_stop)?, self.slice_stop - self.slice_start, + ctx, )) } diff --git a/encodings/zstd/src/array.rs b/encodings/zstd/src/array.rs index 87dc9af286e..c5b0ff19780 100644 --- a/encodings/zstd/src/array.rs +++ b/encodings/zstd/src/array.rs @@ -22,7 +22,6 @@ use vortex_array::EqMode; use vortex_array::ExecutionCtx; use vortex_array::ExecutionResult; use vortex_array::IntoArray; -use vortex_array::accessor::ArrayAccessor; use vortex_array::arrays::ConstantArray; use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::VarBinViewArray; @@ -410,17 +409,28 @@ fn collect_valid_vbv( + mask.true_count() * size_of::(), ); let mut value_byte_indices = Vec::new(); - vbv.with_iterator(|iterator| { - // by flattening, we should omit nulls - for value in iterator.flatten() { - value_byte_indices.push(buffer.len()); - // here's where we write the string lengths - buffer - .extend_trusted(ViewLen::try_from(value.len())?.to_le_bytes().into_iter()); - buffer.extend_from_slice(value); + let views = vbv.views(); + let buffers = vbv + .data_buffers() + .iter() + .map(|b| b.as_host()) + .collect::>(); + // skip nulls, writing only valid values + for (i, view) in views.iter().enumerate() { + if !mask.value(i) { + continue; } - Ok::<_, VortexError>(()) - })?; + let value = if view.is_inlined() { + view.as_inlined().value() + } else { + let view_ref = view.as_view(); + &buffers[view_ref.buffer_index as usize][view_ref.as_range()] + }; + value_byte_indices.push(buffer.len()); + // here's where we write the string lengths + buffer.extend_trusted(ViewLen::try_from(value.len())?.to_le_bytes().into_iter()); + buffer.extend_from_slice(value); + } (buffer.freeze(), value_byte_indices) } }; @@ -971,6 +981,7 @@ impl ZstdData { dtype.as_ptype(), slice_validity, slice_n_rows, + ctx, ); Ok(primitive.into_array()) diff --git a/fuzz/src/array/compare.rs b/fuzz/src/array/compare.rs index 53988426822..f594a9b4102 100644 --- a/fuzz/src/array/compare.rs +++ b/fuzz/src/array/compare.rs @@ -4,7 +4,6 @@ use vortex_array::ArrayRef; use vortex_array::ExecutionCtx; use vortex_array::IntoArray; -use vortex_array::accessor::ArrayAccessor; use vortex_array::arrays::BoolArray; use vortex_array::arrays::DecimalArray; use vortex_array::arrays::PrimitiveArray; @@ -20,6 +19,7 @@ use vortex_array::scalar_fn::fns::binary::scalar_cmp; use vortex_array::scalar_fn::fns::operators::CompareOperator; use vortex_array::validity::Validity; use vortex_buffer::BitBuffer; +use vortex_buffer::ByteBuffer; use vortex_error::VortexExpect; use vortex_error::vortex_panic; @@ -134,32 +134,45 @@ pub fn compare_canonical_array( .clone() .execute::(ctx) .vortex_expect("to varbinview"); - varbinview.with_iterator(|iter| { - let utf8_value = value.as_utf8(); - compare_to( - iter.map(|v| v.map(|b| unsafe { str::from_utf8_unchecked(b) })), - utf8_value.value().vortex_expect("nulls handled before"), - operator, - result_nullability, - ) - }) + let mask = varbinview + .validity() + .vortex_expect("validity") + .execute_mask(varbinview.len(), ctx) + .vortex_expect("varbinview mask"); + let values: Vec> = (0..varbinview.len()) + .map(|i| mask.value(i).then(|| varbinview.bytes_at(i))) + .collect(); + let utf8_value = value.as_utf8(); + compare_to( + values.iter().map(|v| { + v.as_ref() + .map(|b| unsafe { str::from_utf8_unchecked(b.as_slice()) }) + }), + utf8_value.value().vortex_expect("nulls handled before"), + operator, + result_nullability, + ) } DType::Binary(_) => { let varbinview = array .clone() .execute::(ctx) .vortex_expect("to varbinview"); - varbinview.with_iterator(|iter| { - let binary_value = value.as_binary(); - compare_to( - // Don't understand the lifetime problem here but identity map makes it go away - #[expect(clippy::map_identity)] - iter.map(|v| v), - binary_value.value().vortex_expect("nulls handled before"), - operator, - result_nullability, - ) - }) + let mask = varbinview + .validity() + .vortex_expect("validity") + .execute_mask(varbinview.len(), ctx) + .vortex_expect("varbinview mask"); + let values: Vec> = (0..varbinview.len()) + .map(|i| mask.value(i).then(|| varbinview.bytes_at(i))) + .collect(); + let binary_value = value.as_binary(); + compare_to( + values.iter().map(|v| v.as_deref()), + binary_value.value().vortex_expect("nulls handled before"), + operator, + result_nullability, + ) } DType::List(..) | DType::FixedSizeList(..) | DType::Struct(..) => { let scalar_vals: Vec = (0..array.len()) diff --git a/fuzz/src/array/filter.rs b/fuzz/src/array/filter.rs index 8de1786d25f..aa54b0d9488 100644 --- a/fuzz/src/array/filter.rs +++ b/fuzz/src/array/filter.rs @@ -4,7 +4,6 @@ use vortex_array::ArrayRef; use vortex_array::ExecutionCtx; use vortex_array::IntoArray; -use vortex_array::accessor::ArrayAccessor; use vortex_array::arrays::BoolArray; use vortex_array::arrays::DecimalArray; use vortex_array::arrays::PrimitiveArray; @@ -90,12 +89,12 @@ pub fn filter_canonical_array( } DType::Utf8(_) | DType::Binary(_) => { let utf8 = array.clone().execute::(ctx)?; - let values = utf8.with_iterator(|iter| { - iter.zip(filter.iter()) - .filter(|(_, f)| **f) - .map(|(v, _)| v.map(|u| u.to_vec())) - .collect::>() - }); + let mask = utf8.validity()?.execute_mask(utf8.len(), ctx)?; + let values = (0..utf8.len()) + .zip(filter.iter()) + .filter(|(_, f)| **f) + .map(|(i, _)| mask.value(i).then(|| utf8.bytes_at(i).to_vec())) + .collect::>(); Ok(VarBinViewArray::from_iter(values, array.dtype().clone()).into_array()) } DType::List(..) | DType::FixedSizeList(..) => { diff --git a/fuzz/src/array/search_sorted.rs b/fuzz/src/array/search_sorted.rs index 788e51bac46..762182b35fb 100644 --- a/fuzz/src/array/search_sorted.rs +++ b/fuzz/src/array/search_sorted.rs @@ -6,7 +6,6 @@ use std::fmt::Debug; use vortex_array::ArrayRef; use vortex_array::ExecutionCtx; -use vortex_array::accessor::ArrayAccessor; use vortex_array::arrays::BoolArray; use vortex_array::arrays::DecimalArray; use vortex_array::arrays::PrimitiveArray; @@ -133,8 +132,10 @@ pub fn search_sorted_canonical_array( } DType::Utf8(_) | DType::Binary(_) => { let utf8 = array.clone().execute::(ctx)?; - let opt_values = - utf8.with_iterator(|iter| iter.map(|v| v.map(|u| u.to_vec())).collect::>()); + let mask = utf8.validity()?.execute_mask(utf8.len(), ctx)?; + let opt_values = (0..utf8.len()) + .map(|i| mask.value(i).then(|| utf8.bytes_at(i).to_vec())) + .collect::>(); let to_find = if matches!(array.dtype(), DType::Utf8(_)) { BufferString::try_from(scalar)?.as_str().as_bytes().to_vec() } else { diff --git a/fuzz/src/array/slice.rs b/fuzz/src/array/slice.rs index 7758d41219c..2a151002805 100644 --- a/fuzz/src/array/slice.rs +++ b/fuzz/src/array/slice.rs @@ -4,7 +4,6 @@ use vortex_array::ArrayRef; use vortex_array::ExecutionCtx; use vortex_array::IntoArray; -use vortex_array::accessor::ArrayAccessor; use vortex_array::arrays::BoolArray; use vortex_array::arrays::DecimalArray; use vortex_array::arrays::FixedSizeListArray; @@ -69,8 +68,10 @@ pub fn slice_canonical_array( } DType::Utf8(_) | DType::Binary(_) => { let utf8 = array.clone().execute::(ctx)?; - let values = - utf8.with_iterator(|iter| iter.map(|v| v.map(|u| u.to_vec())).collect::>()); + let mask = utf8.validity()?.execute_mask(utf8.len(), ctx)?; + let values = (0..utf8.len()) + .map(|i| mask.value(i).then(|| utf8.bytes_at(i).to_vec())) + .collect::>(); Ok(VarBinViewArray::from_iter( values[start..stop].iter().cloned(), array.dtype().clone(), diff --git a/fuzz/src/array/sort.rs b/fuzz/src/array/sort.rs index c77e06a196b..3f00fccd06e 100644 --- a/fuzz/src/array/sort.rs +++ b/fuzz/src/array/sort.rs @@ -6,7 +6,6 @@ use std::cmp::Ordering; use vortex_array::ArrayRef; use vortex_array::ExecutionCtx; use vortex_array::IntoArray; -use vortex_array::accessor::ArrayAccessor; use vortex_array::arrays::BoolArray; use vortex_array::arrays::DecimalArray; use vortex_array::arrays::PrimitiveArray; @@ -86,8 +85,10 @@ pub fn sort_canonical_array(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexR } DType::Utf8(_) | DType::Binary(_) => { let utf8 = array.clone().execute::(ctx)?; - let mut opt_values = - utf8.with_iterator(|iter| iter.map(|v| v.map(|u| u.to_vec())).collect::>()); + let mask = utf8.validity()?.execute_mask(utf8.len(), ctx)?; + let mut opt_values = (0..utf8.len()) + .map(|i| mask.value(i).then(|| utf8.bytes_at(i).to_vec())) + .collect::>(); opt_values.sort(); Ok(VarBinViewArray::from_iter(opt_values, array.dtype().clone()).into_array()) } diff --git a/fuzz/src/array/take.rs b/fuzz/src/array/take.rs index 7bfc2226ba0..c67ae6184d8 100644 --- a/fuzz/src/array/take.rs +++ b/fuzz/src/array/take.rs @@ -4,7 +4,6 @@ use vortex_array::ArrayRef; use vortex_array::ExecutionCtx; use vortex_array::IntoArray; -use vortex_array::accessor::ArrayAccessor; use vortex_array::arrays::BoolArray; use vortex_array::arrays::DecimalArray; use vortex_array::arrays::PrimitiveArray; @@ -103,8 +102,10 @@ pub fn take_canonical_array( } DType::Utf8(_) | DType::Binary(_) => { let utf8 = array.clone().execute::(ctx)?; - let values = - utf8.with_iterator(|iter| iter.map(|v| v.map(|u| u.to_vec())).collect::>()); + let mask = utf8.validity()?.execute_mask(utf8.len(), ctx)?; + let values = (0..utf8.len()) + .map(|i| mask.value(i).then(|| utf8.bytes_at(i).to_vec())) + .collect::>(); Ok(VarBinViewArray::from_iter( indices .iter() diff --git a/vortex-array/benches/dict_compare.rs b/vortex-array/benches/dict_compare.rs index 29e1516e2b6..446623939f1 100644 --- a/vortex-array/benches/dict_compare.rs +++ b/vortex-array/benches/dict_compare.rs @@ -3,19 +3,18 @@ #![expect(clippy::unwrap_used)] -use std::str::from_utf8; use std::sync::LazyLock; use vortex_array::Canonical; use vortex_array::IntoArray; use vortex_array::RecursiveCanonical; use vortex_array::VortexSessionExecute; -use vortex_array::accessor::ArrayAccessor; use vortex_array::arrays::ConstantArray; use vortex_array::arrays::VarBinArray; use vortex_array::arrays::VarBinViewArray; use vortex_array::arrays::dict_test::gen_primitive_for_dict; use vortex_array::arrays::dict_test::gen_varbin_words; +use vortex_array::arrays::varbin::VarBinArrayExt; use vortex_array::builders::dict::dict_encode; use vortex_array::builtins::ArrayBuiltins; use vortex_array::expr::eq; @@ -50,12 +49,9 @@ const LENGTH_AND_UNIQUE_VALUES: &[(usize, usize)] = &[ #[divan::bench(args = LENGTH_AND_UNIQUE_VALUES)] fn bench_compare_primitive(bencher: divan::Bencher, (len, uniqueness): (usize, usize)) { + let mut ctx = SESSION.create_execution_ctx(); let primitive_arr = gen_primitive_for_dict::(len, uniqueness); - let dict = dict_encode( - &primitive_arr.clone().into_array(), - &mut SESSION.create_execution_ctx(), - ) - .unwrap(); + let dict = dict_encode(&primitive_arr.clone().into_array(), &mut ctx).unwrap(); let value = primitive_arr.as_slice::()[0]; bencher @@ -72,14 +68,11 @@ fn bench_compare_primitive(bencher: divan::Bencher, (len, uniqueness): (usize, u #[divan::bench(args = LENGTH_AND_UNIQUE_VALUES)] fn bench_compare_varbin(bencher: divan::Bencher, (len, uniqueness): (usize, usize)) { + let mut ctx = SESSION.create_execution_ctx(); let varbin_arr = VarBinArray::from(gen_varbin_words(len, uniqueness)); - let dict = dict_encode( - &varbin_arr.clone().into_array(), - &mut SESSION.create_execution_ctx(), - ) - .unwrap(); - let bytes = varbin_arr.with_iterator(|i| i.next().unwrap().unwrap().to_vec()); - let value = from_utf8(bytes.as_slice()).unwrap(); + let dict = dict_encode(&varbin_arr.clone().into_array(), &mut ctx).unwrap(); + let const_bytes = varbin_arr.bytes_at(0); + let value = unsafe { str::from_utf8_unchecked(const_bytes.as_slice()) }; bencher .with_inputs(|| (&dict, SESSION.create_execution_ctx())) @@ -95,14 +88,11 @@ fn bench_compare_varbin(bencher: divan::Bencher, (len, uniqueness): (usize, usiz #[divan::bench(args = LENGTH_AND_UNIQUE_VALUES)] fn bench_compare_varbinview(bencher: divan::Bencher, (len, uniqueness): (usize, usize)) { + let mut ctx = SESSION.create_execution_ctx(); let varbinview_arr = VarBinViewArray::from_iter_str(gen_varbin_words(len, uniqueness)); - let dict = dict_encode( - &varbinview_arr.clone().into_array(), - &mut SESSION.create_execution_ctx(), - ) - .unwrap(); - let bytes = varbinview_arr.with_iterator(|i| i.next().unwrap().unwrap().to_vec()); - let value = from_utf8(bytes.as_slice()).unwrap(); + let dict = dict_encode(&varbinview_arr.clone().into_array(), &mut ctx).unwrap(); + let const_bytes = varbinview_arr.bytes_at(0); + let value = unsafe { str::from_utf8_unchecked(const_bytes.as_slice()) }; bencher .with_inputs(|| (&dict, SESSION.create_execution_ctx())) @@ -133,12 +123,9 @@ fn bench_compare_sliced_dict_primitive( bencher: divan::Bencher, (codes_len, values_len): (usize, usize), ) { + let mut ctx = SESSION.create_execution_ctx(); let primitive_arr = gen_primitive_for_dict::(codes_len.max(values_len), values_len); - let dict = dict_encode( - &primitive_arr.clone().into_array(), - &mut SESSION.create_execution_ctx(), - ) - .unwrap(); + let dict = dict_encode(&primitive_arr.clone().into_array(), &mut ctx).unwrap(); let dict = dict.into_array().slice(0..codes_len).unwrap(); let value = primitive_arr.as_slice::()[0]; @@ -158,15 +145,12 @@ fn bench_compare_sliced_dict_varbinview( bencher: divan::Bencher, (codes_len, values_len): (usize, usize), ) { + let mut ctx = SESSION.create_execution_ctx(); let varbin_arr = VarBinArray::from(gen_varbin_words(codes_len.max(values_len), values_len)); - let dict = dict_encode( - &varbin_arr.clone().into_array(), - &mut SESSION.create_execution_ctx(), - ) - .unwrap(); + let dict = dict_encode(&varbin_arr.clone().into_array(), &mut ctx).unwrap(); let dict = dict.into_array().slice(0..codes_len).unwrap(); - let bytes = varbin_arr.with_iterator(|i| i.next().unwrap().unwrap().to_vec()); - let value = from_utf8(bytes.as_slice()).unwrap(); + let const_bytes = varbin_arr.bytes_at(0); + let value = unsafe { str::from_utf8_unchecked(const_bytes.as_slice()) }; bencher .with_inputs(|| (&dict, SESSION.create_execution_ctx())) diff --git a/vortex-array/src/accessor.rs b/vortex-array/src/accessor.rs deleted file mode 100644 index 6460f879cb3..00000000000 --- a/vortex-array/src/accessor.rs +++ /dev/null @@ -1,14 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -/// Trait for arrays that support iterative access to their elements. -pub trait ArrayAccessor { - /// Iterate over each element of the array, in-order. - /// - /// The function `f` will be passed an [`Iterator`], it can call [`Iterator::next`] on the - /// iterator `len` times. Iterator elements are `Option` types, - /// regardless of the nullability of the underlying array data. - fn with_iterator(&self, f: F) -> R - where - F: for<'a> FnOnce(&mut dyn Iterator>) -> R; -} diff --git a/vortex-array/src/aggregate_fn/fns/is_sorted/mod.rs b/vortex-array/src/aggregate_fn/fns/is_sorted/mod.rs index 57ee2d342a0..0aa44d010b7 100644 --- a/vortex-array/src/aggregate_fn/fns/is_sorted/mod.rs +++ b/vortex-array/src/aggregate_fn/fns/is_sorted/mod.rs @@ -487,7 +487,7 @@ impl AggregateFnVTable for IsSorted { let batch_is_sorted = match c { Canonical::Primitive(p) => check_primitive_sorted(p, partial.strict, ctx)?, Canonical::Bool(b) => check_bool_sorted(b, partial.strict, ctx)?, - Canonical::VarBinView(v) => check_varbinview_sorted(v, partial.strict)?, + Canonical::VarBinView(v) => check_varbinview_sorted(v, partial.strict, ctx)?, Canonical::Decimal(d) => check_decimal_sorted(d, partial.strict, ctx)?, Canonical::Extension(e) => check_extension_sorted(e, partial.strict, ctx)?, Canonical::Null(_) => !partial.strict, diff --git a/vortex-array/src/aggregate_fn/fns/is_sorted/varbin.rs b/vortex-array/src/aggregate_fn/fns/is_sorted/varbin.rs index 84529ef16c7..f0392f73946 100644 --- a/vortex-array/src/aggregate_fn/fns/is_sorted/varbin.rs +++ b/vortex-array/src/aggregate_fn/fns/is_sorted/varbin.rs @@ -4,13 +4,19 @@ use vortex_error::VortexResult; use super::IsSortedIteratorExt; -use crate::accessor::ArrayAccessor; +use crate::ExecutionCtx; use crate::arrays::VarBinViewArray; -pub(super) fn check_varbinview_sorted(array: &VarBinViewArray, strict: bool) -> VortexResult { +pub(super) fn check_varbinview_sorted( + array: &VarBinViewArray, + strict: bool, + ctx: &mut ExecutionCtx, +) -> VortexResult { + let mask = array.validity()?.execute_mask(array.len(), ctx)?; + let iter = (0..array.len()).map(|i| mask.value(i).then(|| array.bytes_at(i))); Ok(if strict { - array.with_iterator(|bytes_iter| bytes_iter.is_strict_sorted()) + iter.is_strict_sorted() } else { - array.with_iterator(|bytes_iter| bytes_iter.is_sorted()) + iter.is_sorted() }) } diff --git a/vortex-array/src/aggregate_fn/fns/min_max/mod.rs b/vortex-array/src/aggregate_fn/fns/min_max/mod.rs index 312ed1e1408..cc986f66eae 100644 --- a/vortex-array/src/aggregate_fn/fns/min_max/mod.rs +++ b/vortex-array/src/aggregate_fn/fns/min_max/mod.rs @@ -411,7 +411,7 @@ impl AggregateFnVTable for MinMax { Columnar::Canonical(c) => match c { Canonical::Primitive(p) => accumulate_primitive(partial, p, ctx), Canonical::Bool(b) => accumulate_bool(partial, b, ctx), - Canonical::VarBinView(v) => accumulate_varbinview(partial, v), + Canonical::VarBinView(v) => accumulate_varbinview(partial, v, ctx), Canonical::Decimal(d) => accumulate_decimal(partial, d, ctx), Canonical::Extension(e) => accumulate_extension(partial, e, ctx), Canonical::Null(_) => Ok(()), diff --git a/vortex-array/src/aggregate_fn/fns/min_max/varbin.rs b/vortex-array/src/aggregate_fn/fns/min_max/varbin.rs index a64cf80a240..9bf12d7f56a 100644 --- a/vortex-array/src/aggregate_fn/fns/min_max/varbin.rs +++ b/vortex-array/src/aggregate_fn/fns/min_max/varbin.rs @@ -7,7 +7,7 @@ use vortex_error::vortex_panic; use super::MinMaxPartial; use super::MinMaxResult; -use crate::accessor::ArrayAccessor; +use crate::ExecutionCtx; use crate::arrays::VarBinViewArray; use crate::dtype::DType; use crate::dtype::Nullability::NonNullable; @@ -16,27 +16,34 @@ use crate::scalar::Scalar; pub(super) fn accumulate_varbinview( partial: &mut MinMaxPartial, array: &VarBinViewArray, + ctx: &mut ExecutionCtx, ) -> VortexResult<()> { - partial.merge(varbin_compute_min_max(array, array.dtype())); + partial.merge(varbin_compute_min_max(array, array.dtype(), ctx)?); Ok(()) } -fn varbin_compute_min_max>( - array: &T, +fn varbin_compute_min_max( + array: &VarBinViewArray, dtype: &DType, -) -> Option { - array.with_iterator(|iter| match iter.flatten().minmax() { + ctx: &mut ExecutionCtx, +) -> VortexResult> { + let mask = array.validity()?.execute_mask(array.len(), ctx)?; + let minmax = (0..array.len()) + .filter(|&i| mask.value(i)) + .map(|i| array.bytes_at(i)) + .minmax(); + Ok(match minmax { itertools::MinMaxResult::NoElements => None, itertools::MinMaxResult::OneElement(value) => { - let scalar = make_scalar(dtype, value); + let scalar = make_scalar(dtype, value.as_slice()); Some(MinMaxResult { min: scalar.clone(), max: scalar, }) } itertools::MinMaxResult::MinMax(min, max) => Some(MinMaxResult { - min: make_scalar(dtype, min), - max: make_scalar(dtype, max), + min: make_scalar(dtype, min.as_slice()), + max: make_scalar(dtype, max.as_slice()), }), }) } diff --git a/vortex-array/src/arrays/chunked/tests.rs b/vortex-array/src/arrays/chunked/tests.rs index d3cae8be8f0..24942300d52 100644 --- a/vortex-array/src/arrays/chunked/tests.rs +++ b/vortex-array/src/arrays/chunked/tests.rs @@ -6,12 +6,12 @@ use std::sync::LazyLock; use vortex_buffer::Buffer; use vortex_buffer::buffer; +use vortex_error::VortexResult; use vortex_session::VortexSession; use crate::Canonical; use crate::IntoArray; use crate::VortexSessionExecute; -use crate::accessor::ArrayAccessor; use crate::arrays::Chunked; use crate::arrays::ChunkedArray; use crate::arrays::ListArray; @@ -352,24 +352,22 @@ fn scalar_at_empty_children_leading() { } #[test] -pub fn pack_nested_structs() { +pub fn pack_nested_structs() -> VortexResult<()> { + let mut ctx = SESSION.create_execution_ctx(); let struct_array = StructArray::try_new( ["a"].into(), vec![VarBinViewArray::from_iter_str(["foo", "bar", "baz", "quak"]).into_array()], 4, Validity::NonNullable, - ) - .unwrap(); + )?; let dtype = struct_array.dtype().clone(); let chunked = ChunkedArray::try_new( vec![ - ChunkedArray::try_new(vec![struct_array.clone().into_array()], dtype.clone()) - .unwrap() + ChunkedArray::try_new(vec![struct_array.clone().into_array()], dtype.clone())? .into_array(), ], dtype, - ) - .unwrap() + )? .into_array(); #[expect(deprecated)] let canonical_struct = chunked.to_struct(); @@ -377,11 +375,28 @@ pub fn pack_nested_structs() { let canonical_varbin = canonical_struct.unmasked_fields()[0].to_varbinview(); #[expect(deprecated)] let original_varbin = struct_array.unmasked_fields()[0].to_varbinview(); - let orig_values = - original_varbin.with_iterator(|it| it.map(|a| a.map(|v| v.to_vec())).collect::>()); - let canon_values = - canonical_varbin.with_iterator(|it| it.map(|a| a.map(|v| v.to_vec())).collect::>()); + let orig_mask = original_varbin + .validity()? + .execute_mask(original_varbin.len(), &mut ctx)?; + let orig_values = (0..original_varbin.len()) + .map(|i| { + orig_mask + .value(i) + .then(|| original_varbin.bytes_at(i).to_vec()) + }) + .collect::>(); + let canon_mask = canonical_varbin + .validity()? + .execute_mask(canonical_varbin.len(), &mut ctx)?; + let canon_values = (0..canonical_varbin.len()) + .map(|i| { + canon_mask + .value(i) + .then(|| canonical_varbin.bytes_at(i).to_vec()) + }) + .collect::>(); assert_eq!(orig_values, canon_values); + Ok(()) } #[test] diff --git a/vortex-array/src/arrays/chunked/vtable/canonical.rs b/vortex-array/src/arrays/chunked/vtable/canonical.rs index 3cc9505d16b..995d90bc0aa 100644 --- a/vortex-array/src/arrays/chunked/vtable/canonical.rs +++ b/vortex-array/src/arrays/chunked/vtable/canonical.rs @@ -299,7 +299,6 @@ mod tests { use crate::Canonical; use crate::IntoArray; use crate::VortexSessionExecute; - use crate::accessor::ArrayAccessor; use crate::arrays::ChunkedArray; use crate::arrays::ConstantArray; use crate::arrays::FixedSizeListArray; @@ -550,10 +549,30 @@ mod tests { .clone() .execute::(&mut ctx) .unwrap(); - let orig_values = original_varbin - .with_iterator(|it| it.map(|a| a.map(|v| v.to_vec())).collect::>()); - let canon_values = canonical_varbin - .with_iterator(|it| it.map(|a| a.map(|v| v.to_vec())).collect::>()); + let orig_mask = original_varbin + .validity() + .unwrap() + .execute_mask(original_varbin.len(), &mut ctx) + .unwrap(); + let orig_values = (0..original_varbin.len()) + .map(|i| { + orig_mask + .value(i) + .then(|| original_varbin.bytes_at(i).to_vec()) + }) + .collect::>(); + let canon_mask = canonical_varbin + .validity() + .unwrap() + .execute_mask(canonical_varbin.len(), &mut ctx) + .unwrap(); + let canon_values = (0..canonical_varbin.len()) + .map(|i| { + canon_mask + .value(i) + .then(|| canonical_varbin.bytes_at(i).to_vec()) + }) + .collect::>(); assert_eq!(orig_values, canon_values); } diff --git a/vortex-array/src/arrays/dict/compute/mod.rs b/vortex-array/src/arrays/dict/compute/mod.rs index defa44a0976..e2b0d07b97d 100644 --- a/vortex-array/src/arrays/dict/compute/mod.rs +++ b/vortex-array/src/arrays/dict/compute/mod.rs @@ -57,12 +57,12 @@ mod test { use std::sync::LazyLock; use vortex_buffer::buffer; + use vortex_error::VortexResult; use vortex_session::VortexSession; use crate::ArrayRef; use crate::IntoArray; use crate::VortexSessionExecute; - use crate::accessor::ArrayAccessor; use crate::arrays::ConstantArray; use crate::arrays::PrimitiveArray; use crate::arrays::VarBinArray; @@ -148,29 +148,37 @@ mod test { } #[test] - fn canonicalise_nullable_varbin() { + fn canonicalise_nullable_varbin() -> VortexResult<()> { let reference = VarBinViewArray::from_iter( vec![Some("a"), Some("b"), None, Some("a"), None, Some("b")], DType::Utf8(Nullability::Nullable), ); assert_eq!(reference.len(), 6); - let dict = dict_encode( - &reference.clone().into_array(), - &mut SESSION.create_execution_ctx(), - ) - .unwrap(); - let flattened_dict = dict - .into_array() - .execute::(&mut SESSION.create_execution_ctx()) - .unwrap(); - assert_eq!( - flattened_dict.with_iterator(|iter| iter - .map(|slice| slice.map(|s| s.to_vec())) - .collect::>()), - reference.with_iterator(|iter| iter - .map(|slice| slice.map(|s| s.to_vec())) - .collect::>()), - ); + let mut ctx = SESSION.create_execution_ctx(); + let dict = dict_encode(&reference.clone().into_array(), &mut ctx)?; + let flattened_dict = dict.into_array().execute::(&mut ctx)?; + let flattened_mask = flattened_dict + .validity()? + .execute_mask(flattened_dict.len(), &mut ctx)?; + let flattened_values = (0..flattened_dict.len()) + .map(|i| { + flattened_mask + .value(i) + .then(|| flattened_dict.bytes_at(i).to_vec()) + }) + .collect::>(); + let reference_mask = reference + .validity()? + .execute_mask(reference.len(), &mut ctx)?; + let reference_values = (0..reference.len()) + .map(|i| { + reference_mask + .value(i) + .then(|| reference.bytes_at(i).to_vec()) + }) + .collect::>(); + assert_eq!(flattened_values, reference_values); + Ok(()) } fn sliced_dict_array() -> ArrayRef { diff --git a/vortex-array/src/arrays/primitive/array/accessor.rs b/vortex-array/src/arrays/primitive/array/accessor.rs deleted file mode 100644 index f26b201ea2b..00000000000 --- a/vortex-array/src/arrays/primitive/array/accessor.rs +++ /dev/null @@ -1,41 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -use std::iter; - -use vortex_error::VortexExpect; - -#[expect(deprecated)] -use crate::ToCanonical as _; -use crate::accessor::ArrayAccessor; -use crate::arrays::PrimitiveArray; -use crate::dtype::NativePType; -use crate::validity::Validity; - -impl ArrayAccessor for PrimitiveArray { - fn with_iterator(&self, f: F) -> R - where - F: for<'a> FnOnce(&mut dyn Iterator>) -> R, - { - match self - .validity() - .vortex_expect("primitive validity should be derivable") - { - Validity::NonNullable | Validity::AllValid => { - let mut iter = self.as_slice::().iter().map(Some); - f(&mut iter) - } - Validity::AllInvalid => f(&mut iter::repeat_n(None, self.len())), - Validity::Array(v) => { - #[expect(deprecated)] - let validity = v.to_bool().into_bit_buffer(); - let mut iter = self - .as_slice::() - .iter() - .zip(validity.iter()) - .map(|(value, valid)| valid.then_some(value)); - f(&mut iter) - } - } - } -} diff --git a/vortex-array/src/arrays/primitive/array/mod.rs b/vortex-array/src/arrays/primitive/array/mod.rs index 65ef3ca0490..588cbb555fd 100644 --- a/vortex-array/src/arrays/primitive/array/mod.rs +++ b/vortex-array/src/arrays/primitive/array/mod.rs @@ -3,7 +3,7 @@ use std::fmt::Display; use std::fmt::Formatter; -use std::iter; +use std::iter::repeat; use smallvec::smallvec; use vortex_buffer::Alignment; @@ -18,11 +18,10 @@ use vortex_error::vortex_panic; use crate::ArraySlots; use crate::ExecutionCtx; -#[expect(deprecated)] -use crate::ToCanonical as _; use crate::array::Array; use crate::array::ArrayParts; use crate::array::TypedArrayRef; +use crate::arrays::BoolArray; use crate::arrays::Primitive; use crate::arrays::PrimitiveArray; use crate::dtype::DType; @@ -32,7 +31,6 @@ use crate::dtype::PType; use crate::match_each_native_ptype; use crate::validity::Validity; -mod accessor; mod cast; mod conversion; mod patch; @@ -447,12 +445,18 @@ impl Array { ptype: PType, validity: Validity, n_rows: usize, + ctx: &mut ExecutionCtx, ) -> Self { let dtype = DType::Primitive(ptype, validity.nullability()); let len = n_rows; let slots = PrimitiveData::make_slots(&validity, len); - let data = - PrimitiveData::from_values_byte_buffer(valid_elems_buffer, ptype, validity, n_rows); + let data = PrimitiveData::from_values_byte_buffer( + valid_elems_buffer, + ptype, + validity, + n_rows, + ctx, + ); unsafe { Array::from_parts_unchecked( ArrayParts::new(Primitive, dtype, len, data).with_slots(slots), @@ -476,7 +480,7 @@ impl Array { } } - pub fn map_each_with_validity(self, f: F) -> VortexResult + pub fn map_each_with_validity(self, f: F, ctx: &mut ExecutionCtx) -> VortexResult where T: NativePType, R: NativePType, @@ -488,14 +492,13 @@ impl Array { let buffer = match &validity { Validity::NonNullable | Validity::AllValid => { - Buffer::::from_trusted_len_iter(buf_iter.zip(iter::repeat(true)).map(f)) + Buffer::::from_trusted_len_iter(buf_iter.zip(repeat(true)).map(f)) } Validity::AllInvalid => { - Buffer::::from_trusted_len_iter(buf_iter.zip(iter::repeat(false)).map(f)) + Buffer::::from_trusted_len_iter(buf_iter.zip(repeat(false)).map(f)) } Validity::Array(val) => { - #[expect(deprecated)] - let val = val.to_bool().into_bit_buffer(); + let val = val.clone().execute::(ctx)?.into_bit_buffer(); Buffer::::from_trusted_len_iter(buf_iter.zip(val.iter()).map(f)) } }; @@ -541,6 +544,7 @@ impl PrimitiveData { ptype: PType, validity: Validity, n_rows: usize, + ctx: &mut ExecutionCtx, ) -> Self { let byte_width = ptype.byte_width(); let alignment = Alignment::new(byte_width); @@ -548,8 +552,10 @@ impl PrimitiveData { Validity::AllValid | Validity::NonNullable => valid_elems_buffer.aligned(alignment), Validity::AllInvalid => ByteBuffer::zeroed_aligned(n_rows * byte_width, alignment), Validity::Array(is_valid) => { - #[expect(deprecated)] - let bool_array = is_valid.to_bool(); + let bool_array = is_valid + .clone() + .execute::(ctx) + .vortex_expect("must be a bool array"); let bool_buffer = bool_array.bit_buffer_view(); let mut bytes = ByteBufferMut::zeroed_aligned(n_rows * byte_width, alignment); for (i, valid_i) in bool_buffer.set_indices().enumerate() { diff --git a/vortex-array/src/arrays/primitive/mod.rs b/vortex-array/src/arrays/primitive/mod.rs index e291ff521fb..f1566f8174f 100644 --- a/vortex-array/src/arrays/primitive/mod.rs +++ b/vortex-array/src/arrays/primitive/mod.rs @@ -15,12 +15,13 @@ mod vtable; pub use compute::rules::PrimitiveMaskedValidityRule; pub use vtable::Primitive; -pub(crate) fn initialize(session: &vortex_session::VortexSession) { +pub(crate) fn initialize(session: &VortexSession) { vtable::initialize(session); } mod native_value; pub use native_value::NativeValue; +use vortex_session::VortexSession; #[cfg(test)] mod tests; diff --git a/vortex-array/src/arrays/varbin/accessor.rs b/vortex-array/src/arrays/varbin/accessor.rs deleted file mode 100644 index ee68f94bd55..00000000000 --- a/vortex-array/src/arrays/varbin/accessor.rs +++ /dev/null @@ -1,63 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -use std::iter; - -use vortex_error::VortexExpect; - -#[expect(deprecated)] -use crate::ToCanonical as _; -use crate::accessor::ArrayAccessor; -use crate::arrays::VarBinArray; -use crate::arrays::varbin::VarBinArrayExt; -use crate::match_each_integer_ptype; -use crate::validity::Validity; - -impl ArrayAccessor<[u8]> for VarBinArray { - fn with_iterator(&self, f: F) -> R - where - F: for<'a> FnOnce(&mut dyn Iterator>) -> R, - { - #[expect(deprecated)] - let offsets = self.offsets().to_primitive(); - let validity = self - .validity() - .vortex_expect("varbin validity should be derivable"); - - let bytes = self.bytes(); - let bytes = bytes.as_slice(); - - match_each_integer_ptype!(offsets.ptype(), |T| { - let offsets = offsets.as_slice::(); - - #[allow(clippy::cast_possible_truncation)] - match validity { - Validity::NonNullable | Validity::AllValid => { - let mut iter = offsets - .windows(2) - .map(|w| Some(&bytes[w[0] as usize..w[1] as usize])); - f(&mut iter) - } - Validity::AllInvalid => f(&mut iter::repeat_n(None, self.len())), - Validity::Array(v) => { - #[expect(deprecated)] - let validity = v.to_bool().into_bit_buffer(); - let mut iter = offsets - .windows(2) - .zip(validity.iter()) - .map(|(w, valid)| valid.then(|| &bytes[w[0] as usize..w[1] as usize])); - f(&mut iter) - } - } - }) - } -} - -impl ArrayAccessor<[u8]> for &VarBinArray { - fn with_iterator(&self, f: F) -> R - where - F: for<'a> FnOnce(&mut dyn Iterator>) -> R, - { - >::with_iterator(*self, f) - } -} diff --git a/vortex-array/src/arrays/varbin/mod.rs b/vortex-array/src/arrays/varbin/mod.rs index f0ccbe4585c..4bca67fad81 100644 --- a/vortex-array/src/arrays/varbin/mod.rs +++ b/vortex-array/src/arrays/varbin/mod.rs @@ -18,8 +18,6 @@ pub(crate) fn initialize(session: &vortex_session::VortexSession) { pub mod builder; -mod accessor; - use vortex_buffer::ByteBuffer; use vortex_error::VortexExpect; use vortex_error::vortex_err; diff --git a/vortex-array/src/arrays/varbinview/accessor.rs b/vortex-array/src/arrays/varbinview/accessor.rs deleted file mode 100644 index cd42c3f2b58..00000000000 --- a/vortex-array/src/arrays/varbinview/accessor.rs +++ /dev/null @@ -1,72 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -use std::iter; - -use vortex_error::VortexExpect; - -#[expect(deprecated)] -use crate::ToCanonical as _; -use crate::accessor::ArrayAccessor; -use crate::arrays::VarBinViewArray; -use crate::validity::Validity; - -impl ArrayAccessor<[u8]> for VarBinViewArray { - fn with_iterator FnOnce(&mut dyn Iterator>) -> R, R>( - &self, - f: F, - ) -> R { - let bytes = (0..self.data_buffers().len()) - .map(|i| self.buffer(i)) - .collect::>(); - - let views = self.views(); - - match self - .validity() - .vortex_expect("varbinview validity should be derivable") - { - Validity::NonNullable | Validity::AllValid => { - let mut iter = views.iter().map(|view| { - if view.is_inlined() { - Some(view.as_inlined().value()) - } else { - Some( - &bytes[view.as_view().buffer_index as usize][view.as_view().as_range()], - ) - } - }); - f(&mut iter) - } - Validity::AllInvalid => f(&mut iter::repeat_n(None, views.len())), - Validity::Array(v) => { - #[expect(deprecated)] - let validity = v.to_bool().into_bit_buffer(); - let mut iter = views.iter().zip(validity.iter()).map(|(view, valid)| { - if valid { - if view.is_inlined() { - Some(view.as_inlined().value()) - } else { - Some( - &bytes[view.as_view().buffer_index as usize] - [view.as_view().as_range()], - ) - } - } else { - None - } - }); - f(&mut iter) - } - } - } -} - -impl ArrayAccessor<[u8]> for &VarBinViewArray { - fn with_iterator(&self, f: F) -> R - where - F: for<'a> FnOnce(&mut dyn Iterator>) -> R, - { - >::with_iterator(*self, f) - } -} diff --git a/vortex-array/src/arrays/varbinview/compute/mod.rs b/vortex-array/src/arrays/varbinview/compute/mod.rs index 7a0d0b2d67f..0358ad01460 100644 --- a/vortex-array/src/arrays/varbinview/compute/mod.rs +++ b/vortex-array/src/arrays/varbinview/compute/mod.rs @@ -11,14 +11,14 @@ mod zip; #[cfg(test)] mod tests { use vortex_buffer::buffer; + use vortex_error::VortexResult; use crate::IntoArray; - use crate::accessor::ArrayAccessor; use crate::arrays::VarBinViewArray; #[expect(deprecated)] use crate::canonical::ToCanonical as _; #[test] - fn take_nullable() { + fn take_nullable() -> VortexResult<()> { let arr = VarBinViewArray::from_iter_nullable_str([ Some("one"), None, @@ -28,15 +28,21 @@ mod tests { Some("six"), ]); - let taken = arr.take(buffer![0, 3].into_array()).unwrap(); + let taken = arr.take(buffer![0, 3].into_array())?; assert!(taken.dtype().is_nullable()); + let mut ctx = array_session().create_execution_ctx(); #[expect(deprecated)] - let result = taken.to_varbinview().with_iterator(|it| { - it.map(|v| v.map(|b| unsafe { String::from_utf8_unchecked(b.to_vec()) })) - .collect::>() - }); + let taken = taken.to_varbinview(); + let mask = taken.validity()?.execute_mask(taken.len(), &mut ctx)?; + let result = (0..taken.len()) + .map(|i| { + mask.value(i) + .then(|| unsafe { String::from_utf8_unchecked(taken.bytes_at(i).to_vec()) }) + }) + .collect::>(); assert_eq!(result, [Some("one".to_string()), Some("four".to_string())]); + Ok(()) } // Consistency tests use rstest::rstest; diff --git a/vortex-array/src/arrays/varbinview/compute/take.rs b/vortex-array/src/arrays/varbinview/compute/take.rs index 5c5ff08e2d0..1b7435729f1 100644 --- a/vortex-array/src/arrays/varbinview/compute/take.rs +++ b/vortex-array/src/arrays/varbinview/compute/take.rs @@ -90,20 +90,20 @@ mod tests { use rstest::rstest; use vortex_buffer::BitBuffer; use vortex_buffer::buffer; + use vortex_error::VortexResult; use crate::IntoArray; - use crate::accessor::ArrayAccessor; + use crate::VortexSessionExecute; + use crate::array_session; use crate::arrays::VarBinViewArray; use crate::arrays::varbinview::compute::take::PrimitiveArray; - #[expect(deprecated)] - use crate::canonical::ToCanonical as _; use crate::compute::conformance::take::test_take_conformance; use crate::dtype::DType; use crate::dtype::Nullability::NonNullable; use crate::validity::Validity; #[test] - fn take_nullable() { + fn take_nullable() -> VortexResult<()> { let arr = VarBinViewArray::from_iter_nullable_str([ Some("one"), None, @@ -113,19 +113,24 @@ mod tests { Some("six"), ]); - let taken = arr.take(buffer![0, 3].into_array()).unwrap(); + let taken = arr.take(buffer![0, 3].into_array())?; assert!(taken.dtype().is_nullable()); - #[expect(deprecated)] - let result = taken.to_varbinview().with_iterator(|it| { - it.map(|v| v.map(|b| unsafe { String::from_utf8_unchecked(b.to_vec()) })) - .collect::>() - }); + let mut ctx = array_session().create_execution_ctx(); + let taken = taken.execute::(&mut ctx)?; + let mask = taken.validity()?.execute_mask(taken.len(), &mut ctx)?; + let result = (0..taken.len()) + .map(|i| { + mask.value(i) + .then(|| unsafe { String::from_utf8_unchecked(taken.bytes_at(i).to_vec()) }) + }) + .collect::>(); assert_eq!(result, [Some("one".to_string()), Some("four".to_string())]); + Ok(()) } #[test] - fn take_nullable_indices() { + fn take_nullable_indices() -> VortexResult<()> { let arr = VarBinViewArray::from_iter(["one", "two"].map(Some), DType::Utf8(NonNullable)); let indices = PrimitiveArray::new( @@ -134,15 +139,20 @@ mod tests { Validity::from(BitBuffer::from(vec![true, false])), ); - let taken = arr.take(indices.into_array()).unwrap(); + let taken = arr.take(indices.into_array())?; assert!(taken.dtype().is_nullable()); - #[expect(deprecated)] - let result = taken.to_varbinview().with_iterator(|it| { - it.map(|v| v.map(|b| unsafe { String::from_utf8_unchecked(b.to_vec()) })) - .collect::>() - }); + let mut ctx = array_session().create_execution_ctx(); + let taken = taken.execute::(&mut ctx)?; + let mask = taken.validity()?.execute_mask(taken.len(), &mut ctx)?; + let result = (0..taken.len()) + .map(|i| { + mask.value(i) + .then(|| unsafe { String::from_utf8_unchecked(taken.bytes_at(i).to_vec()) }) + }) + .collect::>(); assert_eq!(result, [Some("two".to_string()), None]); + Ok(()) } #[rstest] diff --git a/vortex-array/src/arrays/varbinview/compute/zip.rs b/vortex-array/src/arrays/varbinview/compute/zip.rs index 9c198fe6f2c..75e30808821 100644 --- a/vortex-array/src/arrays/varbinview/compute/zip.rs +++ b/vortex-array/src/arrays/varbinview/compute/zip.rs @@ -209,10 +209,12 @@ fn push_view( #[cfg(test)] mod tests { + use vortex_error::VortexResult; use vortex_mask::Mask; use crate::IntoArray; - use crate::accessor::ArrayAccessor; + use crate::VortexSessionExecute; + use crate::array_session; use crate::arrays::VarBinViewArray; use crate::builtins::ArrayBuiltins; #[expect(deprecated)] @@ -221,7 +223,7 @@ mod tests { use crate::dtype::Nullability; #[test] - fn zip_varbinview_kernel_zips() { + fn zip_varbinview_kernel_zips() -> VortexResult<()> { let a = VarBinViewArray::from_iter( [ Some("aaaaaaaaaaaaa_long"), // outlined @@ -252,14 +254,18 @@ mod tests { let zipped = mask .clone() .into_array() - .zip(a.into_array(), b.into_array()) - .unwrap() + .zip(a.into_array(), b.into_array())? .to_varbinview(); - let values = zipped.with_iterator(|it| { - it.map(|v| v.map(|bytes| String::from_utf8(bytes.to_vec()).unwrap())) - .collect::>() - }); + let mut ctx = array_session().create_execution_ctx(); + let validity_mask = zipped.validity()?.execute_mask(zipped.len(), &mut ctx)?; + let values = (0..zipped.len()) + .map(|i| { + validity_mask + .value(i) + .then(|| String::from_utf8(zipped.bytes_at(i).to_vec()).unwrap()) + }) + .collect::>(); assert_eq!( values, @@ -274,5 +280,6 @@ mod tests { ); assert_eq!(zipped.len(), mask.len()); assert_eq!(zipped.dtype(), &DType::Utf8(Nullability::Nullable)); + Ok(()) } } diff --git a/vortex-array/src/arrays/varbinview/mod.rs b/vortex-array/src/arrays/varbinview/mod.rs index 6f3398884b3..a6b8c9db46c 100644 --- a/vortex-array/src/arrays/varbinview/mod.rs +++ b/vortex-array/src/arrays/varbinview/mod.rs @@ -7,7 +7,6 @@ pub use array::VarBinViewData; pub use array::VarBinViewDataParts; pub use vtable::VarBinViewArray; -mod accessor; pub(crate) mod compact; pub(crate) mod compute; diff --git a/vortex-array/src/builders/dict/bytes.rs b/vortex-array/src/builders/dict/bytes.rs index 00f14388aa2..33b2e5225f7 100644 --- a/vortex-array/src/builders/dict/bytes.rs +++ b/vortex-array/src/builders/dict/bytes.rs @@ -310,17 +310,16 @@ impl DictEncoder for BytesDictBuilder { #[cfg(test)] mod test { - use std::str; use std::sync::Arc; use std::sync::LazyLock; use vortex_buffer::Buffer; use vortex_buffer::ByteBuffer; + use vortex_error::VortexResult; use vortex_session::VortexSession; use crate::IntoArray; use crate::VortexSessionExecute; - use crate::accessor::ArrayAccessor; use crate::arrays::PrimitiveArray; use crate::arrays::VarBinArray; use crate::arrays::VarBinViewArray; @@ -335,32 +334,24 @@ mod test { static SESSION: LazyLock = LazyLock::new(crate::array_session); #[test] - fn encode_varbin() { + fn encode_varbin() -> VortexResult<()> { let arr = VarBinViewArray::from_iter_str(vec!["hello", "world", "hello", "again", "world"]); - let dict = dict_encode(&arr.into_array(), &mut SESSION.create_execution_ctx()).unwrap(); - let codes = dict - .codes() - .clone() - .execute::(&mut SESSION.create_execution_ctx()) - .unwrap(); + let mut ctx = SESSION.create_execution_ctx(); + let dict = dict_encode(&arr.into_array(), &mut ctx)?; + let codes = dict.codes().clone().execute::(&mut ctx)?; assert_eq!(codes.as_slice::(), &[0, 1, 0, 2, 1]); - let values = dict - .values() - .clone() - .execute::(&mut SESSION.create_execution_ctx()) - .unwrap(); - values.with_iterator(|iter| { - assert_eq!( - iter.flatten() - .map(|b| unsafe { str::from_utf8_unchecked(b) }) - .collect::>(), - vec!["hello", "world", "again"] - ); - }); + let values = dict.values().clone().execute::(&mut ctx)?; + let mask = values.validity()?.execute_mask(values.len(), &mut ctx)?; + let decoded = (0..values.len()) + .filter(|&i| mask.value(i)) + .map(|i| unsafe { String::from_utf8_unchecked(values.bytes_at(i).to_vec()) }) + .collect::>(); + assert_eq!(decoded, vec!["hello", "world", "again"]); + Ok(()) } #[test] - fn encode_varbin_nulls() { + fn encode_varbin_nulls() -> VortexResult<()> { let arr: VarBinViewArray = vec![ Some("hello"), None, @@ -373,25 +364,28 @@ mod test { ] .into_iter() .collect(); - let dict = dict_encode(&arr.into_array(), &mut SESSION.create_execution_ctx()).unwrap(); - let codes = dict - .codes() - .clone() - .execute::(&mut SESSION.create_execution_ctx()) - .unwrap(); + let mut ctx = SESSION.create_execution_ctx(); + let dict = dict_encode(&arr.into_array(), &mut ctx)?; + let codes = dict.codes().clone().execute::(&mut ctx)?; assert_eq!(codes.as_slice::(), &[0, 1, 2, 0, 1, 3, 2, 1]); - let values = dict - .values() - .clone() - .execute::(&mut SESSION.create_execution_ctx()) - .unwrap(); - values.with_iterator(|iter| { - assert_eq!( - iter.map(|b| b.map(|v| unsafe { str::from_utf8_unchecked(v) })) - .collect::>(), - vec![Some("hello"), None, Some("world"), Some("again")] - ); - }); + let values = dict.values().clone().execute::(&mut ctx)?; + let mask = values.validity()?.execute_mask(values.len(), &mut ctx)?; + let decoded = (0..values.len()) + .map(|i| { + mask.value(i) + .then(|| unsafe { String::from_utf8_unchecked(values.bytes_at(i).to_vec()) }) + }) + .collect::>(); + assert_eq!( + decoded, + vec![ + Some("hello".to_string()), + None, + Some("world".to_string()), + Some("again".to_string()) + ] + ); + Ok(()) } #[test] @@ -421,27 +415,19 @@ mod test { } #[test] - fn repeated_values() { + fn repeated_values() -> VortexResult<()> { let arr = VarBinArray::from(vec!["a", "a", "b", "b", "a", "b", "a", "b"]); - let dict = dict_encode(&arr.into_array(), &mut SESSION.create_execution_ctx()).unwrap(); - let values = dict - .values() - .clone() - .execute::(&mut SESSION.create_execution_ctx()) - .unwrap(); - values.with_iterator(|iter| { - assert_eq!( - iter.flatten() - .map(|b| unsafe { str::from_utf8_unchecked(b) }) - .collect::>(), - vec!["a", "b"] - ); - }); - let codes = dict - .codes() - .clone() - .execute::(&mut SESSION.create_execution_ctx()) - .unwrap(); + let mut ctx = SESSION.create_execution_ctx(); + let dict = dict_encode(&arr.into_array(), &mut ctx)?; + let values = dict.values().clone().execute::(&mut ctx)?; + let mask = values.validity()?.execute_mask(values.len(), &mut ctx)?; + let decoded = (0..values.len()) + .filter(|&i| mask.value(i)) + .map(|i| unsafe { String::from_utf8_unchecked(values.bytes_at(i).to_vec()) }) + .collect::>(); + assert_eq!(decoded, vec!["a", "b"]); + let codes = dict.codes().clone().execute::(&mut ctx)?; assert_eq!(codes.as_slice::(), &[0, 0, 1, 1, 0, 1, 0, 1]); + Ok(()) } } diff --git a/vortex-array/src/lib.rs b/vortex-array/src/lib.rs index df5ca44c555..d5ac9c6d227 100644 --- a/vortex-array/src/lib.rs +++ b/vortex-array/src/lib.rs @@ -103,7 +103,6 @@ use crate::scalar_fn::session::ScalarFnSession; use crate::session::ArraySession; use crate::stats::session::StatsSession; -pub mod accessor; pub mod aggregate_fn; #[doc(hidden)] pub mod aliases; diff --git a/vortex-btrblocks/src/schemes/integer/for_.rs b/vortex-btrblocks/src/schemes/integer/for_.rs index 86accdb5e71..ec23f1114db 100644 --- a/vortex-btrblocks/src/schemes/integer/for_.rs +++ b/vortex-btrblocks/src/schemes/integer/for_.rs @@ -123,7 +123,7 @@ impl Scheme for FoRScheme { exec_ctx: &mut ExecutionCtx, ) -> VortexResult { let primitive = data.array().clone().execute::(exec_ctx)?; - let for_array = FoR::encode(primitive)?; + let for_array = FoR::encode(primitive, exec_ctx)?; let biased = for_array .encoded() .clone() diff --git a/vortex-btrblocks/src/schemes/string/onpair.rs b/vortex-btrblocks/src/schemes/string/onpair.rs index 123601f7bb1..7bbe480d059 100644 --- a/vortex-btrblocks/src/schemes/string/onpair.rs +++ b/vortex-btrblocks/src/schemes/string/onpair.rs @@ -73,7 +73,7 @@ impl Scheme for OnPairScheme { exec_ctx: &mut ExecutionCtx, ) -> VortexResult { let utf8 = data.array_as_varbinview().into_owned(); - let onpair_array = onpair_compress(&utf8, utf8.len(), utf8.dtype(), DEFAULT_DICT12_CONFIG)?; + let onpair_array = onpair_compress(utf8.as_array(), DEFAULT_DICT12_CONFIG, exec_ctx)?; let dict_offsets = compress_offsets_child( compressor, diff --git a/vortex-btrblocks/tests/onpair_roundtrip.rs b/vortex-btrblocks/tests/onpair_roundtrip.rs index 31fb700a2d6..272fa930bd0 100644 --- a/vortex-btrblocks/tests/onpair_roundtrip.rs +++ b/vortex-btrblocks/tests/onpair_roundtrip.rs @@ -17,7 +17,6 @@ use std::sync::LazyLock; use vortex_array::IntoArray; use vortex_array::VortexSessionExecute; -use vortex_array::accessor::ArrayAccessor; use vortex_array::arrays::VarBinViewArray; use vortex_array::dtype::DType; use vortex_array::dtype::Nullability; @@ -72,19 +71,21 @@ fn nonnullable_roundtrip_via_default_compressor() { .execute::(&mut SESSION.create_execution_ctx()) .expect("decompress"); assert_eq!(decoded.len(), n); - decoded - .with_iterator(|iter| { - for (i, got) in iter.enumerate() { - assert_eq!( - got, - Some(strings[i].as_bytes()), - "mismatch at row {i}: got {:?}", - got.map(|b| String::from_utf8_lossy(b).into_owned()), - ); - } - Ok::<_, vortex_error::VortexError>(()) - }) + let mask = decoded + .validity() + .unwrap() + .execute_mask(decoded.len(), &mut SESSION.create_execution_ctx()) .unwrap(); + for i in 0..decoded.len() { + let got = mask.value(i).then(|| decoded.bytes_at(i)); + assert_eq!( + got.as_deref(), + Some(strings[i].as_bytes()), + "mismatch at row {i}: got {:?}", + got.as_deref() + .map(|b| String::from_utf8_lossy(b).into_owned()), + ); + } } #[test] @@ -112,15 +113,16 @@ fn nullable_roundtrip_via_default_compressor() { .execute::(&mut SESSION.create_execution_ctx()) .expect("decompress"); assert_eq!(decoded.len(), n); - decoded - .with_iterator(|iter| { - for (i, got) in iter.enumerate() { - let want = strings[i].as_deref().map(str::as_bytes); - assert_eq!(got, want, "mismatch at row {i}"); - } - Ok::<_, vortex_error::VortexError>(()) - }) + let mask = decoded + .validity() + .unwrap() + .execute_mask(decoded.len(), &mut SESSION.create_execution_ctx()) .unwrap(); + for i in 0..decoded.len() { + let got = mask.value(i).then(|| decoded.bytes_at(i)); + let want = strings[i].as_deref().map(str::as_bytes); + assert_eq!(got.as_deref(), want, "mismatch at row {i}"); + } } /// Larger corpus that exercises the offsets-narrowing / delta-encoding paths @@ -145,14 +147,15 @@ fn large_unique_short_strings_roundtrip() { .execute::(&mut SESSION.create_execution_ctx()) .expect("decompress"); assert_eq!(decoded.len(), n); - decoded - .with_iterator(|iter| { - for (i, got) in iter.enumerate() { - assert_eq!(got, Some(strings[i].as_bytes()), "row {i}"); - } - Ok::<_, vortex_error::VortexError>(()) - }) + let mask = decoded + .validity() + .unwrap() + .execute_mask(decoded.len(), &mut SESSION.create_execution_ctx()) .unwrap(); + for i in 0..decoded.len() { + let got = mask.value(i).then(|| decoded.bytes_at(i)); + assert_eq!(got.as_deref(), Some(strings[i].as_bytes()), "row {i}"); + } } #[test] @@ -171,15 +174,17 @@ fn empty_and_short_string_roundtrip() { let decoded = compressed .execute::(&mut SESSION.create_execution_ctx()) .expect("decompress"); - decoded - .with_iterator(|iter| { - let got: Vec<_> = iter.collect(); - for (i, want) in strings.iter().enumerate() { - assert_eq!(got[i], Some(want.as_bytes()), "row {i}"); - } - Ok::<_, vortex_error::VortexError>(()) - }) + let mask = decoded + .validity() + .unwrap() + .execute_mask(decoded.len(), &mut SESSION.create_execution_ctx()) .unwrap(); + let got: Vec>> = (0..decoded.len()) + .map(|i| mask.value(i).then(|| decoded.bytes_at(i).to_vec())) + .collect(); + for (i, want) in strings.iter().enumerate() { + assert_eq!(got[i].as_deref(), Some(want.as_bytes()), "row {i}"); + } } /// Regression for the Euro2016 compress-bench panic @@ -215,12 +220,13 @@ fn delta_dict_offsets_roundtrip() { .execute::(&mut SESSION.create_execution_ctx()) .expect("decompress"); assert_eq!(decoded.len(), n); - decoded - .with_iterator(|iter| { - for (i, got) in iter.enumerate() { - assert_eq!(got, Some(strings[i].as_bytes()), "row {i}"); - } - Ok::<_, vortex_error::VortexError>(()) - }) + let mask = decoded + .validity() + .unwrap() + .execute_mask(decoded.len(), &mut SESSION.create_execution_ctx()) .unwrap(); + for i in 0..decoded.len() { + let got = mask.value(i).then(|| decoded.bytes_at(i)); + assert_eq!(got.as_deref(), Some(strings[i].as_bytes()), "row {i}"); + } } diff --git a/vortex-cuda/benches/dynamic_dispatch_cuda.rs b/vortex-cuda/benches/dynamic_dispatch_cuda.rs index 9945b016190..fb02bc1f3b2 100644 --- a/vortex-cuda/benches/dynamic_dispatch_cuda.rs +++ b/vortex-cuda/benches/dynamic_dispatch_cuda.rs @@ -359,6 +359,7 @@ fn bench_dict_bp_codes_alp_for_bp_values_dynanmic_dispatch(c: &mut Criterion) { .clone() .execute::(&mut ctx) .vortex_expect("to primitive"), + &mut ctx, ) .vortex_expect("for encode"); let bp = BitPackedData::encode(for_arr.encoded(), values_bit_width, &mut ctx) @@ -673,7 +674,7 @@ fn bench_dict_bp_codes_alp_for_bp_values_composed_standalone(c: &mut Criterion) .clone() .execute::(&mut ctx) .vortex_expect("to primitive"); - let for_arr = FoRData::encode(alp_encoded.clone()).vortex_expect("for encode"); + let for_arr = FoRData::encode(alp_encoded.clone(), &mut ctx).vortex_expect("for encode"); let bp = BitPackedData::encode(for_arr.encoded(), values_bit_width, &mut ctx) .vortex_expect("bitpack values"); let values_bp = bp; @@ -759,6 +760,7 @@ fn bench_alp_for_bitpacked_f64(c: &mut Criterion) { .clone() .execute::(&mut ctx) .vortex_expect("to primitive"), + &mut ctx, ) .vortex_expect("for encode"); let bp = BitPackedData::encode(for_arr.encoded(), bit_width, &mut ctx) @@ -881,6 +883,7 @@ fn bench_alp_for_bitpacked(c: &mut Criterion) { .clone() .execute::(&mut ctx) .vortex_expect("to primitive"), + &mut ctx, ) .vortex_expect("for encode"); let bp = BitPackedData::encode(for_arr.encoded(), bit_width, &mut ctx) diff --git a/vortex-cuda/src/dynamic_dispatch/mod.rs b/vortex-cuda/src/dynamic_dispatch/mod.rs index f4d374f7b00..329ea19c061 100644 --- a/vortex-cuda/src/dynamic_dispatch/mod.rs +++ b/vortex-cuda/src/dynamic_dispatch/mod.rs @@ -967,7 +967,10 @@ mod tests { let alp = alp_encode(float_prim.as_view(), Some(exponents), &mut ctx)?; assert!(alp.patches().is_none()); - let for_arr = FoR::encode(alp.encoded().clone().execute::(&mut ctx)?)?; + let for_arr = FoR::encode( + alp.encoded().clone().execute::(&mut ctx)?, + &mut ctx, + )?; let bp = BitPacked::encode(for_arr.encoded(), 6, &mut ctx)?; let tree = ALP::new( @@ -1897,7 +1900,10 @@ mod tests { let alp = alp_encode(float_prim.as_view(), Some(exponents), &mut ctx)?; assert!(alp.patches().is_none()); - let for_arr = FoR::encode(alp.encoded().clone().execute::(&mut ctx)?)?; + let for_arr = FoR::encode( + alp.encoded().clone().execute::(&mut ctx)?, + &mut ctx, + )?; let bp = BitPacked::encode(for_arr.encoded(), 6, &mut ctx)?; let tree = ALP::new( diff --git a/vortex-file/src/tests.rs b/vortex-file/src/tests.rs index 32ec3c75f8d..b196f68dc96 100644 --- a/vortex-file/src/tests.rs +++ b/vortex-file/src/tests.rs @@ -14,7 +14,6 @@ use rstest::rstest; use vortex_array::ArrayRef; use vortex_array::IntoArray; use vortex_array::VortexSessionExecute; -use vortex_array::accessor::ArrayAccessor; use vortex_array::array_session; use vortex_array::arrays::ChunkedArray; use vortex_array::arrays::ConstantArray; @@ -1645,12 +1644,16 @@ async fn test_writer_with_complex_types() -> VortexResult<()> { .execute::(&mut ctx)? .unmasked_field_by_name("strings") .cloned()?; - let strings = strings_field - .execute::(&mut ctx)? - .with_iterator(|iter| { - iter.map(|s| s.map(|st| unsafe { String::from_utf8_unchecked(st.to_vec()) })) - .collect::>() - }); + let strings_view = strings_field.execute::(&mut ctx)?; + let mask = strings_view + .validity()? + .execute_mask(strings_view.len(), &mut ctx)?; + let strings = (0..strings_view.len()) + .map(|i| { + mask.value(i) + .then(|| unsafe { String::from_utf8_unchecked(strings_view.bytes_at(i).to_vec()) }) + }) + .collect::>(); assert_eq!( strings, vec![ diff --git a/vortex-row/src/codec.rs b/vortex-row/src/codec.rs index 4848a750e52..fa0959943db 100644 --- a/vortex-row/src/codec.rs +++ b/vortex-row/src/codec.rs @@ -703,7 +703,7 @@ fn encode_varbinview( // Cache the data-buffer slices once. Inlined views (len <= 12) carry their bytes inline, // so they never touch `buffers`; referenced views index into the pre-validated buffer at // `offset..offset + len`. Walking views directly avoids the per-row bounds and branch work - // of `with_iterator`. + // of a generic per-element iterator. let buffers: smallvec::SmallVec<[&[u8]; 4]> = (0..arr.data_buffers().len()) .map(|i| arr.buffer(i).as_slice()) .collect(); diff --git a/vortex-test/compat-gen/src/fixtures/arrays/synthetic/encodings/for_.rs b/vortex-test/compat-gen/src/fixtures/arrays/synthetic/encodings/for_.rs index d4884ad99f6..620b9253c8d 100644 --- a/vortex-test/compat-gen/src/fixtures/arrays/synthetic/encodings/for_.rs +++ b/vortex-test/compat-gen/src/fixtures/arrays/synthetic/encodings/for_.rs @@ -31,7 +31,7 @@ impl FlatLayoutFixture for FoRFixture { vec![FoR.id()] } - fn build(&self, _ctx: &mut ExecutionCtx) -> VortexResult { + fn build(&self, ctx: &mut ExecutionCtx) -> VortexResult { let clustered_i32: PrimitiveArray = (0..N as i32).map(|i| 1_000_000 + (i % 100)).collect(); let clustered_u64: PrimitiveArray = (0..N as u64).map(|i| 10_000_000_000 + (i % 256)).collect(); @@ -70,16 +70,16 @@ impl FlatLayoutFixture for FoRFixture { "near_max_u64", ]), vec![ - FoR::encode(clustered_i32)?.into_array(), - FoR::encode(clustered_u64)?.into_array(), - FoR::encode(clustered_i64)?.into_array(), - FoR::encode(negative_i32)?.into_array(), - FoR::encode(nullable_i32)?.into_array(), - FoR::encode(clustered_i16)?.into_array(), - FoR::encode(constant_offsets)?.into_array(), - FoR::encode(zero_crossing_i32)?.into_array(), - FoR::encode(far_outlier_i64)?.into_array(), - FoR::encode(near_max_u64)?.into_array(), + FoR::encode(clustered_i32, ctx)?.into_array(), + FoR::encode(clustered_u64, ctx)?.into_array(), + FoR::encode(clustered_i64, ctx)?.into_array(), + FoR::encode(negative_i32, ctx)?.into_array(), + FoR::encode(nullable_i32, ctx)?.into_array(), + FoR::encode(clustered_i16, ctx)?.into_array(), + FoR::encode(constant_offsets, ctx)?.into_array(), + FoR::encode(zero_crossing_i32, ctx)?.into_array(), + FoR::encode(far_outlier_i64, ctx)?.into_array(), + FoR::encode(near_max_u64, ctx)?.into_array(), ], N, Validity::NonNullable, diff --git a/vortex/benches/common_encoding_tree_throughput.rs b/vortex/benches/common_encoding_tree_throughput.rs index 4025c43fc46..1d1dcf86956 100644 --- a/vortex/benches/common_encoding_tree_throughput.rs +++ b/vortex/benches/common_encoding_tree_throughput.rs @@ -102,7 +102,7 @@ mod setup { pub fn for_bp_u64() -> ArrayRef { let mut ctx = SESSION.create_execution_ctx(); let (uint_array, ..) = setup_primitive_arrays(); - let compressed = FoR::encode(uint_array).unwrap(); + let compressed = FoR::encode(uint_array, &mut ctx).unwrap(); let inner = compressed.encoded(); let bp = BitPacked::encode(inner, 8, &mut ctx).unwrap(); FoR::try_new(bp.into_array(), compressed.reference_scalar().clone()) @@ -122,7 +122,7 @@ mod setup { .clone() .execute::(&mut ctx) .unwrap(); - let for_array = FoR::encode(alp_encoded_prim).unwrap(); + let for_array = FoR::encode(alp_encoded_prim, &mut ctx).unwrap(); let inner = for_array.encoded(); let bp = BitPacked::encode(inner, 8, &mut ctx).unwrap(); let for_with_bp = @@ -200,7 +200,7 @@ mod setup { .clone() .execute::(&mut ctx) .unwrap(); - let ends_for = FoR::encode(ends_prim).unwrap(); + let ends_for = FoR::encode(ends_prim, &mut ctx).unwrap(); let ends_inner = ends_for.encoded(); let ends_bp = BitPacked::encode(ends_inner, 8, &mut ctx).unwrap(); let compressed_ends = @@ -343,7 +343,7 @@ mod setup { .clone() .execute::(&mut ctx) .unwrap(); - let days_for = FoR::encode(days_prim).unwrap(); + let days_for = FoR::encode(days_prim, &mut ctx).unwrap(); let days_inner = days_for.encoded(); let days_bp = BitPacked::encode(days_inner, 16, &mut ctx).unwrap(); let compressed_days = @@ -357,7 +357,7 @@ mod setup { .clone() .execute::(&mut ctx) .unwrap(); - let seconds_for = FoR::encode(seconds_prim).unwrap(); + let seconds_for = FoR::encode(seconds_prim, &mut ctx).unwrap(); let seconds_inner = seconds_for.encoded(); let seconds_bp = BitPacked::encode(seconds_inner, 17, &mut ctx).unwrap(); let compressed_seconds = FoR::try_new( @@ -372,7 +372,7 @@ mod setup { .subseconds .execute::(&mut ctx) .unwrap(); - let subseconds_for = FoR::encode(subseconds_prim).unwrap(); + let subseconds_for = FoR::encode(subseconds_prim, &mut ctx).unwrap(); let subseconds_inner = subseconds_for.encoded(); let subseconds_bp = BitPacked::encode(subseconds_inner, 20, &mut ctx).unwrap(); let compressed_subseconds = FoR::try_new( diff --git a/vortex/benches/single_encoding_throughput.rs b/vortex/benches/single_encoding_throughput.rs index cc81b52d081..a20777af583 100644 --- a/vortex/benches/single_encoding_throughput.rs +++ b/vortex/benches/single_encoding_throughput.rs @@ -191,14 +191,14 @@ fn bench_for_compress_i32(bencher: Bencher) { let (_, int_array, _) = setup_primitive_arrays(); with_byte_counter(bencher, NUM_VALUES * 4) - .with_inputs(|| int_array.clone()) - .bench_values(|a| FoR::encode(a).unwrap()); + .with_inputs(|| (int_array.clone(), SESSION.create_execution_ctx())) + .bench_values(|(a, mut ctx)| FoR::encode(a, &mut ctx).unwrap()); } #[divan::bench(name = "for_decompress_i32")] fn bench_for_decompress_i32(bencher: Bencher) { let (_, int_array, _) = setup_primitive_arrays(); - let compressed = FoR::encode(int_array).unwrap(); + let compressed = FoR::encode(int_array, &mut SESSION.create_execution_ctx()).unwrap(); with_byte_counter(bencher, NUM_VALUES * 4) .with_inputs(|| (&compressed, SESSION.create_execution_ctx())) From 60015925caaa70072fa8bd40466f1b3cdc116b1b Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Fri, 26 Jun 2026 15:24:12 +0100 Subject: [PATCH 2/4] fixes Signed-off-by: Robert Kruszewski --- encodings/experimental/onpair/src/compress.rs | 7 ++-- .../src/aggregate_fn/fns/is_sorted/varbin.rs | 24 +++++++++++-- .../src/aggregate_fn/fns/min_max/varbin.rs | 34 +++++++++++++++---- 3 files changed, 54 insertions(+), 11 deletions(-) diff --git a/encodings/experimental/onpair/src/compress.rs b/encodings/experimental/onpair/src/compress.rs index ed0833995f6..adf9e73461c 100644 --- a/encodings/experimental/onpair/src/compress.rs +++ b/encodings/experimental/onpair/src/compress.rs @@ -18,6 +18,7 @@ use vortex_buffer::Buffer; use vortex_buffer::BufferMut; use vortex_buffer::ByteBuffer; use vortex_buffer::ByteBufferMut; +use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_err; use vortex_mask::AllOr; @@ -69,7 +70,8 @@ where let bytes = view_bytes(view, &buffers); flat.extend_from_slice(bytes); offsets.push(O::from_usize(flat.len())); - uncompressed_lengths.push(view.len() as i32); + uncompressed_lengths + .push(i32::try_from(view.len()).vortex_expect("must fit in i32")); } } AllOr::None => { @@ -81,7 +83,8 @@ where let bytes = view_bytes(view, &buffers); flat.extend_from_slice(bytes); offsets.push(O::from_usize(flat.len())); - uncompressed_lengths.push(view.len() as i32); + uncompressed_lengths + .push(i32::try_from(view.len()).vortex_expect("must fit in i32")); } else { offsets.push(O::from_usize(flat.len())); uncompressed_lengths.push(0); diff --git a/vortex-array/src/aggregate_fn/fns/is_sorted/varbin.rs b/vortex-array/src/aggregate_fn/fns/is_sorted/varbin.rs index f0392f73946..500e1857bbe 100644 --- a/vortex-array/src/aggregate_fn/fns/is_sorted/varbin.rs +++ b/vortex-array/src/aggregate_fn/fns/is_sorted/varbin.rs @@ -12,8 +12,28 @@ pub(super) fn check_varbinview_sorted( strict: bool, ctx: &mut ExecutionCtx, ) -> VortexResult { - let mask = array.validity()?.execute_mask(array.len(), ctx)?; - let iter = (0..array.len()).map(|i| mask.value(i).then(|| array.bytes_at(i))); + let mask = array + .validity()? + .execute_mask(array.len(), ctx)? + .to_bit_buffer(); + // Walk the views directly, borrowing each value (inlined bytes or a slice of a data + // buffer) rather than materializing an owned `ByteBuffer` per element. + let views = array.views(); + let buffers = array + .data_buffers() + .iter() + .map(|b| b.as_host()) + .collect::>(); + let iter = views.iter().zip(mask.iter()).map(|(view, valid)| { + valid.then(|| { + if view.is_inlined() { + view.as_inlined().value() + } else { + let view_ref = view.as_view(); + &buffers[view_ref.buffer_index as usize][view_ref.as_range()] + } + }) + }); Ok(if strict { iter.is_strict_sorted() } else { diff --git a/vortex-array/src/aggregate_fn/fns/min_max/varbin.rs b/vortex-array/src/aggregate_fn/fns/min_max/varbin.rs index 9bf12d7f56a..c4ca29f4742 100644 --- a/vortex-array/src/aggregate_fn/fns/min_max/varbin.rs +++ b/vortex-array/src/aggregate_fn/fns/min_max/varbin.rs @@ -27,23 +27,43 @@ fn varbin_compute_min_max( dtype: &DType, ctx: &mut ExecutionCtx, ) -> VortexResult> { - let mask = array.validity()?.execute_mask(array.len(), ctx)?; - let minmax = (0..array.len()) - .filter(|&i| mask.value(i)) - .map(|i| array.bytes_at(i)) + let mask = array + .validity()? + .execute_mask(array.len(), ctx)? + .to_bit_buffer(); + // Walk the views directly, borrowing each value (inlined bytes or a slice of a data + // buffer) rather than materializing an owned `ByteBuffer` per element. + let views = array.views(); + let buffers = array + .data_buffers() + .iter() + .map(|b| b.as_host()) + .collect::>(); + let minmax = views + .iter() + .zip(mask.iter()) + .filter(|(_, v)| *v) + .map(|(view, _)| { + if view.is_inlined() { + view.as_inlined().value() + } else { + let view_ref = view.as_view(); + &buffers[view_ref.buffer_index as usize][view_ref.as_range()] + } + }) .minmax(); Ok(match minmax { itertools::MinMaxResult::NoElements => None, itertools::MinMaxResult::OneElement(value) => { - let scalar = make_scalar(dtype, value.as_slice()); + let scalar = make_scalar(dtype, value); Some(MinMaxResult { min: scalar.clone(), max: scalar, }) } itertools::MinMaxResult::MinMax(min, max) => Some(MinMaxResult { - min: make_scalar(dtype, min.as_slice()), - max: make_scalar(dtype, max.as_slice()), + min: make_scalar(dtype, min), + max: make_scalar(dtype, max), }), }) } From 1edb7f8125e3ac3798bb2fe0849a34d812fdaa85 Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Fri, 26 Jun 2026 15:24:28 +0100 Subject: [PATCH 3/4] less Signed-off-by: Robert Kruszewski --- vortex-array/src/aggregate_fn/fns/is_sorted/varbin.rs | 2 -- vortex-array/src/aggregate_fn/fns/min_max/varbin.rs | 2 -- 2 files changed, 4 deletions(-) diff --git a/vortex-array/src/aggregate_fn/fns/is_sorted/varbin.rs b/vortex-array/src/aggregate_fn/fns/is_sorted/varbin.rs index 500e1857bbe..9e17e12e291 100644 --- a/vortex-array/src/aggregate_fn/fns/is_sorted/varbin.rs +++ b/vortex-array/src/aggregate_fn/fns/is_sorted/varbin.rs @@ -16,8 +16,6 @@ pub(super) fn check_varbinview_sorted( .validity()? .execute_mask(array.len(), ctx)? .to_bit_buffer(); - // Walk the views directly, borrowing each value (inlined bytes or a slice of a data - // buffer) rather than materializing an owned `ByteBuffer` per element. let views = array.views(); let buffers = array .data_buffers() diff --git a/vortex-array/src/aggregate_fn/fns/min_max/varbin.rs b/vortex-array/src/aggregate_fn/fns/min_max/varbin.rs index c4ca29f4742..5d4de12cd11 100644 --- a/vortex-array/src/aggregate_fn/fns/min_max/varbin.rs +++ b/vortex-array/src/aggregate_fn/fns/min_max/varbin.rs @@ -31,8 +31,6 @@ fn varbin_compute_min_max( .validity()? .execute_mask(array.len(), ctx)? .to_bit_buffer(); - // Walk the views directly, borrowing each value (inlined bytes or a slice of a data - // buffer) rather than materializing an owned `ByteBuffer` per element. let views = array.views(); let buffers = array .data_buffers() From ec211685a3bb4f7df279c5e3315ecea8ada1d1a7 Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Fri, 26 Jun 2026 15:25:34 +0100 Subject: [PATCH 4/4] format Signed-off-by: Robert Kruszewski --- .../fastlanes/src/for/array/for_compress.rs | 17 +++++++---------- vortex-array/src/arrays/primitive/array/mod.rs | 2 +- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/encodings/fastlanes/src/for/array/for_compress.rs b/encodings/fastlanes/src/for/array/for_compress.rs index bf849ec241b..9c2d5bbe423 100644 --- a/encodings/fastlanes/src/for/array/for_compress.rs +++ b/encodings/fastlanes/src/for/array/for_compress.rs @@ -38,16 +38,13 @@ fn compress_primitive( ) -> VortexResult { // Set null values to the min value, ensuring that decompress into a value in the primitive // range (and stop them wrapping around). - let encoded = parray.map_each_with_validity::( - |(v, bool)| { - if bool { - v.wrapping_sub(&min) - } else { - T::zero() - } - }, - ctx, - )?; + let encoded = parray.map_each_with_validity::(ctx, |(v, bool)| { + if bool { + v.wrapping_sub(&min) + } else { + T::zero() + } + })?; Ok(encoded) } diff --git a/vortex-array/src/arrays/primitive/array/mod.rs b/vortex-array/src/arrays/primitive/array/mod.rs index 588cbb555fd..867c7d0c783 100644 --- a/vortex-array/src/arrays/primitive/array/mod.rs +++ b/vortex-array/src/arrays/primitive/array/mod.rs @@ -480,7 +480,7 @@ impl Array { } } - pub fn map_each_with_validity(self, f: F, ctx: &mut ExecutionCtx) -> VortexResult + pub fn map_each_with_validity(self, ctx: &mut ExecutionCtx, f: F) -> VortexResult where T: NativePType, R: NativePType,