Skip to content
182 changes: 136 additions & 46 deletions encodings/alp/src/alp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ use std::fmt::{Display, Formatter};
use std::mem::size_of;

use itertools::Itertools;
use num_traits::{CheckedSub, Float, NumCast, PrimInt, ToPrimitive, Zero};
use num_traits::{CheckedSub, Float, PrimInt, ToPrimitive};
use serde::{Deserialize, Serialize};
use vortex_error::vortex_panic;

const SAMPLE_SIZE: usize = 32;

Expand Down Expand Up @@ -35,10 +34,11 @@ pub trait ALPFloat: Float + Display + 'static {
(self + Self::SWEET) - Self::SWEET
}

#[inline]
fn as_int(self) -> Option<Self::ALPInt> {
<Self::ALPInt as NumCast>::from(self)
}
/// Equivalent to calling `as` to cast the primitive float to the target integer type.
fn as_int(self) -> Self::ALPInt;

/// Convert from the integer type back to the float type using `as`.
fn from_int(n: Self::ALPInt) -> Self;
Comment thread
lwwmanning marked this conversation as resolved.

fn find_best_exponents(values: &[Self]) -> Exponents {
let mut best_exp = Exponents { e: 0, f: 0 };
Expand Down Expand Up @@ -72,7 +72,7 @@ pub trait ALPFloat: Float + Display + 'static {
best_exp
}

#[inline(always)]
#[inline]
fn estimate_encoded_size(encoded: &[Self::ALPInt], patches: &[Self]) -> usize {
let bits_per_encoded = encoded
.iter()
Expand Down Expand Up @@ -103,56 +103,126 @@ pub trait ALPFloat: Float + Display + 'static {
) -> (Exponents, Vec<Self::ALPInt>, Vec<u64>, Vec<Self>) {
let exp = exponents.unwrap_or_else(|| Self::find_best_exponents(values));

let mut exc_pos = Vec::new();
let mut exc_value = Vec::new();
let mut prev = Self::ALPInt::zero();
let encoded = values
.iter()
.enumerate()
.map(|(i, v)| {
match Self::encode_single(*v, exp) {
Ok(fi) => {
prev = fi;
fi
}
Err(exc) => {
exc_pos.push(i as u64);
exc_value.push(exc);
// Emit the last known good value. This helps with run-end encoding.
prev
}
}
})
.collect_vec();
let mut encoded_output = Vec::with_capacity(values.len());
let mut patch_indices = Vec::new();
let mut patch_values = Vec::new();
let mut fill_value: Option<Self::ALPInt> = None;

(exp, encoded, exc_pos, exc_value)
// this is intentionally branchless
// we batch this into 32KB of values at a time to make it more L1 cache friendly
let encode_chunk_size: usize = (32 << 10) / size_of::<Self::ALPInt>();
for chunk in values.chunks(encode_chunk_size) {
encode_chunk_unchecked(
chunk,
exp,
&mut encoded_output,
&mut patch_indices,
&mut patch_values,
&mut fill_value,
);
}

(exp, encoded_output, patch_indices, patch_values)
}

#[inline]
fn encode_single(value: Self, exponents: Exponents) -> Result<Self::ALPInt, Self> {
let encoded = (value * Self::F10[exponents.e as usize] * Self::IF10[exponents.f as usize])
.fast_round();
if let Some(e) = encoded.as_int() {
let decoded = Self::decode_single(e, exponents);
if decoded == value {
return Ok(e);
}
let encoded = unsafe { Self::encode_single_unchecked(value, exponents) };
let decoded = Self::decode_single(encoded, exponents);
if decoded == value {
return Ok(encoded);
}

Err(value)
}

#[inline]
fn decode_single(encoded: Self::ALPInt, exponents: Exponents) -> Self {
let encoded_float: Self = Self::from(encoded).unwrap_or_else(|| {
vortex_panic!(
"Failed to convert encoded value {} from {} to {} in ALPFloat::decode_single",
encoded,
std::any::type_name::<Self::ALPInt>(),
std::any::type_name::<Self>()
)
});
encoded_float * Self::F10[exponents.f as usize] * Self::IF10[exponents.e as usize]
Self::from_int(encoded) * Self::F10[exponents.f as usize] * Self::IF10[exponents.e as usize]
}

/// # Safety
///
/// The returned value may not decode back to the original value.
#[inline(always)]
unsafe fn encode_single_unchecked(value: Self, exponents: Exponents) -> Self::ALPInt {
(value * Self::F10[exponents.e as usize] * Self::IF10[exponents.f as usize])
.fast_round()
.as_int()
}
}

fn encode_chunk_unchecked<T: ALPFloat>(
chunk: &[T],
exp: Exponents,
encoded_output: &mut Vec<T::ALPInt>,
patch_indices: &mut Vec<u64>,
patch_values: &mut Vec<T>,
fill_value: &mut Option<T::ALPInt>,
) {
let num_prev_encoded = encoded_output.len();
let num_prev_patches = patch_indices.len();
assert_eq!(patch_indices.len(), patch_values.len());
let has_filled = fill_value.is_some();

// encode the chunk, counting the number of patches
let mut chunk_patch_count = 0;
encoded_output.extend(chunk.iter().map(|v| {
let encoded = unsafe { T::encode_single_unchecked(*v, exp) };
let decoded = T::decode_single(encoded, exp);
let neq = (decoded != *v) as usize;
chunk_patch_count += neq;
encoded
}));
let chunk_patch_count = chunk_patch_count; // immutable hereafter
assert_eq!(encoded_output.len(), num_prev_encoded + chunk.len());

// find the first successfully encoded value (i.e., not patched)
// this is our fill value for missing values
if fill_value.is_none() && (num_prev_encoded + chunk_patch_count < encoded_output.len()) {
assert_eq!(num_prev_encoded, num_prev_patches);
for i in num_prev_encoded..encoded_output.len() {
if i >= patch_indices.len() || patch_indices[i] != i as u64 {
*fill_value = Some(encoded_output[i]);
break;
}
}
}

// if there are no patches, we are done
if chunk_patch_count == 0 {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to handle the edge case of 2 chunks where chunk 0 is all patches, chunk 1 has 0 patches... which won't fill

return;
}

// we need to gather the patches for this chunk
// preallocate space for the patches (plus one because our loop may attempt to write one past the end)
patch_indices.reserve(chunk_patch_count + 1);
patch_values.reserve(chunk_patch_count + 1);

// record the patches in this chunk
let patch_indices_mut = patch_indices.spare_capacity_mut();
let patch_values_mut = patch_values.spare_capacity_mut();
let mut chunk_patch_index = 0;
for i in num_prev_encoded..encoded_output.len() {
let decoded = T::decode_single(encoded_output[i], exp);
// write() is only safe to call more than once because the values are primitive (i.e., Drop is a no-op)
patch_indices_mut[chunk_patch_index].write(i as u64);
patch_values_mut[chunk_patch_index].write(chunk[i - num_prev_encoded]);
chunk_patch_index += (decoded != chunk[i - num_prev_encoded]) as usize;
}
assert_eq!(chunk_patch_index, chunk_patch_count);
unsafe {
patch_indices.set_len(num_prev_patches + chunk_patch_count);
patch_values.set_len(num_prev_patches + chunk_patch_count);
}

// replace the patched values in the encoded array with the fill value
// for better downstream compression
if let Some(fill_value) = fill_value {
// handle the edge case where the first N >= 1 chunks are all patches
let start_patch = if !has_filled { 0 } else { num_prev_patches };
for patch_idx in &patch_indices[start_patch..] {
encoded_output[*patch_idx as usize] = *fill_value;
}
}
}

Expand Down Expand Up @@ -189,6 +259,16 @@ impl ALPFloat for f32 {
0.000000001,
0.0000000001, // 10^-10
];

#[inline(always)]
fn as_int(self) -> Self::ALPInt {
self as _
}

#[inline(always)]
fn from_int(n: Self::ALPInt) -> Self {
n as _
}
}

impl ALPFloat for f64 {
Expand Down Expand Up @@ -250,4 +330,14 @@ impl ALPFloat for f64 {
0.0000000000000000000001,
0.00000000000000000000001, // 10^-23
];

#[inline(always)]
fn as_int(self) -> Self::ALPInt {
self as _
}

#[inline(always)]
fn from_int(n: Self::ALPInt) -> Self {
n as _
}
}
2 changes: 1 addition & 1 deletion encodings/alp/src/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ mod tests {
assert!(encoded.patches().is_some());
assert_eq!(
encoded.encoded().as_primitive().maybe_null_slice::<i64>(),
vec![1234i64, 2718, 2718, 4000] // fill forward
vec![1234i64, 2718, 1234, 4000] // fill forward
);
assert_eq!(encoded.exponents(), Exponents { e: 16, f: 13 });

Expand Down