diff --git a/Cargo.lock b/Cargo.lock index cc3efe58550..06792ff57ba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5138,6 +5138,7 @@ dependencies = [ "divan", "itertools 0.14.0", "num-traits", + "rand", "rstest", "serde", "vortex-array", diff --git a/encodings/alp/Cargo.toml b/encodings/alp/Cargo.toml index f7462689693..b5328894246 100644 --- a/encodings/alp/Cargo.toml +++ b/encodings/alp/Cargo.toml @@ -30,6 +30,7 @@ vortex-scalar = { workspace = true } [dev-dependencies] divan = { workspace = true } +rand = { workspace = true } rstest = { workspace = true } vortex-array = { workspace = true, features = ["test-harness"] } diff --git a/encodings/alp/benches/alp_compress.rs b/encodings/alp/benches/alp_compress.rs index d51d34038de..d63140c7727 100644 --- a/encodings/alp/benches/alp_compress.rs +++ b/encodings/alp/benches/alp_compress.rs @@ -1,27 +1,101 @@ #![allow(clippy::unwrap_used)] use divan::Bencher; -use vortex_alp::{ALPFloat, ALPRDFloat, Exponents, RDEncoder}; +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng as _}; +use vortex_alp::{alp_encode, ALPFloat, ALPRDFloat, RDEncoder}; use vortex_array::array::PrimitiveArray; use vortex_array::validity::Validity; use vortex_array::IntoCanonical; -use vortex_buffer::{buffer, Buffer}; +use vortex_buffer::buffer; +use vortex_dtype::NativePType; fn main() { divan::main(); } -#[divan::bench(types = [f32, f64], args = [100_000, 10_000_000])] -fn compress_alp(n: usize) -> (Exponents, Buffer, Buffer, Buffer) { - let values: Vec = vec![T::from(1.234).unwrap(); n]; - T::encode(values.as_slice(), None) +#[divan::bench(types = [f32, f64], args = [ + (100_000, 0.0, 0.25), + (100_000, 0.01, 0.25), + (100_000, 0.1, 0.25), + (10_000_000, 0.0, 0.25), + (10_000_000, 0.01, 0.25), + (10_000_000, 0.1, 0.25), + (100_000, 0.0, 0.95), + (100_000, 0.01, 0.95), + (100_000, 0.1, 0.95), + (10_000_000, 0.0, 0.95), + (10_000_000, 0.01, 0.95), + (10_000_000, 0.1, 0.95), + (100_000, 0.0, 1.0), + (100_000, 0.01, 1.0), + (100_000, 0.1, 1.0), + (10_000_000, 0.0, 1.0), + (10_000_000, 0.01, 1.0), + (10_000_000, 0.1, 1.0), +])] +fn compress_alp(bencher: Bencher, args: (usize, f64, f64)) { + let (n, fraction_patch, fraction_valid) = args; + let mut rng = StdRng::seed_from_u64(0); + let mut values = buffer![T::from(1.234).unwrap(); n].into_mut(); + if fraction_patch > 0.0 { + for index in 0..values.len() { + if rng.gen_bool(fraction_patch) { + values[index] = T::from(1000.0).unwrap() + } + } + } + let validity = if fraction_valid < 1.0 { + Validity::from_iter((0..values.len()).map(|_| rng.gen_bool(fraction_valid))) + } else { + Validity::NonNullable + }; + let values = values.freeze(); + + bencher.bench_local(move || { + alp_encode(&PrimitiveArray::new(values.clone(), validity.clone())).unwrap() + }) } -#[divan::bench(types = [f32, f64], args = [100_000, 10_000_000])] -fn decompress_alp(bencher: Bencher, n: usize) { - let values: Vec = vec![T::from(1.234).unwrap(); n]; - let (exponents, encoded, ..) = T::encode(values.as_slice(), None); - bencher.bench_local(move || T::decode(&encoded, exponents)); +#[divan::bench(types = [f32, f64], args = [ + (100_000, 0.0, 0.25), + (100_000, 0.01, 0.25), + (100_000, 0.1, 0.25), + (10_000_000, 0.0, 0.25), + (10_000_000, 0.01, 0.25), + (10_000_000, 0.1, 0.25), + (100_000, 0.0, 0.95), + (100_000, 0.01, 0.95), + (100_000, 0.1, 0.95), + (10_000_000, 0.0, 0.95), + (10_000_000, 0.01, 0.95), + (10_000_000, 0.1, 0.95), + (100_000, 0.0, 1.0), + (100_000, 0.01, 1.0), + (100_000, 0.1, 1.0), + (10_000_000, 0.0, 1.0), + (10_000_000, 0.01, 1.0), + (10_000_000, 0.1, 1.0), +])] +fn decompress_alp(bencher: Bencher, args: (usize, f64, f64)) { + let (n, fraction_patch, fraction_valid) = args; + let mut rng = StdRng::seed_from_u64(0); + let mut values = buffer![T::from(1.234).unwrap(); n].into_mut(); + if fraction_patch > 0.0 { + for index in 0..values.len() { + if rng.gen_bool(fraction_patch) { + values[index] = T::from(1000.0).unwrap() + } + } + } + let validity = if fraction_valid < 1.0 { + Validity::from_iter((0..values.len()).map(|_| rng.gen_bool(fraction_valid))) + } else { + Validity::NonNullable + }; + let values = values.freeze(); + let array = alp_encode(&PrimitiveArray::new(values, validity)).unwrap(); + bencher.bench_local(move || array.clone().into_canonical().unwrap()); } #[divan::bench(types = [f32, f64], args = [100_000, 10_000_000])] diff --git a/encodings/alp/src/alp/array.rs b/encodings/alp/src/alp/array.rs index d596e471489..6be83f42b11 100644 --- a/encodings/alp/src/alp/array.rs +++ b/encodings/alp/src/alp/array.rs @@ -46,6 +46,14 @@ impl ALPArray { let mut children = Vec::with_capacity(2); children.push(encoded); if let Some(patches) = &patches { + if patches.dtype() != &dtype { + vortex_bail!(MismatchedTypes: dtype, patches.dtype()); + } + + if !patches.values().all_valid()? { + vortex_bail!("ALPArray: patches must not contain invalid entries"); + } + children.push(patches.indices().clone()); children.push(patches.values().clone()); } diff --git a/encodings/alp/src/alp/compress.rs b/encodings/alp/src/alp/compress.rs index 3a40cd57079..2d4b14e1ff5 100644 --- a/encodings/alp/src/alp/compress.rs +++ b/encodings/alp/src/alp/compress.rs @@ -1,9 +1,13 @@ +use itertools::Itertools as _; use vortex_array::array::PrimitiveArray; use vortex_array::patches::Patches; +use vortex_array::validity::Validity; use vortex_array::variants::PrimitiveArrayTrait; use vortex_array::{Array, IntoArray, IntoArrayVariant}; +use vortex_buffer::{Buffer, BufferMut}; use vortex_dtype::{NativePType, PType}; -use vortex_error::{vortex_bail, VortexResult, VortexUnwrap}; +use vortex_error::{vortex_bail, VortexResult}; +use vortex_mask::Mask; use vortex_scalar::ScalarType; use crate::alp::{ALPArray, ALPFloat}; @@ -24,39 +28,74 @@ macro_rules! match_each_alp_float_ptype { }) } -pub fn alp_encode_components( +pub fn alp_encode(parray: &PrimitiveArray) -> VortexResult { + let (exponents, encoded, patches) = alp_encode_components(parray)?; + ALPArray::try_new(encoded, exponents, patches) +} + +pub fn alp_encode_components( + parray: &PrimitiveArray, +) -> VortexResult<(Exponents, Array, Option)> { + match parray.ptype() { + PType::F32 => alp_encode_components_typed::(parray), + PType::F64 => alp_encode_components_typed::(parray), + _ => vortex_bail!("ALP can only encode f32 and f64"), + } +} + +#[allow(clippy::cast_possible_truncation)] +fn alp_encode_components_typed( values: &PrimitiveArray, - exponents: Option, -) -> (Exponents, Array, Option) +) -> VortexResult<(Exponents, Array, Option)> where T: ALPFloat + NativePType, T::ALPInt: NativePType, T: ScalarType, { - let (exponents, encoded, exc_pos, exc) = T::encode(values.as_slice::(), exponents); - let len = encoded.len(); - ( - exponents, - PrimitiveArray::new(encoded, values.validity()).into_array(), - (!exc.is_empty()).then(|| { - let position_arr = exc_pos.into_array(); - let patch_validity = values.validity().take(&position_arr).vortex_unwrap(); - Patches::new( - len, - position_arr, - PrimitiveArray::new(exc, patch_validity).into_array(), - ) - }), - ) -} + let values_slice = values.as_slice::(); -pub fn alp_encode(parray: &PrimitiveArray) -> VortexResult { - let (exponents, encoded, patches) = match parray.ptype() { - PType::F32 => alp_encode_components::(parray, None), - PType::F64 => alp_encode_components::(parray, None), - _ => vortex_bail!("ALP can only encode f32 and f64"), + let (exponents, encoded, exceptional_positions, exceptional_values) = + T::encode(values_slice, None); + + let encoded_array = PrimitiveArray::new(encoded, values.validity()).into_array(); + + let validity = values.validity_mask()?; + // exceptional_positions may contain exceptions at invalid positions (which contain garbage + // data). We remove invalid exceptional positions in order to keep the Patches small. + let (valid_exceptional_positions, valid_exceptional_values): (Buffer, Buffer) = + match validity { + Mask::AllTrue(_) => (exceptional_positions, exceptional_values), + Mask::AllFalse(_) => { + // no valid positions, ergo nothing worth patching + (Buffer::empty(), Buffer::empty()) + } + Mask::Values(is_valid) => { + let (pos, vals): (BufferMut, BufferMut) = exceptional_positions + .into_iter() + .zip_eq(exceptional_values) + .filter(|(index, _)| is_valid.value(*index as usize)) + .unzip(); + (pos.freeze(), vals.freeze()) + } + }; + let patches = if valid_exceptional_positions.is_empty() { + None + } else { + let patches_validity = if values.dtype().is_nullable() { + Validity::AllValid + } else { + Validity::NonNullable + }; + let valid_exceptional_values = + PrimitiveArray::new(valid_exceptional_values, patches_validity).into_array(); + + Some(Patches::new( + values_slice.len(), + valid_exceptional_positions.into_array(), + valid_exceptional_values, + )) }; - ALPArray::try_new(encoded, exponents, patches) + Ok((exponents, encoded_array, patches)) } pub fn decompress(array: ALPArray) -> VortexResult { @@ -85,6 +124,7 @@ mod tests { use vortex_array::compute::scalar_at; use vortex_array::validity::Validity; use vortex_buffer::{buffer, Buffer}; + use vortex_scalar::Scalar; use super::*; @@ -128,7 +168,7 @@ mod tests { } #[test] - #[allow(clippy::approx_constant)] // ALP doesn't like E + #[allow(clippy::approx_constant)] // Clippy objects to 2.718, an approximation of e, the base of the natural logarithm. fn test_patched_compress() { let values = buffer![1.234f64, 2.718, f64::consts::PI, 4.0]; let array = PrimitiveArray::new(values.clone(), Validity::NonNullable); @@ -140,7 +180,7 @@ mod tests { .into_primitive() .unwrap() .as_slice::(), - vec![1234i64, 2718, 1234, 4000] // fill forward + vec![1234i64, 2718, 1234, 4000] ); assert_eq!(encoded.exponents(), Exponents { e: 16, f: 13 }); @@ -148,6 +188,39 @@ mod tests { assert_eq!(values.as_slice(), decoded.as_slice::()); } + #[test] + #[allow(clippy::approx_constant)] // Clippy objects to 2.718, an approximation of e, the base of the natural logarithm. + fn test_compress_ignores_invalid_exceptional_values() { + let values = buffer![1.234f64, 2.718, f64::consts::PI, 4.0]; + let array = PrimitiveArray::new(values, Validity::from_iter([true, true, false, true])); + let encoded = alp_encode(&array).unwrap(); + assert!(encoded.patches().is_none()); + assert_eq!( + encoded + .encoded() + .into_primitive() + .unwrap() + .as_slice::(), + vec![1234i64, 2718, 1234, 4000] + ); + assert_eq!(encoded.exponents(), Exponents { e: 16, f: 13 }); + + let decoded = decompress(encoded).unwrap(); + assert_eq!( + scalar_at(&decoded, 0).unwrap(), + scalar_at(&array, 0).unwrap() + ); + assert_eq!( + scalar_at(&decoded, 1).unwrap(), + scalar_at(&array, 1).unwrap() + ); + assert!(!decoded.is_valid(2).unwrap()); + assert_eq!( + scalar_at(&decoded, 3).unwrap(), + scalar_at(&array, 3).unwrap() + ); + } + #[test] #[allow(clippy::approx_constant)] // ALP doesn't like E fn test_nullable_patched_scalar_at() { @@ -168,6 +241,7 @@ mod tests { assert!(s.is_valid()); } + assert!(!encoded.is_valid(4).unwrap()); let s = scalar_at(encoded.as_ref(), 4).unwrap(); assert!(s.is_null()); @@ -190,7 +264,23 @@ mod tests { ); let alp_arr = alp_encode(&original).unwrap(); let decompressed = alp_arr.into_primitive().unwrap(); - assert_eq!(original.as_slice::(), decompressed.as_slice::()); + assert_eq!( + // The second and third values become exceptions and are replaced + [195.26274, 195.26274, 195.26274], + decompressed.as_slice::() + ); assert_eq!(original.validity(), decompressed.validity()); + assert_eq!( + scalar_at(&original, 0).unwrap(), + Scalar::null_typed::() + ); + assert_eq!( + scalar_at(&original, 1).unwrap(), + Scalar::null_typed::() + ); + assert_eq!( + scalar_at(&original, 2).unwrap(), + Scalar::null_typed::() + ); } } diff --git a/encodings/alp/src/alp/compute/mod.rs b/encodings/alp/src/alp/compute/mod.rs index bc857290d48..dc189c111a9 100644 --- a/encodings/alp/src/alp/compute/mod.rs +++ b/encodings/alp/src/alp/compute/mod.rs @@ -36,9 +36,13 @@ impl ComputeVTable for ALPEncoding { impl ScalarAtFn for ALPEncoding { fn scalar_at(&self, array: &ALPArray, index: usize) -> VortexResult { + if !array.encoded().is_valid(index)? { + return Ok(Scalar::null(array.dtype().clone())); + } + if let Some(patches) = array.patches() { if let Some(patch) = patches.get_patched(index)? { - return Ok(patch); + return patch.cast(array.dtype()); } } diff --git a/vortex-sampling-compressor/src/compressors/alp.rs b/vortex-sampling-compressor/src/compressors/alp.rs index 4e82c88ed1b..9fb53c17dae 100644 --- a/vortex-sampling-compressor/src/compressors/alp.rs +++ b/vortex-sampling-compressor/src/compressors/alp.rs @@ -1,6 +1,4 @@ -use vortex_alp::{ - alp_encode_components, match_each_alp_float_ptype, ALPArray, ALPEncoding, ALPRDEncoding, -}; +use vortex_alp::{alp_encode_components, ALPArray, ALPEncoding, ALPRDEncoding}; use vortex_array::aliases::hash_set::HashSet; use vortex_array::array::PrimitiveArray; use vortex_array::variants::PrimitiveArrayTrait; @@ -43,12 +41,8 @@ impl EncodingCompressor for ALPCompressor { like: Option>, ctx: SamplingCompressor<'a>, ) -> VortexResult> { - let parray = array.clone().into_primitive()?; - - let (exponents, encoded, patches) = match_each_alp_float_ptype!( - parray.ptype(), |$T| { - alp_encode_components::<$T>(&parray, None) - }); + let (exponents, encoded, patches) = + alp_encode_components(&array.clone().into_primitive()?)?; let compressed_encoded = ctx .named("packed")