diff --git a/encodings/alp/src/alp.rs b/encodings/alp/src/alp.rs index 6cf8f6b1b94..a7f6a8b1055 100644 --- a/encodings/alp/src/alp.rs +++ b/encodings/alp/src/alp.rs @@ -2,9 +2,8 @@ use std::fmt::{Display, Formatter}; use std::mem::size_of; use itertools::Itertools; -use num_traits::{CheckedSub, Float, NumCast, PrimInt, ToPrimitive, Zero}; +use num_traits::{CheckedSub, Float, PrimInt, ToPrimitive}; use serde::{Deserialize, Serialize}; -use vortex_error::vortex_panic; const SAMPLE_SIZE: usize = 32; @@ -35,10 +34,11 @@ pub trait ALPFloat: Float + Display + 'static { (self + Self::SWEET) - Self::SWEET } - #[inline] - fn as_int(self) -> Option { - ::from(self) - } + /// Equivalent to calling `as` to cast the primitive float to the target integer type. + fn as_int(self) -> Self::ALPInt; + + /// Convert from the integer type back to the float type using `as`. + fn from_int(n: Self::ALPInt) -> Self; fn find_best_exponents(values: &[Self]) -> Exponents { let mut best_exp = Exponents { e: 0, f: 0 }; @@ -72,7 +72,7 @@ pub trait ALPFloat: Float + Display + 'static { best_exp } - #[inline(always)] + #[inline] fn estimate_encoded_size(encoded: &[Self::ALPInt], patches: &[Self]) -> usize { let bits_per_encoded = encoded .iter() @@ -103,56 +103,126 @@ pub trait ALPFloat: Float + Display + 'static { ) -> (Exponents, Vec, Vec, Vec) { let exp = exponents.unwrap_or_else(|| Self::find_best_exponents(values)); - let mut exc_pos = Vec::new(); - let mut exc_value = Vec::new(); - let mut prev = Self::ALPInt::zero(); - let encoded = values - .iter() - .enumerate() - .map(|(i, v)| { - match Self::encode_single(*v, exp) { - Ok(fi) => { - prev = fi; - fi - } - Err(exc) => { - exc_pos.push(i as u64); - exc_value.push(exc); - // Emit the last known good value. This helps with run-end encoding. - prev - } - } - }) - .collect_vec(); + let mut encoded_output = Vec::with_capacity(values.len()); + let mut patch_indices = Vec::new(); + let mut patch_values = Vec::new(); + let mut fill_value: Option = None; - (exp, encoded, exc_pos, exc_value) + // this is intentionally branchless + // we batch this into 32KB of values at a time to make it more L1 cache friendly + let encode_chunk_size: usize = (32 << 10) / size_of::(); + for chunk in values.chunks(encode_chunk_size) { + encode_chunk_unchecked( + chunk, + exp, + &mut encoded_output, + &mut patch_indices, + &mut patch_values, + &mut fill_value, + ); + } + + (exp, encoded_output, patch_indices, patch_values) } #[inline] fn encode_single(value: Self, exponents: Exponents) -> Result { - let encoded = (value * Self::F10[exponents.e as usize] * Self::IF10[exponents.f as usize]) - .fast_round(); - if let Some(e) = encoded.as_int() { - let decoded = Self::decode_single(e, exponents); - if decoded == value { - return Ok(e); - } + let encoded = unsafe { Self::encode_single_unchecked(value, exponents) }; + let decoded = Self::decode_single(encoded, exponents); + if decoded == value { + return Ok(encoded); } - Err(value) } #[inline] fn decode_single(encoded: Self::ALPInt, exponents: Exponents) -> Self { - let encoded_float: Self = Self::from(encoded).unwrap_or_else(|| { - vortex_panic!( - "Failed to convert encoded value {} from {} to {} in ALPFloat::decode_single", - encoded, - std::any::type_name::(), - std::any::type_name::() - ) - }); - encoded_float * Self::F10[exponents.f as usize] * Self::IF10[exponents.e as usize] + Self::from_int(encoded) * Self::F10[exponents.f as usize] * Self::IF10[exponents.e as usize] + } + + /// # Safety + /// + /// The returned value may not decode back to the original value. + #[inline(always)] + unsafe fn encode_single_unchecked(value: Self, exponents: Exponents) -> Self::ALPInt { + (value * Self::F10[exponents.e as usize] * Self::IF10[exponents.f as usize]) + .fast_round() + .as_int() + } +} + +fn encode_chunk_unchecked( + chunk: &[T], + exp: Exponents, + encoded_output: &mut Vec, + patch_indices: &mut Vec, + patch_values: &mut Vec, + fill_value: &mut Option, +) { + let num_prev_encoded = encoded_output.len(); + let num_prev_patches = patch_indices.len(); + assert_eq!(patch_indices.len(), patch_values.len()); + let has_filled = fill_value.is_some(); + + // encode the chunk, counting the number of patches + let mut chunk_patch_count = 0; + encoded_output.extend(chunk.iter().map(|v| { + let encoded = unsafe { T::encode_single_unchecked(*v, exp) }; + let decoded = T::decode_single(encoded, exp); + let neq = (decoded != *v) as usize; + chunk_patch_count += neq; + encoded + })); + let chunk_patch_count = chunk_patch_count; // immutable hereafter + assert_eq!(encoded_output.len(), num_prev_encoded + chunk.len()); + + // find the first successfully encoded value (i.e., not patched) + // this is our fill value for missing values + if fill_value.is_none() && (num_prev_encoded + chunk_patch_count < encoded_output.len()) { + assert_eq!(num_prev_encoded, num_prev_patches); + for i in num_prev_encoded..encoded_output.len() { + if i >= patch_indices.len() || patch_indices[i] != i as u64 { + *fill_value = Some(encoded_output[i]); + break; + } + } + } + + // if there are no patches, we are done + if chunk_patch_count == 0 { + return; + } + + // we need to gather the patches for this chunk + // preallocate space for the patches (plus one because our loop may attempt to write one past the end) + patch_indices.reserve(chunk_patch_count + 1); + patch_values.reserve(chunk_patch_count + 1); + + // record the patches in this chunk + let patch_indices_mut = patch_indices.spare_capacity_mut(); + let patch_values_mut = patch_values.spare_capacity_mut(); + let mut chunk_patch_index = 0; + for i in num_prev_encoded..encoded_output.len() { + let decoded = T::decode_single(encoded_output[i], exp); + // write() is only safe to call more than once because the values are primitive (i.e., Drop is a no-op) + patch_indices_mut[chunk_patch_index].write(i as u64); + patch_values_mut[chunk_patch_index].write(chunk[i - num_prev_encoded]); + chunk_patch_index += (decoded != chunk[i - num_prev_encoded]) as usize; + } + assert_eq!(chunk_patch_index, chunk_patch_count); + unsafe { + patch_indices.set_len(num_prev_patches + chunk_patch_count); + patch_values.set_len(num_prev_patches + chunk_patch_count); + } + + // replace the patched values in the encoded array with the fill value + // for better downstream compression + if let Some(fill_value) = fill_value { + // handle the edge case where the first N >= 1 chunks are all patches + let start_patch = if !has_filled { 0 } else { num_prev_patches }; + for patch_idx in &patch_indices[start_patch..] { + encoded_output[*patch_idx as usize] = *fill_value; + } } } @@ -189,6 +259,16 @@ impl ALPFloat for f32 { 0.000000001, 0.0000000001, // 10^-10 ]; + + #[inline(always)] + fn as_int(self) -> Self::ALPInt { + self as _ + } + + #[inline(always)] + fn from_int(n: Self::ALPInt) -> Self { + n as _ + } } impl ALPFloat for f64 { @@ -250,4 +330,14 @@ impl ALPFloat for f64 { 0.0000000000000000000001, 0.00000000000000000000001, // 10^-23 ]; + + #[inline(always)] + fn as_int(self) -> Self::ALPInt { + self as _ + } + + #[inline(always)] + fn from_int(n: Self::ALPInt) -> Self { + n as _ + } } diff --git a/encodings/alp/src/compress.rs b/encodings/alp/src/compress.rs index e05c89485bb..6c4dfeba7a5 100644 --- a/encodings/alp/src/compress.rs +++ b/encodings/alp/src/compress.rs @@ -157,7 +157,7 @@ mod tests { assert!(encoded.patches().is_some()); assert_eq!( encoded.encoded().as_primitive().maybe_null_slice::(), - vec![1234i64, 2718, 2718, 4000] // fill forward + vec![1234i64, 2718, 1234, 4000] // fill forward ); assert_eq!(encoded.exponents(), Exponents { e: 16, f: 13 });