From f226bfb8d0338129d979dcf81d2306ccc12395aa Mon Sep 17 00:00:00 2001 From: Daniel King Date: Tue, 14 Jan 2025 14:41:09 -0500 Subject: [PATCH 01/12] feat: teach ALPArray to store validity only in the encoded array MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The patches are now always non-nullable. This required PrimitiveArray::patch to gracefully handle non-nullable patches when the array is nullable. I modified the benchmarks to include patch manipulation time, but notice that the test data has no patches. The benchmarks measure the overhead of `is_valid`. If we had test data where the invalid positions contained exceptional values, I would expect a modest improvement in both decompression and compression time. As discussed [in slack](https://spiraldb.slack.com/archives/C07BV3GKAJ2/p1736894376100079), the `is_valid` is expensive for two reasons: (a) creation of a Scalar and (b) conversion to an Arrow BooleanBuffer. @gatesn is working on a FilterMask change that should subsume all our boolean compression strategies into a new "BoolMask". With that in place, we can require Validity::Array to hold a "BoolMask" and thus check validity without either of the issues above. This PR ------- ``` Timer precision: 41 ns alp_compress fastest │ slowest │ median │ mean │ samples │ iters ├─ compress_alp │ │ │ │ │ │ ├─ f32 │ │ │ │ │ │ │ ├─ (100000, 0.25) 170.2 µs │ 442.1 µs │ 170.7 µs │ 173.6 µs │ 100 │ 100 │ │ ├─ (100000, 0.95) 169.3 µs │ 202.2 µs │ 170.4 µs │ 171.7 µs │ 100 │ 100 │ │ ├─ (100000, 1.0) 136.3 µs │ 147.1 µs │ 136.9 µs │ 137.2 µs │ 100 │ 100 │ │ ├─ (10000000, 0.25) 13.6 ms │ 16.46 ms │ 14.39 ms │ 14.23 ms │ 100 │ 100 │ │ ├─ (10000000, 0.95) 13.64 ms │ 17.7 ms │ 14.44 ms │ 14.38 ms │ 100 │ 100 │ │ ╰─ (10000000, 1.0) 13.55 ms │ 14.97 ms │ 14.35 ms │ 14.23 ms │ 100 │ 100 │ ╰─ f64 │ │ │ │ │ │ ├─ (100000, 0.25) 240.7 µs │ 385.7 µs │ 247.8 µs │ 249.1 µs │ 100 │ 100 │ ├─ (100000, 0.95) 240.2 µs │ 253.2 µs │ 243.5 µs │ 244.4 µs │ 100 │ 100 │ ├─ (100000, 1.0) 172.6 µs │ 184.4 µs │ 175.2 µs │ 175.5 µs │ 100 │ 100 │ ├─ (10000000, 0.25) 15.95 ms │ 24.24 ms │ 16.61 ms │ 17.25 ms │ 100 │ 100 │ ├─ (10000000, 0.95) 15.95 ms │ 21.34 ms │ 16.39 ms │ 16.85 ms │ 100 │ 100 │ ╰─ (10000000, 1.0) 15.92 ms │ 23.41 ms │ 16.46 ms │ 17.04 ms │ 100 │ 100 ╰─ decompress_alp │ │ │ │ │ ├─ f32 │ │ │ │ │ │ ├─ (100000, 0.25) 12.2 µs │ 34.7 µs │ 12.29 µs │ 12.52 µs │ 100 │ 100 │ ├─ (100000, 0.95) 12.12 µs │ 12.74 µs │ 12.35 µs │ 12.37 µs │ 100 │ 100 │ ├─ (100000, 1.0) 12.16 µs │ 12.95 µs │ 12.37 µs │ 12.4 µs │ 100 │ 100 │ ├─ (10000000, 0.25) 2.117 ms │ 4.544 ms │ 2.637 ms │ 2.674 ms │ 100 │ 100 │ ├─ (10000000, 0.95) 2.085 ms │ 4.458 ms │ 2.362 ms │ 2.504 ms │ 100 │ 100 │ ╰─ (10000000, 1.0) 2.097 ms │ 3.875 ms │ 2.229 ms │ 2.338 ms │ 100 │ 100 ╰─ f64 │ │ │ │ │ ├─ (100000, 0.25) 23.41 µs │ 25.16 µs │ 23.66 µs │ 23.68 µs │ 100 │ 100 ├─ (100000, 0.95) 23.2 µs │ 24.7 µs │ 24.06 µs │ 24.05 µs │ 100 │ 100 ├─ (100000, 1.0) 22.79 µs │ 26.08 µs │ 22.91 µs │ 22.95 µs │ 100 │ 100 ├─ (10000000, 0.25) 4.216 ms │ 6.862 ms │ 4.416 ms │ 4.568 ms │ 100 │ 100 ├─ (10000000, 0.95) 4.242 ms │ 7.647 ms │ 4.59 ms │ 4.827 ms │ 100 │ 100 ╰─ (10000000, 1.0) 4.236 ms │ 8.129 ms │ 4.377 ms │ 4.507 ms │ 100 │ 100 ``` Develop ------- I patched develop with this PR's benchmark code ``` Timer precision: 41 ns alp_compress fastest │ slowest │ median │ mean │ samples │ iters ├─ compress_alp │ │ │ │ │ │ ├─ f32 │ │ │ │ │ │ │ ├─ (100000, 0.25) 136 µs │ 279.7 µs │ 136.8 µs │ 138.5 µs │ 100 │ 100 │ │ ├─ (100000, 0.95) 136.2 µs │ 149.7 µs │ 136.9 µs │ 137.6 µs │ 100 │ 100 │ │ ├─ (100000, 1.0) 136 µs │ 148.5 µs │ 136.6 µs │ 137.1 µs │ 100 │ 100 │ │ ├─ (10000000, 0.25) 13.68 ms │ 15.06 ms │ 14.07 ms │ 14.14 ms │ 100 │ 100 │ │ ├─ (10000000, 0.95) 13.67 ms │ 19.4 ms │ 14.05 ms │ 14.18 ms │ 100 │ 100 │ │ ╰─ (10000000, 1.0) 13.66 ms │ 14.73 ms │ 13.87 ms │ 14.04 ms │ 100 │ 100 │ ╰─ f64 │ │ │ │ │ │ ├─ (100000, 0.25) 167.7 µs │ 301.4 µs │ 172.7 µs │ 173.2 µs │ 100 │ 100 │ ├─ (100000, 0.95) 167.7 µs │ 187.7 µs │ 170.7 µs │ 171.1 µs │ 100 │ 100 │ ├─ (100000, 1.0) 167.6 µs │ 183.7 µs │ 170.9 µs │ 171.1 µs │ 100 │ 100 │ ├─ (10000000, 0.25) 15.54 ms │ 21.9 ms │ 15.92 ms │ 16.05 ms │ 100 │ 100 │ ├─ (10000000, 0.95) 15.61 ms │ 16.74 ms │ 15.97 ms │ 16.03 ms │ 100 │ 100 │ ╰─ (10000000, 1.0) 15.64 ms │ 18.85 ms │ 16.1 ms │ 16.23 ms │ 100 │ 100 ╰─ decompress_alp │ │ │ │ │ ├─ f32 │ │ │ │ │ │ ├─ (100000, 0.25) 12.37 µs │ 85.49 µs │ 12.49 µs │ 13.22 µs │ 100 │ 100 │ ├─ (100000, 0.95) 12.29 µs │ 12.74 µs │ 12.43 µs │ 12.44 µs │ 100 │ 100 │ ├─ (100000, 1.0) 11.7 µs │ 11.95 µs │ 11.83 µs │ 11.81 µs │ 100 │ 100 │ ├─ (10000000, 0.25) 2.081 ms │ 3.003 ms │ 2.175 ms │ 2.263 ms │ 100 │ 100 │ ├─ (10000000, 0.95) 2.082 ms │ 2.228 ms │ 2.109 ms │ 2.124 ms │ 100 │ 100 │ ╰─ (10000000, 1.0) 2.082 ms │ 2.904 ms │ 2.13 ms │ 2.202 ms │ 100 │ 100 ╰─ f64 │ │ │ │ │ ├─ (100000, 0.25) 23.66 µs │ 25.66 µs │ 23.83 µs │ 23.86 µs │ 100 │ 100 ├─ (100000, 0.95) 23.16 µs │ 24.62 µs │ 24.04 µs │ 23.89 µs │ 100 │ 100 ├─ (100000, 1.0) 22.87 µs │ 23.49 µs │ 23.04 µs │ 23.03 µs │ 100 │ 100 ├─ (10000000, 0.25) 4.221 ms │ 5.59 ms │ 4.326 ms │ 4.469 ms │ 100 │ 100 ├─ (10000000, 0.95) 4.242 ms │ 5.319 ms │ 4.545 ms │ 4.536 ms │ 100 │ 100 ╰─ (10000000, 1.0) 4.228 ms │ 5.652 ms │ 4.342 ms │ 4.519 ms │ 100 │ 100 ``` --- Cargo.lock | 1 + encodings/alp/Cargo.toml | 1 + encodings/alp/benches/alp_compress.rs | 55 ++++++++++++---- encodings/alp/src/alp/array.rs | 7 +- encodings/alp/src/alp/compress.rs | 59 ++++++++++++----- encodings/alp/src/alp/compute/mod.rs | 7 +- encodings/alp/src/alp/mod.rs | 79 ++++++++++++++++------- vortex-array/src/array/primitive/patch.rs | 10 ++- 8 files changed, 164 insertions(+), 55 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fc6baf533aa..2f151e2cfef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4873,6 +4873,7 @@ dependencies = [ "divan", "itertools 0.14.0", "num-traits", + "rand", "rstest", "serde", "vortex-array", diff --git a/encodings/alp/Cargo.toml b/encodings/alp/Cargo.toml index fd7993c50aa..00d74c0822c 100644 --- a/encodings/alp/Cargo.toml +++ b/encodings/alp/Cargo.toml @@ -29,6 +29,7 @@ vortex-scalar = { workspace = true } [dev-dependencies] divan = { workspace = true } +rand = { workspace = true } rstest = { workspace = true } vortex-array = { workspace = true, features = ["test-harness"] } diff --git a/encodings/alp/benches/alp_compress.rs b/encodings/alp/benches/alp_compress.rs index d51d34038de..91aa4042ae5 100644 --- a/encodings/alp/benches/alp_compress.rs +++ b/encodings/alp/benches/alp_compress.rs @@ -1,27 +1,60 @@ #![allow(clippy::unwrap_used)] use divan::Bencher; -use vortex_alp::{ALPFloat, ALPRDFloat, Exponents, RDEncoder}; +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng as _}; +use vortex_alp::{alp_encode, ALPFloat, ALPRDFloat, RDEncoder}; use vortex_array::array::PrimitiveArray; use vortex_array::validity::Validity; use vortex_array::IntoCanonical; -use vortex_buffer::{buffer, Buffer}; +use vortex_buffer::buffer; +use vortex_dtype::NativePType; fn main() { divan::main(); } -#[divan::bench(types = [f32, f64], args = [100_000, 10_000_000])] -fn compress_alp(n: usize) -> (Exponents, Buffer, Buffer, Buffer) { - let values: Vec = vec![T::from(1.234).unwrap(); n]; - T::encode(values.as_slice(), None) +#[divan::bench(types = [f32, f64], args = [ + (100_000, 1.0), + (10_000_000, 1.0), + (100_000, 0.25), + (10_000_000, 0.25), + (100_000, 0.95), + (10_000_000, 0.95), +])] +fn compress_alp(bencher: Bencher, args: (usize, f64)) -> () { + let (n, fraction_valid) = args; + let mut rng = StdRng::seed_from_u64(0); + let values = buffer![T::from(1.234).unwrap(); n]; + let validity = if fraction_valid < 1.0 { + Validity::from_iter((0..values.len()).map(|_| rng.gen_bool(fraction_valid))) + } else { + Validity::NonNullable + }; + bencher.bench_local(move || { + alp_encode(&PrimitiveArray::new(values.clone(), validity.clone())).unwrap() + }) } -#[divan::bench(types = [f32, f64], args = [100_000, 10_000_000])] -fn decompress_alp(bencher: Bencher, n: usize) { - let values: Vec = vec![T::from(1.234).unwrap(); n]; - let (exponents, encoded, ..) = T::encode(values.as_slice(), None); - bencher.bench_local(move || T::decode(&encoded, exponents)); +#[divan::bench(types = [f32, f64], args = [ + (100_000, 1.0), + (10_000_000, 1.0), + (100_000, 0.25), + (10_000_000, 0.25), + (100_000, 0.95), + (10_000_000, 0.95), +])] +fn decompress_alp(bencher: Bencher, args: (usize, f64)) { + let (n, fraction_valid) = args; + let mut rng = StdRng::seed_from_u64(0); + let values = buffer![T::from(1.234).unwrap(); n]; + let validity = if fraction_valid < 1.0 { + Validity::from_iter((0..values.len()).map(|_| rng.gen_bool(fraction_valid))) + } else { + Validity::NonNullable + }; + let array = alp_encode(&PrimitiveArray::new(values, validity)).unwrap(); + bencher.bench_local(move || array.clone().into_canonical().unwrap()); } #[divan::bench(types = [f32, f64], args = [100_000, 10_000_000])] diff --git a/encodings/alp/src/alp/array.rs b/encodings/alp/src/alp/array.rs index e494cdc673f..ef53d611372 100644 --- a/encodings/alp/src/alp/array.rs +++ b/encodings/alp/src/alp/array.rs @@ -48,13 +48,16 @@ impl ALPArray { let mut children = Vec::with_capacity(2); children.push(encoded); if let Some(patches) = &patches { + if patches.dtype().is_nullable() { + vortex_bail!(MismatchedTypes: "patches should be non-nullable", patches.dtype()); + } children.push(patches.indices().clone()); children.push(patches.values().clone()); } let patches = patches .as_ref() - .map(|p| p.to_metadata(length, &dtype)) + .map(|p| p.to_metadata(length, &dtype.as_nonnullable())) .transpose()?; Self::try_from_parts( @@ -93,7 +96,7 @@ impl ALPArray { .child(1, &p.indices_dtype(), p.len()) .vortex_expect("ALPArray: patch indices"), self.as_ref() - .child(2, self.dtype(), p.len()) + .child(2, &self.dtype().as_nonnullable(), p.len()) .vortex_expect("ALPArray: patch values"), ) }) diff --git a/encodings/alp/src/alp/compress.rs b/encodings/alp/src/alp/compress.rs index a86dc589618..764bf94c67a 100644 --- a/encodings/alp/src/alp/compress.rs +++ b/encodings/alp/src/alp/compress.rs @@ -3,7 +3,7 @@ use vortex_array::patches::Patches; use vortex_array::variants::PrimitiveArrayTrait; use vortex_array::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant}; use vortex_dtype::{NativePType, PType}; -use vortex_error::{vortex_bail, VortexResult, VortexUnwrap}; +use vortex_error::{vortex_bail, VortexResult}; use vortex_scalar::ScalarType; use crate::alp::{ALPArray, ALPFloat}; @@ -27,33 +27,29 @@ macro_rules! match_each_alp_float_ptype { pub fn alp_encode_components( values: &PrimitiveArray, exponents: Option, -) -> (Exponents, ArrayData, Option) +) -> VortexResult<(Exponents, ArrayData, Option)> where T: ALPFloat + NativePType, T::ALPInt: NativePType, T: ScalarType, { - let (exponents, encoded, exc_pos, exc) = T::encode(values.as_slice::(), exponents); + let (exponents, encoded, exc_pos, exc) = + T::encode(values.as_slice::(), &values.validity(), exponents)?; let len = encoded.len(); - ( + Ok(( exponents, PrimitiveArray::new(encoded, values.validity()).into_array(), (!exc.is_empty()).then(|| { let position_arr = exc_pos.into_array(); - let patch_validity = values.validity().take(&position_arr).vortex_unwrap(); - Patches::new( - len, - position_arr, - PrimitiveArray::new(exc, patch_validity).into_array(), - ) + Patches::new(len, position_arr, exc.into_array()) }), - ) + )) } pub fn alp_encode(parray: &PrimitiveArray) -> VortexResult { let (exponents, encoded, patches) = match parray.ptype() { - PType::F32 => alp_encode_components::(parray, None), - PType::F64 => alp_encode_components::(parray, None), + PType::F32 => alp_encode_components::(parray, None)?, + PType::F64 => alp_encode_components::(parray, None)?, _ => vortex_bail!("ALP can only encode f32 and f64"), }; ALPArray::try_new(encoded, exponents, patches) @@ -83,7 +79,7 @@ mod tests { use core::f64; use vortex_array::compute::scalar_at; - use vortex_array::validity::Validity; + use vortex_array::validity::{ArrayValidity as _, Validity}; use vortex_buffer::{buffer, Buffer}; use super::*; @@ -148,6 +144,39 @@ mod tests { assert_eq!(values.as_slice(), decoded.as_slice::()); } + #[test] + #[allow(clippy::approx_constant)] // ALP doesn't like E + fn test_compress_ignores_invalid_exceptional_values() { + let values = buffer![1.234f64, 2.718, f64::consts::PI, 4.0]; + let array = PrimitiveArray::new(values, Validity::from_iter([true, true, false, true])); + let encoded = alp_encode(&array).unwrap(); + assert!(encoded.patches().is_none()); + assert_eq!( + encoded + .encoded() + .into_primitive() + .unwrap() + .as_slice::(), + vec![1234i64, 2718, 3142, 4000] // fill forward + ); + assert_eq!(encoded.exponents(), Exponents { e: 16, f: 13 }); + + let decoded = decompress(encoded).unwrap(); + assert_eq!( + scalar_at(&decoded, 0).unwrap(), + scalar_at(&array, 0).unwrap() + ); + assert_eq!( + scalar_at(&decoded, 1).unwrap(), + scalar_at(&array, 1).unwrap() + ); + assert!(!decoded.is_valid(2)); + assert_eq!( + scalar_at(&decoded, 3).unwrap(), + scalar_at(&array, 3).unwrap() + ); + } + #[test] #[allow(clippy::approx_constant)] // ALP doesn't like E fn test_nullable_patched_scalar_at() { @@ -168,6 +197,7 @@ mod tests { assert!(s.is_valid()); } + assert!(!encoded.is_valid(4)); let s = scalar_at(encoded.as_ref(), 4).unwrap(); assert!(s.is_null()); @@ -190,7 +220,6 @@ mod tests { ); let alp_arr = alp_encode(&original).unwrap(); let decompressed = alp_arr.into_primitive().unwrap(); - assert_eq!(original.as_slice::(), decompressed.as_slice::()); assert_eq!(original.validity(), decompressed.validity()); } } diff --git a/encodings/alp/src/alp/compute/mod.rs b/encodings/alp/src/alp/compute/mod.rs index 74c0546ff6f..2e3b47c6cd5 100644 --- a/encodings/alp/src/alp/compute/mod.rs +++ b/encodings/alp/src/alp/compute/mod.rs @@ -2,6 +2,7 @@ use vortex_array::compute::{ filter, scalar_at, slice, take, ComputeVTable, FilterFn, FilterMask, ScalarAtFn, SliceFn, TakeFn, }; +use vortex_array::validity::ArrayValidity as _; use vortex_array::variants::PrimitiveArrayTrait; use vortex_array::{ArrayDType, ArrayData, IntoArrayData}; use vortex_error::VortexResult; @@ -29,9 +30,13 @@ impl ComputeVTable for ALPEncoding { impl ScalarAtFn for ALPEncoding { fn scalar_at(&self, array: &ALPArray, index: usize) -> VortexResult { + if !array.encoded().is_valid(index) { + return Ok(Scalar::null(array.dtype().clone())); + } + if let Some(patches) = array.patches() { if let Some(patch) = patches.get_patched(index)? { - return Ok(patch); + return patch.cast(array.dtype()); } } diff --git a/encodings/alp/src/alp/mod.rs b/encodings/alp/src/alp/mod.rs index 06ba4876cb2..8ec7486813c 100644 --- a/encodings/alp/src/alp/mod.rs +++ b/encodings/alp/src/alp/mod.rs @@ -11,7 +11,11 @@ mod compute; pub use array::*; pub use compress::*; +use vortex_array::array::PrimitiveArray; +use vortex_array::validity::Validity; +use vortex_array::IntoArrayData as _; use vortex_buffer::{Buffer, BufferMut}; +use vortex_error::VortexResult; const SAMPLE_SIZE: usize = 32; @@ -55,24 +59,45 @@ pub trait ALPFloat: private::Sealed + Float + Display + 'static { /// Convert from the integer type back to the float type using `as`. fn from_int(n: Self::ALPInt) -> Self; - fn find_best_exponents(values: &[Self]) -> Exponents { - let mut best_exp = Exponents { e: 0, f: 0 }; - let mut best_nbytes: usize = usize::MAX; - - let sample = (values.len() > SAMPLE_SIZE).then(|| { - values + fn sampled_find_best_exponents( + values: &[Self], + validity: &Validity, + ) -> VortexResult { + if values.len() <= SAMPLE_SIZE { + Self::find_best_exponents(values, validity) + } else { + let validity = validity.take( + &PrimitiveArray::from_iter( + (0..values.len()) + .step_by(values.len() / SAMPLE_SIZE) + .map(|x| x as u64), + ) + .into_array(), + )?; + let values = values .iter() .step_by(values.len() / SAMPLE_SIZE) .cloned() - .collect_vec() - }); + .collect_vec(); + Self::find_best_exponents(&values, &validity) + } + } + + fn find_best_exponents(values: &[Self], validity: &Validity) -> VortexResult { + let mut best_exp = Exponents { e: 0, f: 0 }; + let mut best_nbytes: usize = usize::MAX; + + assert!( + values.len() <= SAMPLE_SIZE, + "{} <= {}", + values.len(), + SAMPLE_SIZE + ); for e in (0..Self::MAX_EXPONENT).rev() { for f in 0..e { - let (_, encoded, _, exc_patches) = Self::encode( - sample.as_deref().unwrap_or(values), - Some(Exponents { e, f }), - ); + let (_, encoded, _, exc_patches) = + Self::encode(values, validity, Some(Exponents { e, f }))?; let size = Self::estimate_encoded_size(&encoded, &exc_patches); if size < best_nbytes { @@ -84,7 +109,7 @@ pub trait ALPFloat: private::Sealed + Float + Display + 'static { } } - best_exp + Ok(best_exp) } #[inline] @@ -112,11 +137,16 @@ pub trait ALPFloat: private::Sealed + Float + Display + 'static { encoded_bytes + patch_bytes } + #[allow(clippy::type_complexity)] fn encode( values: &[Self], + validity: &Validity, exponents: Option, - ) -> (Exponents, Buffer, Buffer, Buffer) { - let exp = exponents.unwrap_or_else(|| Self::find_best_exponents(values)); + ) -> VortexResult<(Exponents, Buffer, Buffer, Buffer)> { + let exponents = match exponents { + Some(exponents) => exponents, + None => Self::sampled_find_best_exponents(values, validity)?, + }; let mut encoded_output = BufferMut::::with_capacity(values.len()); let mut patch_indices = BufferMut::::with_capacity(values.len()); @@ -129,20 +159,21 @@ pub trait ALPFloat: private::Sealed + Float + Display + 'static { for chunk in values.chunks(encode_chunk_size) { encode_chunk_unchecked( chunk, - exp, + exponents, &mut encoded_output, &mut patch_indices, &mut patch_values, &mut fill_value, + validity, ); } - ( - exp, + Ok(( + exponents, encoded_output.freeze(), patch_indices.freeze(), patch_values.freeze(), - ) + )) } #[inline] @@ -191,6 +222,7 @@ fn encode_chunk_unchecked( patch_indices: &mut BufferMut, patch_values: &mut BufferMut, fill_value: &mut Option, + validity: &Validity, ) { let num_prev_encoded = encoded_output.len(); let num_prev_patches = patch_indices.len(); @@ -224,12 +256,13 @@ fn encode_chunk_unchecked( // write() is only safe to call more than once because the values are primitive (i.e., Drop is a no-op) patch_indices_mut[chunk_patch_index].write(i as u64); patch_values_mut[chunk_patch_index].write(chunk[i - num_prev_encoded]); - chunk_patch_index += (decoded != chunk[i - num_prev_encoded]) as usize; + let is_valid_and_an_exception = + (decoded != chunk[i - num_prev_encoded]) && validity.is_valid(i); + chunk_patch_index += is_valid_and_an_exception as usize; } - assert_eq!(chunk_patch_index, chunk_patch_count); unsafe { - patch_indices.set_len(num_prev_patches + chunk_patch_count); - patch_values.set_len(num_prev_patches + chunk_patch_count); + patch_indices.set_len(num_prev_patches + chunk_patch_index); + patch_values.set_len(num_prev_patches + chunk_patch_index); } } diff --git a/vortex-array/src/array/primitive/patch.rs b/vortex-array/src/array/primitive/patch.rs index e4bb6a6d6bd..b16fd2ae31a 100644 --- a/vortex-array/src/array/primitive/patch.rs +++ b/vortex-array/src/array/primitive/patch.rs @@ -15,9 +15,13 @@ impl PrimitiveArray { let patch_indices = patch_indices.into_primitive()?; let patch_values = patch_values.into_primitive()?; - let patched_validity = - self.validity() - .patch(self.len(), patch_indices.as_ref(), patch_values.validity())?; + let patched_validity = match patch_values.validity() { + Validity::NonNullable => self.validity(), + patch_validity => { + self.validity() + .patch(self.len(), patch_indices.as_ref(), patch_validity)? + } + }; match_each_integer_ptype!(patch_indices.ptype(), |$I| { match_each_native_ptype!(self.ptype(), |$T| { From 88b28f0db0e37e94c02085e9f069f8590df42254 Mon Sep 17 00:00:00 2001 From: Daniel King Date: Wed, 15 Jan 2025 10:38:20 -0500 Subject: [PATCH 02/12] minor fixes --- encodings/alp/src/alp/mod.rs | 4 +++- vortex-sampling-compressor/src/compressors/alp.rs | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/encodings/alp/src/alp/mod.rs b/encodings/alp/src/alp/mod.rs index 8ec7486813c..bd368903a76 100644 --- a/encodings/alp/src/alp/mod.rs +++ b/encodings/alp/src/alp/mod.rs @@ -70,13 +70,15 @@ pub trait ALPFloat: private::Sealed + Float + Display + 'static { &PrimitiveArray::from_iter( (0..values.len()) .step_by(values.len() / SAMPLE_SIZE) - .map(|x| x as u64), + .map(|x| x as u64) + .take(SAMPLE_SIZE), ) .into_array(), )?; let values = values .iter() .step_by(values.len() / SAMPLE_SIZE) + .take(SAMPLE_SIZE) .cloned() .collect_vec(); Self::find_best_exponents(&values, &validity) diff --git a/vortex-sampling-compressor/src/compressors/alp.rs b/vortex-sampling-compressor/src/compressors/alp.rs index dbcfe4fe464..5bb62a7d002 100644 --- a/vortex-sampling-compressor/src/compressors/alp.rs +++ b/vortex-sampling-compressor/src/compressors/alp.rs @@ -49,7 +49,7 @@ impl EncodingCompressor for ALPCompressor { let (exponents, encoded, patches) = match_each_alp_float_ptype!( parray.ptype(), |$T| { alp_encode_components::<$T>(&parray, None) - }); + })?; let compressed_encoded = ctx .named("packed") From ef3a022b940e3446a443197487ca6c305971ac54 Mon Sep 17 00:00:00 2001 From: Daniel King Date: Wed, 15 Jan 2025 11:41:27 -0500 Subject: [PATCH 03/12] clippy and python test --- docs/quickstart.rst | 2 +- encodings/alp/benches/alp_compress.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/quickstart.rst b/docs/quickstart.rst index 6708ec6830b..e04e8d91c8f 100644 --- a/docs/quickstart.rst +++ b/docs/quickstart.rst @@ -46,7 +46,7 @@ Use :func:`~vortex.encoding.compress` to compress the Vortex array and check the >>> cvtx = vortex.compress(vtx) >>> cvtx.nbytes - 16604 + 16596 >>> cvtx.nbytes / vtx.nbytes 0.11... diff --git a/encodings/alp/benches/alp_compress.rs b/encodings/alp/benches/alp_compress.rs index 91aa4042ae5..a2505efeb38 100644 --- a/encodings/alp/benches/alp_compress.rs +++ b/encodings/alp/benches/alp_compress.rs @@ -22,7 +22,7 @@ fn main() { (100_000, 0.95), (10_000_000, 0.95), ])] -fn compress_alp(bencher: Bencher, args: (usize, f64)) -> () { +fn compress_alp(bencher: Bencher, args: (usize, f64)) { let (n, fraction_valid) = args; let mut rng = StdRng::seed_from_u64(0); let values = buffer![T::from(1.234).unwrap(); n]; From 2c0af3b06fc100f0ad08ba4e4febc59ddf87693b Mon Sep 17 00:00:00 2001 From: Daniel King Date: Thu, 16 Jan 2025 11:34:47 -0500 Subject: [PATCH 04/12] patches has same dtype as array --- encodings/alp/src/alp/array.rs | 16 ++++++++++++---- encodings/alp/src/alp/compress.rs | 12 +++++++++++- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/encodings/alp/src/alp/array.rs b/encodings/alp/src/alp/array.rs index ef53d611372..1ac11a7a78f 100644 --- a/encodings/alp/src/alp/array.rs +++ b/encodings/alp/src/alp/array.rs @@ -48,16 +48,24 @@ impl ALPArray { let mut children = Vec::with_capacity(2); children.push(encoded); if let Some(patches) = &patches { - if patches.dtype().is_nullable() { - vortex_bail!(MismatchedTypes: "patches should be non-nullable", patches.dtype()); + if patches.dtype() != &dtype { + vortex_bail!(MismatchedTypes: dtype, patches.dtype()); } + + if !matches!( + patches.values().logical_validity(), + LogicalValidity::AllValid(_) + ) { + vortex_bail!("ALPArray: patches must not contain invalid entries"); + } + children.push(patches.indices().clone()); children.push(patches.values().clone()); } let patches = patches .as_ref() - .map(|p| p.to_metadata(length, &dtype.as_nonnullable())) + .map(|p| p.to_metadata(length, &dtype)) .transpose()?; Self::try_from_parts( @@ -96,7 +104,7 @@ impl ALPArray { .child(1, &p.indices_dtype(), p.len()) .vortex_expect("ALPArray: patch indices"), self.as_ref() - .child(2, &self.dtype().as_nonnullable(), p.len()) + .child(2, &self.dtype(), p.len()) .vortex_expect("ALPArray: patch values"), ) }) diff --git a/encodings/alp/src/alp/compress.rs b/encodings/alp/src/alp/compress.rs index 764bf94c67a..531bbc78bb4 100644 --- a/encodings/alp/src/alp/compress.rs +++ b/encodings/alp/src/alp/compress.rs @@ -1,5 +1,6 @@ use vortex_array::array::PrimitiveArray; use vortex_array::patches::Patches; +use vortex_array::validity::Validity; use vortex_array::variants::PrimitiveArrayTrait; use vortex_array::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant}; use vortex_dtype::{NativePType, PType}; @@ -36,12 +37,21 @@ where let (exponents, encoded, exc_pos, exc) = T::encode(values.as_slice::(), &values.validity(), exponents)?; let len = encoded.len(); + let patches_validity = if values.dtype().is_nullable() { + Validity::AllValid + } else { + Validity::NonNullable + }; Ok(( exponents, PrimitiveArray::new(encoded, values.validity()).into_array(), (!exc.is_empty()).then(|| { let position_arr = exc_pos.into_array(); - Patches::new(len, position_arr, exc.into_array()) + Patches::new( + len, + position_arr, + PrimitiveArray::new(exc, patches_validity).into_array(), + ) }), )) } From d78ec6ab627ef7d21e46f33cb615b7a22551268b Mon Sep 17 00:00:00 2001 From: Daniel King Date: Thu, 16 Jan 2025 12:11:49 -0500 Subject: [PATCH 05/12] pull patch filter to alp_encode_components --- encodings/alp/src/alp/compress.rs | 27 ++++++-- encodings/alp/src/alp/mod.rs | 80 +++++++---------------- vortex-array/src/array/primitive/patch.rs | 10 +-- 3 files changed, 46 insertions(+), 71 deletions(-) diff --git a/encodings/alp/src/alp/compress.rs b/encodings/alp/src/alp/compress.rs index 531bbc78bb4..bcbf49552aa 100644 --- a/encodings/alp/src/alp/compress.rs +++ b/encodings/alp/src/alp/compress.rs @@ -1,8 +1,10 @@ +use itertools::Itertools as _; use vortex_array::array::PrimitiveArray; use vortex_array::patches::Patches; -use vortex_array::validity::Validity; +use vortex_array::validity::{ArrayValidity as _, Validity}; use vortex_array::variants::PrimitiveArrayTrait; use vortex_array::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant}; +use vortex_buffer::BufferMut; use vortex_dtype::{NativePType, PType}; use vortex_error::{vortex_bail, VortexResult}; use vortex_scalar::ScalarType; @@ -25,6 +27,7 @@ macro_rules! match_each_alp_float_ptype { }) } +#[allow(clippy::cast_possible_truncation)] pub fn alp_encode_components( values: &PrimitiveArray, exponents: Option, @@ -34,23 +37,33 @@ where T::ALPInt: NativePType, T: ScalarType, { - let (exponents, encoded, exc_pos, exc) = - T::encode(values.as_slice::(), &values.validity(), exponents)?; + let (exponents, encoded, exc_pos, exc) = T::encode(values.as_slice::(), exponents); let len = encoded.len(); let patches_validity = if values.dtype().is_nullable() { Validity::AllValid } else { Validity::NonNullable }; + + let mut exc_pos_valid_only = + BufferMut::::with_capacity_aligned(exc_pos.len(), exc_pos.alignment()); + let mut exc_valid_only = BufferMut::::with_capacity_aligned(exc.len(), exc.alignment()); + for (position, value) in exc_pos.into_iter().zip_eq(exc.into_iter()) { + if values.is_valid(position as usize) { + exc_pos_valid_only.push(position); + exc_valid_only.push(value); + } + } + Ok(( exponents, PrimitiveArray::new(encoded, values.validity()).into_array(), - (!exc.is_empty()).then(|| { - let position_arr = exc_pos.into_array(); + (!exc_valid_only.is_empty()).then(|| { + let position_arr = exc_pos_valid_only.freeze().into_array(); Patches::new( len, position_arr, - PrimitiveArray::new(exc, patches_validity).into_array(), + PrimitiveArray::new(exc_valid_only.freeze(), patches_validity).into_array(), ) }), )) @@ -167,7 +180,7 @@ mod tests { .into_primitive() .unwrap() .as_slice::(), - vec![1234i64, 2718, 3142, 4000] // fill forward + vec![1234i64, 2718, 1234, 4000] // fill forward ); assert_eq!(encoded.exponents(), Exponents { e: 16, f: 13 }); diff --git a/encodings/alp/src/alp/mod.rs b/encodings/alp/src/alp/mod.rs index bd368903a76..0b95990aac5 100644 --- a/encodings/alp/src/alp/mod.rs +++ b/encodings/alp/src/alp/mod.rs @@ -11,11 +11,7 @@ mod compute; pub use array::*; pub use compress::*; -use vortex_array::array::PrimitiveArray; -use vortex_array::validity::Validity; -use vortex_array::IntoArrayData as _; use vortex_buffer::{Buffer, BufferMut}; -use vortex_error::VortexResult; const SAMPLE_SIZE: usize = 32; @@ -59,47 +55,25 @@ pub trait ALPFloat: private::Sealed + Float + Display + 'static { /// Convert from the integer type back to the float type using `as`. fn from_int(n: Self::ALPInt) -> Self; - fn sampled_find_best_exponents( - values: &[Self], - validity: &Validity, - ) -> VortexResult { - if values.len() <= SAMPLE_SIZE { - Self::find_best_exponents(values, validity) - } else { - let validity = validity.take( - &PrimitiveArray::from_iter( - (0..values.len()) - .step_by(values.len() / SAMPLE_SIZE) - .map(|x| x as u64) - .take(SAMPLE_SIZE), - ) - .into_array(), - )?; - let values = values + fn find_best_exponents(values: &[Self]) -> Exponents { + let mut best_exp = Exponents { e: 0, f: 0 }; + let mut best_nbytes: usize = usize::MAX; + + let sample = (values.len() > SAMPLE_SIZE).then(|| { + values .iter() .step_by(values.len() / SAMPLE_SIZE) .take(SAMPLE_SIZE) .cloned() - .collect_vec(); - Self::find_best_exponents(&values, &validity) - } - } - - fn find_best_exponents(values: &[Self], validity: &Validity) -> VortexResult { - let mut best_exp = Exponents { e: 0, f: 0 }; - let mut best_nbytes: usize = usize::MAX; - - assert!( - values.len() <= SAMPLE_SIZE, - "{} <= {}", - values.len(), - SAMPLE_SIZE - ); + .collect_vec() + }); for e in (0..Self::MAX_EXPONENT).rev() { for f in 0..e { - let (_, encoded, _, exc_patches) = - Self::encode(values, validity, Some(Exponents { e, f }))?; + let (_, encoded, _, exc_patches) = Self::encode( + sample.as_deref().unwrap_or(values), + Some(Exponents { e, f }), + ); let size = Self::estimate_encoded_size(&encoded, &exc_patches); if size < best_nbytes { @@ -111,7 +85,7 @@ pub trait ALPFloat: private::Sealed + Float + Display + 'static { } } - Ok(best_exp) + best_exp } #[inline] @@ -139,16 +113,11 @@ pub trait ALPFloat: private::Sealed + Float + Display + 'static { encoded_bytes + patch_bytes } - #[allow(clippy::type_complexity)] fn encode( values: &[Self], - validity: &Validity, exponents: Option, - ) -> VortexResult<(Exponents, Buffer, Buffer, Buffer)> { - let exponents = match exponents { - Some(exponents) => exponents, - None => Self::sampled_find_best_exponents(values, validity)?, - }; + ) -> (Exponents, Buffer, Buffer, Buffer) { + let exp = exponents.unwrap_or_else(|| Self::find_best_exponents(values)); let mut encoded_output = BufferMut::::with_capacity(values.len()); let mut patch_indices = BufferMut::::with_capacity(values.len()); @@ -161,21 +130,20 @@ pub trait ALPFloat: private::Sealed + Float + Display + 'static { for chunk in values.chunks(encode_chunk_size) { encode_chunk_unchecked( chunk, - exponents, + exp, &mut encoded_output, &mut patch_indices, &mut patch_values, &mut fill_value, - validity, ); } - Ok(( - exponents, + ( + exp, encoded_output.freeze(), patch_indices.freeze(), patch_values.freeze(), - )) + ) } #[inline] @@ -224,7 +192,6 @@ fn encode_chunk_unchecked( patch_indices: &mut BufferMut, patch_values: &mut BufferMut, fill_value: &mut Option, - validity: &Validity, ) { let num_prev_encoded = encoded_output.len(); let num_prev_patches = patch_indices.len(); @@ -258,13 +225,12 @@ fn encode_chunk_unchecked( // write() is only safe to call more than once because the values are primitive (i.e., Drop is a no-op) patch_indices_mut[chunk_patch_index].write(i as u64); patch_values_mut[chunk_patch_index].write(chunk[i - num_prev_encoded]); - let is_valid_and_an_exception = - (decoded != chunk[i - num_prev_encoded]) && validity.is_valid(i); - chunk_patch_index += is_valid_and_an_exception as usize; + chunk_patch_index += (decoded != chunk[i - num_prev_encoded]) as usize; } + assert_eq!(chunk_patch_index, chunk_patch_count); unsafe { - patch_indices.set_len(num_prev_patches + chunk_patch_index); - patch_values.set_len(num_prev_patches + chunk_patch_index); + patch_indices.set_len(num_prev_patches + chunk_patch_count); + patch_values.set_len(num_prev_patches + chunk_patch_count); } } diff --git a/vortex-array/src/array/primitive/patch.rs b/vortex-array/src/array/primitive/patch.rs index b16fd2ae31a..e4bb6a6d6bd 100644 --- a/vortex-array/src/array/primitive/patch.rs +++ b/vortex-array/src/array/primitive/patch.rs @@ -15,13 +15,9 @@ impl PrimitiveArray { let patch_indices = patch_indices.into_primitive()?; let patch_values = patch_values.into_primitive()?; - let patched_validity = match patch_values.validity() { - Validity::NonNullable => self.validity(), - patch_validity => { - self.validity() - .patch(self.len(), patch_indices.as_ref(), patch_validity)? - } - }; + let patched_validity = + self.validity() + .patch(self.len(), patch_indices.as_ref(), patch_values.validity())?; match_each_integer_ptype!(patch_indices.ptype(), |$I| { match_each_native_ptype!(self.ptype(), |$T| { From e99193070bc563244ccbee57a06487ebcf04f23c Mon Sep 17 00:00:00 2001 From: Daniel King Date: Thu, 16 Jan 2025 14:07:13 -0500 Subject: [PATCH 06/12] ensure patches null count is zero --- docs/quickstart.rst | 2 +- encodings/alp/src/alp/array.rs | 15 +++++++----- encodings/runend/src/array.rs | 45 +++++++++++++++++++++++++++++++++- vortex-array/src/validity.rs | 2 +- 4 files changed, 55 insertions(+), 9 deletions(-) diff --git a/docs/quickstart.rst b/docs/quickstart.rst index e04e8d91c8f..6708ec6830b 100644 --- a/docs/quickstart.rst +++ b/docs/quickstart.rst @@ -46,7 +46,7 @@ Use :func:`~vortex.encoding.compress` to compress the Vortex array and check the >>> cvtx = vortex.compress(vtx) >>> cvtx.nbytes - 16596 + 16604 >>> cvtx.nbytes / vtx.nbytes 0.11... diff --git a/encodings/alp/src/alp/array.rs b/encodings/alp/src/alp/array.rs index 1ac11a7a78f..dcb246a548c 100644 --- a/encodings/alp/src/alp/array.rs +++ b/encodings/alp/src/alp/array.rs @@ -2,6 +2,7 @@ use std::fmt::{Debug, Display}; use serde::{Deserialize, Serialize}; use vortex_array::array::PrimitiveArray; +use vortex_array::compute::scalar_at; use vortex_array::encoding::ids; use vortex_array::patches::{Patches, PatchesMetadata}; use vortex_array::stats::StatisticsVTable; @@ -52,11 +53,13 @@ impl ALPArray { vortex_bail!(MismatchedTypes: dtype, patches.dtype()); } - if !matches!( - patches.values().logical_validity(), - LogicalValidity::AllValid(_) - ) { - vortex_bail!("ALPArray: patches must not contain invalid entries"); + if patches.values().logical_validity().null_count()? != 0 { + vortex_bail!( + "ALPArray: patches must not contain invalid entries {:?}", + (0..patches.values().len()) + .map(|index| format!("{}", scalar_at(patches.values(), index).unwrap())) + .collect::>() + ); } children.push(patches.indices().clone()); @@ -104,7 +107,7 @@ impl ALPArray { .child(1, &p.indices_dtype(), p.len()) .vortex_expect("ALPArray: patch indices"), self.as_ref() - .child(2, &self.dtype(), p.len()) + .child(2, self.dtype(), p.len()) .vortex_expect("ALPArray: patch values"), ) }) diff --git a/encodings/runend/src/array.rs b/encodings/runend/src/array.rs index d714c9dbc08..3668904502d 100644 --- a/encodings/runend/src/array.rs +++ b/encodings/runend/src/array.rs @@ -15,7 +15,7 @@ use vortex_array::{ IntoArrayVariant, IntoCanonical, }; use vortex_buffer::Buffer; -use vortex_dtype::{DType, PType}; +use vortex_dtype::{match_each_unsigned_integer_ptype, DType, PType}; use vortex_error::{vortex_bail, VortexExpect as _, VortexResult}; use vortex_scalar::Scalar; @@ -235,6 +235,49 @@ impl StatisticsVTable for RunEndEncoding { .unwrap_or(false) && array.logical_validity().all_valid(), )), + Stat::TrueCount => match array.dtype() { + DType::Bool(_) => { + let ends = array.ends().into_primitive()?; + let bools = array.values().into_bool()?.boolean_buffer(); + let true_count: u64 = match_each_unsigned_integer_ptype!(ends.ptype(), |$P| { + let mut begin: $P = 0; + ends + .as_slice::<$P>() + .iter() + .enumerate() + .map(|(index, end)| { + let len = *end - begin; + begin = *end; + (len as u64) * (bools.value(index as usize) as u64) + }) + .sum() + }); + Some(Scalar::from(true_count)) + } + DType::Primitive(..) => None, + dtype => vortex_bail!("invalid dtype: {}", dtype), + }, + Stat::NullCount => { + let ends = array.ends().into_primitive()?; + let null_count: u64 = match array.values().logical_validity() { + LogicalValidity::AllValid(_) => 0_u64, + LogicalValidity::AllInvalid(len) => len as u64, + LogicalValidity::Array(is_valid) => { + let is_valid = is_valid.into_bool()?.boolean_buffer(); + match_each_unsigned_integer_ptype!(ends.ptype(), |$P| { + ends + .as_slice::<$P>() + .iter() + .enumerate() + .map(|(index, end)| { + (*end as u64) * ((!is_valid.value(index as usize)) as u64) + }) + .sum() + }) + } + }; + Some(Scalar::from(null_count)) + } _ => None, }; diff --git a/vortex-array/src/validity.rs b/vortex-array/src/validity.rs index 6128796a790..94d4b362385 100644 --- a/vortex-array/src/validity.rs +++ b/vortex-array/src/validity.rs @@ -516,7 +516,7 @@ impl LogicalValidity { Self::AllInvalid(len) => Ok(*len), Self::Array(a) => { let true_count = a.statistics().compute_true_count().ok_or_else(|| { - vortex_err!("Failed to compute true count from validity array") + vortex_err!("Failed to compute true count from validity array {:?}", a) })?; Ok(a.len() - true_count) } From b273378d050441f846ff22326f1efcd0e7d1363d Mon Sep 17 00:00:00 2001 From: Daniel King Date: Thu, 16 Jan 2025 16:13:34 -0500 Subject: [PATCH 07/12] do not print the bad patches values array --- encodings/alp/src/alp/array.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/encodings/alp/src/alp/array.rs b/encodings/alp/src/alp/array.rs index dcb246a548c..345182be2cc 100644 --- a/encodings/alp/src/alp/array.rs +++ b/encodings/alp/src/alp/array.rs @@ -54,12 +54,7 @@ impl ALPArray { } if patches.values().logical_validity().null_count()? != 0 { - vortex_bail!( - "ALPArray: patches must not contain invalid entries {:?}", - (0..patches.values().len()) - .map(|index| format!("{}", scalar_at(patches.values(), index).unwrap())) - .collect::>() - ); + vortex_bail!("ALPArray: patches must not contain invalid entries"); } children.push(patches.indices().clone()); From b3f865ba103bc08a49467857ab0046d2d01312df Mon Sep 17 00:00:00 2001 From: Daniel King Date: Thu, 16 Jan 2025 16:17:19 -0500 Subject: [PATCH 08/12] clippy --- encodings/alp/src/alp/array.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/encodings/alp/src/alp/array.rs b/encodings/alp/src/alp/array.rs index 345182be2cc..54e98efefcd 100644 --- a/encodings/alp/src/alp/array.rs +++ b/encodings/alp/src/alp/array.rs @@ -2,7 +2,6 @@ use std::fmt::{Debug, Display}; use serde::{Deserialize, Serialize}; use vortex_array::array::PrimitiveArray; -use vortex_array::compute::scalar_at; use vortex_array::encoding::ids; use vortex_array::patches::{Patches, PatchesMetadata}; use vortex_array::stats::StatisticsVTable; From a7f18b581b805589004e5ec95ae3174df4e1ef07 Mon Sep 17 00:00:00 2001 From: Daniel King Date: Fri, 17 Jan 2025 15:42:35 -0500 Subject: [PATCH 09/12] wip: alp encode handles nulls better --- Cargo.lock | 1 + encodings/alp/Cargo.toml | 1 + encodings/alp/src/alp/compress.rs | 95 +++++++++++-------- encodings/alp/src/alp/mod.rs | 65 ++++++------- .../src/compressors/alp.rs | 11 +-- 5 files changed, 87 insertions(+), 86 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2f151e2cfef..807a1104888 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4870,6 +4870,7 @@ dependencies = [ name = "vortex-alp" version = "0.21.1" dependencies = [ + "arrow-array", "divan", "itertools 0.14.0", "num-traits", diff --git a/encodings/alp/Cargo.toml b/encodings/alp/Cargo.toml index 00d74c0822c..a4a86b79c5a 100644 --- a/encodings/alp/Cargo.toml +++ b/encodings/alp/Cargo.toml @@ -17,6 +17,7 @@ readme = { workspace = true } workspace = true [dependencies] +arrow-array = { workspace = true } itertools = { workspace = true } num-traits = { workspace = true } serde = { workspace = true, features = ["derive"] } diff --git a/encodings/alp/src/alp/compress.rs b/encodings/alp/src/alp/compress.rs index bcbf49552aa..1dbfd814876 100644 --- a/encodings/alp/src/alp/compress.rs +++ b/encodings/alp/src/alp/compress.rs @@ -1,10 +1,9 @@ -use itertools::Itertools as _; use vortex_array::array::PrimitiveArray; use vortex_array::patches::Patches; -use vortex_array::validity::{ArrayValidity as _, Validity}; +use vortex_array::validity::{ArrayValidity as _, LogicalValidity, Validity}; use vortex_array::variants::PrimitiveArrayTrait; use vortex_array::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant}; -use vortex_buffer::BufferMut; +use vortex_buffer::Buffer; use vortex_dtype::{NativePType, PType}; use vortex_error::{vortex_bail, VortexResult}; use vortex_scalar::ScalarType; @@ -27,55 +26,69 @@ macro_rules! match_each_alp_float_ptype { }) } +pub fn alp_encode(parray: &PrimitiveArray) -> VortexResult { + let (exponents, encoded, patches) = alp_encode_components(parray)?; + ALPArray::try_new(encoded, exponents, patches) +} + +pub fn alp_encode_components( + parray: &PrimitiveArray, +) -> VortexResult<(Exponents, ArrayData, Option)> { + match parray.ptype() { + PType::F32 => alp_encode_components_typed::(parray), + PType::F64 => alp_encode_components_typed::(parray), + _ => vortex_bail!("ALP can only encode f32 and f64"), + } +} + #[allow(clippy::cast_possible_truncation)] -pub fn alp_encode_components( +fn alp_encode_components_typed( values: &PrimitiveArray, - exponents: Option, ) -> VortexResult<(Exponents, ArrayData, Option)> where T: ALPFloat + NativePType, T::ALPInt: NativePType, T: ScalarType, { - let (exponents, encoded, exc_pos, exc) = T::encode(values.as_slice::(), exponents); - let len = encoded.len(); - let patches_validity = if values.dtype().is_nullable() { - Validity::AllValid - } else { - Validity::NonNullable - }; + let values_slice = values.as_slice::(); - let mut exc_pos_valid_only = - BufferMut::::with_capacity_aligned(exc_pos.len(), exc_pos.alignment()); - let mut exc_valid_only = BufferMut::::with_capacity_aligned(exc.len(), exc.alignment()); - for (position, value) in exc_pos.into_iter().zip_eq(exc.into_iter()) { - if values.is_valid(position as usize) { - exc_pos_valid_only.push(position); - exc_valid_only.push(value); - } - } + let exponents = T::find_best_exponents(values_slice); + let (encoded, exceptional_positions) = T::encode(values.as_slice::(), exponents); - Ok(( - exponents, - PrimitiveArray::new(encoded, values.validity()).into_array(), - (!exc_valid_only.is_empty()).then(|| { - let position_arr = exc_pos_valid_only.freeze().into_array(); - Patches::new( - len, - position_arr, - PrimitiveArray::new(exc_valid_only.freeze(), patches_validity).into_array(), - ) - }), - )) -} - -pub fn alp_encode(parray: &PrimitiveArray) -> VortexResult { - let (exponents, encoded, patches) = match parray.ptype() { - PType::F32 => alp_encode_components::(parray, None)?, - PType::F64 => alp_encode_components::(parray, None)?, - _ => vortex_bail!("ALP can only encode f32 and f64"), + let encoded_array = PrimitiveArray::new(encoded, values.validity()).into_array(); + let exceptional_positions = match values.logical_validity() { + LogicalValidity::AllValid(_) => exceptional_positions, + LogicalValidity::AllInvalid(_) => Buffer::empty(), + LogicalValidity::Array(is_valid) => { + let is_valid_buf = is_valid.into_bool()?.boolean_buffer(); + exceptional_positions + .into_iter() + // index is a valid usize because it is an index into values.as_slice::() + .filter(|index| is_valid_buf.value(*index as usize)) + .collect() + } }; - ALPArray::try_new(encoded, exponents, patches) + let patches = if exceptional_positions.is_empty() { + None + } else { + let patches_validity = if values.dtype().is_nullable() { + Validity::AllValid + } else { + Validity::NonNullable + }; + let exceptional_values: Buffer = exceptional_positions + .iter() + .map(|index| values_slice[*index as usize]) + .collect(); + let exceptional_values = + PrimitiveArray::new(exceptional_values, patches_validity).into_array(); + Some(Patches::new( + values_slice.len(), + exceptional_positions.into_array(), + exceptional_values, + )) + }; + Ok((exponents, encoded_array, patches)) } pub fn decompress(array: ALPArray) -> VortexResult { diff --git a/encodings/alp/src/alp/mod.rs b/encodings/alp/src/alp/mod.rs index 0b95990aac5..e99176b012e 100644 --- a/encodings/alp/src/alp/mod.rs +++ b/encodings/alp/src/alp/mod.rs @@ -70,12 +70,10 @@ pub trait ALPFloat: private::Sealed + Float + Display + 'static { for e in (0..Self::MAX_EXPONENT).rev() { for f in 0..e { - let (_, encoded, _, exc_patches) = Self::encode( - sample.as_deref().unwrap_or(values), - Some(Exponents { e, f }), - ); + let (encoded, exceptional_positions) = + Self::encode(sample.as_deref().unwrap_or(values), Exponents { e, f }); - let size = Self::estimate_encoded_size(&encoded, &exc_patches); + let size = Self::estimate_encoded_size(&encoded, exceptional_positions.len()); if size < best_nbytes { best_nbytes = size; best_exp = Exponents { e, f }; @@ -89,7 +87,7 @@ pub trait ALPFloat: private::Sealed + Float + Display + 'static { } #[inline] - fn estimate_encoded_size(encoded: &[Self::ALPInt], patches: &[Self]) -> usize { + fn estimate_encoded_size(encoded: &[Self::ALPInt], n_exceptions: usize) -> usize { let bits_per_encoded = encoded .iter() .minmax() @@ -108,42 +106,33 @@ pub trait ALPFloat: private::Sealed + Float + Display + 'static { let encoded_bytes = (encoded.len() * bits_per_encoded + 7) / 8; // each patch is a value + a position // in practice, patch positions are in [0, u16::MAX] because of how we chunk - let patch_bytes = patches.len() * (size_of::() + size_of::()); + let patch_bytes = n_exceptions * (size_of::() + size_of::()); encoded_bytes + patch_bytes } - fn encode( - values: &[Self], - exponents: Option, - ) -> (Exponents, Buffer, Buffer, Buffer) { - let exp = exponents.unwrap_or_else(|| Self::find_best_exponents(values)); - - let mut encoded_output = BufferMut::::with_capacity(values.len()); - let mut patch_indices = BufferMut::::with_capacity(values.len()); - let mut patch_values = BufferMut::::with_capacity(values.len()); - let mut fill_value: Option = None; - - // this is intentionally branchless - // we batch this into 32KB of values at a time to make it more L1 cache friendly - let encode_chunk_size: usize = (32 << 10) / size_of::(); - for chunk in values.chunks(encode_chunk_size) { - encode_chunk_unchecked( - chunk, - exp, - &mut encoded_output, - &mut patch_indices, - &mut patch_values, - &mut fill_value, - ); - } + /// ALP encode the given values using the given exponents. + /// + /// The index of each value for which encode-decode is not the identity function is returned. + fn encode(values: &[Self], exponents: Exponents) -> (Buffer, Buffer) { + let (encoded, needs_patch): (BufferMut, Vec) = values + .iter() + .map(|value| { + let maybe_encoded = unsafe { Self::encode_single_unchecked(*value, exponents) }; + let maybe_decoded = Self::decode_single(maybe_encoded, exponents); + let needs_patch = maybe_decoded != *value; + (maybe_encoded, needs_patch) + }) + .unzip(); + + let patch_indices: BufferMut = needs_patch + .into_iter() + .enumerate() + .filter(|(_, needs_patch)| *needs_patch) + .map(|(index, _)| index as u64) + .collect(); - ( - exp, - encoded_output.freeze(), - patch_indices.freeze(), - patch_values.freeze(), - ) + (encoded.freeze(), patch_indices.freeze()) } #[inline] @@ -185,7 +174,7 @@ pub trait ALPFloat: private::Sealed + Float + Display + 'static { } #[allow(clippy::cast_possible_truncation)] -fn encode_chunk_unchecked( +fn _encode_chunk_unchecked( chunk: &[T], exp: Exponents, encoded_output: &mut BufferMut, diff --git a/vortex-sampling-compressor/src/compressors/alp.rs b/vortex-sampling-compressor/src/compressors/alp.rs index 5bb62a7d002..878642c7c0a 100644 --- a/vortex-sampling-compressor/src/compressors/alp.rs +++ b/vortex-sampling-compressor/src/compressors/alp.rs @@ -1,5 +1,6 @@ use vortex_alp::{ - alp_encode_components, match_each_alp_float_ptype, ALPArray, ALPEncoding, ALPRDEncoding, + alp_encode_components, alp_encode_components_typed, match_each_alp_float_ptype, ALPArray, + ALPEncoding, ALPRDEncoding, }; use vortex_array::aliases::hash_set::HashSet; use vortex_array::array::PrimitiveArray; @@ -44,12 +45,8 @@ impl EncodingCompressor for ALPCompressor { like: Option>, ctx: SamplingCompressor<'a>, ) -> VortexResult> { - let parray = array.clone().into_primitive()?; - - let (exponents, encoded, patches) = match_each_alp_float_ptype!( - parray.ptype(), |$T| { - alp_encode_components::<$T>(&parray, None) - })?; + let (exponents, encoded, patches) = + alp_encode_components(&array.clone().into_primitive()?)?; let compressed_encoded = ctx .named("packed") From 50e55ac09b0515b1a0ab06936b1a533092a53555 Mon Sep 17 00:00:00 2001 From: Daniel King Date: Fri, 17 Jan 2025 16:18:44 -0500 Subject: [PATCH 10/12] chunked_encode matches latency of old code --- encodings/alp/src/alp/compress.rs | 2 +- encodings/alp/src/alp/mod.rs | 38 +++++++++++++++++++++++++------ 2 files changed, 32 insertions(+), 8 deletions(-) diff --git a/encodings/alp/src/alp/compress.rs b/encodings/alp/src/alp/compress.rs index 1dbfd814876..87c909abe38 100644 --- a/encodings/alp/src/alp/compress.rs +++ b/encodings/alp/src/alp/compress.rs @@ -53,7 +53,7 @@ where let values_slice = values.as_slice::(); let exponents = T::find_best_exponents(values_slice); - let (encoded, exceptional_positions) = T::encode(values.as_slice::(), exponents); + let (encoded, exceptional_positions) = T::chunked_encode(values.as_slice::(), exponents); let encoded_array = PrimitiveArray::new(encoded, values.validity()).into_array(); let exceptional_positions = match values.logical_validity() { diff --git a/encodings/alp/src/alp/mod.rs b/encodings/alp/src/alp/mod.rs index e99176b012e..a879331f80d 100644 --- a/encodings/alp/src/alp/mod.rs +++ b/encodings/alp/src/alp/mod.rs @@ -111,28 +111,52 @@ pub trait ALPFloat: private::Sealed + Float + Display + 'static { encoded_bytes + patch_bytes } + /// A quantity of [Self] expected to fit into L1 cache. + const ENCODE_CHUNK_SIZE: usize = (32 << 10) / size_of::(); + + /// ALP encode chunk-by-chunk. + /// + /// Unlike [Self::encode], this operation processes no more than [Self::ENCODE_CHUNK_SIZE] + /// elements at once which can make better use of the L1 cache because [Self::encode] makes two + /// passes over `values`: first to encode and second to extract the exceptional values. + fn chunked_encode( + values: &[Self], + exponents: Exponents, + ) -> (Buffer, Buffer) { + let mut encoded = BufferMut::::with_capacity(values.len()); + let mut patch_indices = BufferMut::::empty(); + for chunk in values.chunks(Self::ENCODE_CHUNK_SIZE) { + let (encoded_chunk, patches_indices_chunk) = Self::encode(chunk, exponents); + encoded.extend(encoded_chunk); + patch_indices.extend(patches_indices_chunk); + } + (encoded.freeze(), patch_indices.freeze()) + } + /// ALP encode the given values using the given exponents. /// /// The index of each value for which encode-decode is not the identity function is returned. - fn encode(values: &[Self], exponents: Exponents) -> (Buffer, Buffer) { - let (encoded, needs_patch): (BufferMut, Vec) = values + /// + /// See also: [Self::chunked_encode]. + fn encode(values: &[Self], exponents: Exponents) -> (Vec, Vec) { + let (encoded, needs_patch): (Vec, Vec) = values .iter() .map(|value| { - let maybe_encoded = unsafe { Self::encode_single_unchecked(*value, exponents) }; - let maybe_decoded = Self::decode_single(maybe_encoded, exponents); + let encoded = unsafe { Self::encode_single_unchecked(*value, exponents) }; + let maybe_decoded = Self::decode_single(encoded, exponents); let needs_patch = maybe_decoded != *value; - (maybe_encoded, needs_patch) + (encoded, needs_patch) }) .unzip(); - let patch_indices: BufferMut = needs_patch + let patch_indices: Vec = needs_patch .into_iter() .enumerate() .filter(|(_, needs_patch)| *needs_patch) .map(|(index, _)| index as u64) .collect(); - (encoded.freeze(), patch_indices.freeze()) + (encoded, patch_indices) } #[inline] From 1e0367029b94df0b42ea0255a4aedbe5a14e0e38 Mon Sep 17 00:00:00 2001 From: Daniel King Date: Fri, 17 Jan 2025 16:38:35 -0500 Subject: [PATCH 11/12] account for nulls in true count --- encodings/runend/src/array.rs | 46 ++++++++++++++++++++++++++--------- 1 file changed, 34 insertions(+), 12 deletions(-) diff --git a/encodings/runend/src/array.rs b/encodings/runend/src/array.rs index 3668904502d..a78989b932a 100644 --- a/encodings/runend/src/array.rs +++ b/encodings/runend/src/array.rs @@ -239,19 +239,41 @@ impl StatisticsVTable for RunEndEncoding { DType::Bool(_) => { let ends = array.ends().into_primitive()?; let bools = array.values().into_bool()?.boolean_buffer(); - let true_count: u64 = match_each_unsigned_integer_ptype!(ends.ptype(), |$P| { - let mut begin: $P = 0; - ends - .as_slice::<$P>() - .iter() - .enumerate() - .map(|(index, end)| { - let len = *end - begin; - begin = *end; - (len as u64) * (bools.value(index as usize) as u64) + + let true_count: u64 = match array.values().logical_validity() { + LogicalValidity::AllValid(_) => { + match_each_unsigned_integer_ptype!(ends.ptype(), |$P| { + let mut begin: $P = 0; + ends + .as_slice::<$P>() + .iter() + .enumerate() + .map(|(index, end)| { + let len = *end - begin; + begin = *end; + (len as u64) * (bools.value(index as usize) as u64) + }) + .sum() + }) + } + LogicalValidity::AllInvalid(_) => 0, + LogicalValidity::Array(is_valid) => { + let is_valid = is_valid.into_bool()?.boolean_buffer(); + match_each_unsigned_integer_ptype!(ends.ptype(), |$P| { + let mut begin: $P = 0; + ends + .as_slice::<$P>() + .iter() + .enumerate() + .map(|(index, end)| { + let len = *end - begin; + begin = *end; + (len as u64) * (bools.value(index as usize) as u64) * (is_valid.value(index as usize) as u64) + }) + .sum() }) - .sum() - }); + } + }; Some(Scalar::from(true_count)) } DType::Primitive(..) => None, From f427d8d556f850926fee58f468c3cc168ed57337 Mon Sep 17 00:00:00 2001 From: Daniel King Date: Fri, 17 Jan 2025 17:11:39 -0500 Subject: [PATCH 12/12] feat: teach RunEndArray NullCount and TrueCount --- encodings/runend/src/array.rs | 201 +++++++++++++++++++++++++++++++--- 1 file changed, 184 insertions(+), 17 deletions(-) diff --git a/encodings/runend/src/array.rs b/encodings/runend/src/array.rs index 8a6c8a25080..caa06b8946a 100644 --- a/encodings/runend/src/array.rs +++ b/encodings/runend/src/array.rs @@ -16,7 +16,7 @@ use vortex_array::{ IntoCanonical, }; use vortex_buffer::Buffer; -use vortex_dtype::{DType, PType}; +use vortex_dtype::{match_each_unsigned_integer_ptype, DType, PType}; use vortex_error::{vortex_bail, VortexExpect as _, VortexResult}; use vortex_scalar::Scalar; @@ -227,31 +227,113 @@ impl VisitorVTable for RunEndEncoding { impl StatisticsVTable for RunEndEncoding { fn compute_statistics(&self, array: &RunEndArray, stat: Stat) -> VortexResult { - let maybe_stat = match stat { - Stat::Min | Stat::Max => array.values().statistics().compute(stat), - Stat::IsSorted => Some(Scalar::from( - array - .values() - .statistics() - .compute_is_sorted() - .unwrap_or(false) - && array.logical_validity().all_valid(), - )), - _ => None, + let mut stats = StatsSet::default(); + + match stat { + Stat::Min | Stat::Max => { + if let Some(extrema) = array.values().statistics().compute(stat) { + stats.set(stat, extrema); + } + } + Stat::IsSorted => { + let is_sorted = Scalar::from( + array + .values() + .statistics() + .compute_is_sorted() + .unwrap_or(false) + && array.logical_validity().all_valid(), + ); + stats.set(stat, is_sorted); + } + Stat::TrueCount => match array.dtype() { + DType::Bool(_) => { + let ends = array.ends().into_primitive()?; + let bools = array.values().into_bool()?.boolean_buffer(); + let mut true_count: u64 = 0; + let mut null_count: u64 = 0; + + match array.values().logical_validity() { + LogicalValidity::AllValid(_) => { + null_count = 0; + true_count = match_each_unsigned_integer_ptype!(ends.ptype(), |$P| { + let mut begin = array.offset() as $P; + ends + .as_slice::<$P>() + .iter() + .enumerate() + .map(|(index, end)| { + let len = *end - begin; + begin = *end; + (len as u64) * (bools.value(index as usize) as u64) + }) + .sum() + }); + } + LogicalValidity::AllInvalid(_) => { + null_count = array.len() as u64; + true_count = 0; + } + LogicalValidity::Array(is_valid) => { + let is_valid = is_valid.into_bool()?.boolean_buffer(); + + match_each_unsigned_integer_ptype!(ends.ptype(), |$P| { + let mut begin = array.offset() as $P; + for (index, end) in ends.as_slice::<$P>().iter().enumerate() { + let len = *end - begin; + begin = *end; + true_count += (len as u64) * (bools.value(index as usize) as u64) * (is_valid.value(index as usize) as u64); + null_count += (len as u64) * (is_valid.value(index as usize) as u64); + } + }); + } + }; + + stats.set(Stat::TrueCount, true_count); + stats.set(Stat::NullCount, null_count); + } + DType::Primitive(..) => {} + dtype => vortex_bail!("invalid dtype: {}", dtype), + }, + Stat::NullCount => { + let ends = array.ends().into_primitive()?; + let null_count: u64 = match array.values().logical_validity() { + LogicalValidity::AllValid(_) => 0_u64, + LogicalValidity::AllInvalid(_) => array.len() as u64, + LogicalValidity::Array(is_valid) => { + let is_valid = is_valid.into_bool()?.boolean_buffer(); + match_each_unsigned_integer_ptype!(ends.ptype(), |$P| { + let mut begin = array.offset() as $P; + ends + .as_slice::<$P>() + .iter() + .enumerate() + .map(|(index, end)| { + let len = *end - begin; + begin = *end; + (len as u64) * ((!is_valid.value(index as usize)) as u64) + }) + .sum() + }) + } + }; + stats.set(stat, null_count); + } + _ => {} }; - let mut stats = StatsSet::default(); - if let Some(stat_value) = maybe_stat { - stats.set(stat, stat_value); - } Ok(stats) } } #[cfg(test)] mod tests { - use vortex_array::compute::scalar_at; + use arrow_buffer::BooleanBuffer; + use vortex_array::array::BoolArray; + use vortex_array::compute::{scalar_at, slice}; + use vortex_array::stats::{ArrayStatistics as _, Stat}; use vortex_array::test_harness::check_metadata; + use vortex_array::validity::Validity; use vortex_array::{ArrayDType, ArrayLen, IntoArrayData}; use vortex_buffer::buffer; use vortex_dtype::{DType, Nullability, PType}; @@ -292,4 +374,89 @@ mod tests { assert_eq!(scalar_at(arr.as_ref(), 5).unwrap(), 3.into()); assert_eq!(scalar_at(arr.as_ref(), 9).unwrap(), 3.into()); } + + #[test] + fn test_runend_int_stats() { + let arr = RunEndArray::try_new( + buffer![2u32, 5, 10].into_array(), + buffer![1i32, 2, 3].into_array(), + ) + .unwrap(); + + assert_eq!(arr.statistics().compute_as::(Stat::Min).unwrap(), 1); + assert_eq!(arr.statistics().compute_as::(Stat::Max).unwrap(), 3); + assert_eq!( + arr.statistics().compute_as::(Stat::NullCount).unwrap(), + 0 + ); + assert!(arr.statistics().compute_as::(Stat::IsSorted).unwrap()); + } + + #[test] + fn test_runend_bool_stats() { + let arr = RunEndArray::try_new( + buffer![2u32, 5, 10].into_array(), + BoolArray::try_new( + BooleanBuffer::from_iter([true, true, false]), + Validity::Array(BoolArray::from_iter([true, false, true]).into_array()), + ) + .unwrap() + .into_array(), + ) + .unwrap(); + + assert!(!arr.statistics().compute_as::(Stat::Min).unwrap()); + assert!(arr.statistics().compute_as::(Stat::Max).unwrap()); + assert_eq!( + arr.statistics().compute_as::(Stat::NullCount).unwrap(), + 3 + ); + assert!(!arr.statistics().compute_as::(Stat::IsSorted).unwrap()); + assert_eq!( + arr.statistics().compute_as::(Stat::TrueCount).unwrap(), + 2 + ); + + let sliced = slice(arr, 4, 7).unwrap(); + + assert!(!sliced.statistics().compute_as::(Stat::Min).unwrap()); + assert!(!sliced.statistics().compute_as::(Stat::Max).unwrap()); + assert_eq!( + sliced + .statistics() + .compute_as::(Stat::NullCount) + .unwrap(), + 1 + ); + // Not sorted because null must come last + assert!(!sliced + .statistics() + .compute_as::(Stat::IsSorted) + .unwrap()); + assert_eq!( + sliced + .statistics() + .compute_as::(Stat::TrueCount) + .unwrap(), + 0 + ); + } + + #[test] + fn test_all_invalid_true_count() { + let arr = RunEndArray::try_new( + buffer![2u32, 5, 10].into_array(), + BoolArray::from_iter([None, None, None]).into_array(), + ) + .unwrap() + .into_array(); + assert_eq!( + arr.statistics().compute_as::(Stat::TrueCount).unwrap(), + 0 + ); + assert_eq!( + arr.statistics().compute_as::(Stat::NullCount).unwrap(), + 10 + ); + } }