Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ dirs = "6.0.0"
divan = { package = "codspeed-divan-compat", version = "4.0.4" }
enum-iterator = "2.0.0"
env_logger = "0.11"
fastlanes = "0.5"
fastlanes = "0.5.1"
flatbuffers = "25.2.10"
fsst-rs = "0.5.11"
futures = { version = "0.3.31", default-features = false }
Expand Down
28 changes: 28 additions & 0 deletions encodings/fastlanes/src/bitpacking/array/unpack_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,34 @@ impl<T: PhysicalPType, S: UnpackStrategy<T>> UnpackedChunks<T, S> {
debug_assert_eq!(local_idx, self.len);
}

/// Visit each packed chunk in array order without decoding it.
///
/// For every chunk index `c` in `0..num_chunks`, `f` is invoked with:
///
/// * the `elems_per_chunk`-long sub-slice of the packed buffer beginning at
///   `c * elems_per_chunk` (still bit-packed; this method writes to no scratch buffer), and
/// * the row range `c * CHUNK_SIZE..min((c + 1) * CHUNK_SIZE, offset + len)`.
///
/// This is the packed counterpart of [`Self::for_each_unpacked_chunk`], intended for fused
/// kernels (e.g. compare) that unpack and consume a block in one pass.
///
/// Row ranges are in *padded* coordinates: they count from the start of the first chunk rather
/// than from the first logical row, so the leading `offset` rows are included and every range
/// start is a multiple of `CHUNK_SIZE`. Callers producing per-row output must shift by `offset`
/// themselves to recover logical positions.
///
/// # Panics
///
/// Panics if the packed buffer holds fewer than `num_chunks * elems_per_chunk` elements.
pub(crate) fn for_each_packed_chunk<F>(&self, mut f: F)
where
    F: FnMut(&[T::Physical], Range<usize>),
{
    let packed: &[T::Physical] = buffer_as_slice(&self.packed);
    let stride = self.elems_per_chunk();
    // One past the last row that belongs to this array, counted from the first chunk's start.
    let rows_end = self.offset + self.len;

    (0..self.num_chunks).for_each(|block| {
        let first_row = block * CHUNK_SIZE;
        // Only the final block can be cut short by the clamp.
        let last_row = rows_end.min(first_row + CHUNK_SIZE);
        // Two-step slicing: skip to the block, then take exactly one block's worth.
        let block_words = &packed[block * stride..][..stride];
        f(block_words, first_row..last_row);
    });
}

/// Unpack full chunks into output range starting at the given index.
fn decode_full_chunks_into_at(
&mut self,
Expand Down
138 changes: 123 additions & 15 deletions encodings/fastlanes/src/bitpacking/compute/compare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,24 @@
//! a [`BitBuffer`]. Patches are re-applied at the end by overwriting bits at the patched
//! indices with `predicate(patch_value)`.

use fastlanes::BitPacking;
use fastlanes::BitPackingCompare;
use fastlanes::FastLanesComparable;
use vortex_array::ArrayRef;
use vortex_array::ArrayView;
use vortex_array::ExecutionCtx;
use vortex_array::dtype::NativePType;
use vortex_array::dtype::Nullability;
use vortex_array::dtype::PhysicalPType;
use vortex_array::match_each_integer_ptype;
use vortex_array::scalar_fn::fns::binary::CompareKernel;
use vortex_array::scalar_fn::fns::operators::CompareOperator;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;

use crate::BitPacked;
use crate::bitpacking::compute::stream_predicate::stream_predicate;
use crate::bitpacking::compute::compare_fused::stream_compare_fused;
use crate::unpack_iter::BitPacked as BitPackedIter;

impl CompareKernel for BitPacked {
fn compare(
Expand Down Expand Up @@ -55,6 +60,10 @@ impl CompareKernel for BitPacked {
}
}

/// Compare every value against the constant via the fused FastLanes `unpack_cmp` kernel.
///
/// `NativePType::is_eq` / `is_lt` etc. provide total comparison (matching the primitive between
/// kernel's dispatch shape). `NotEq` has no direct method, so use `!is_eq`.
fn compare_constant_typed<T>(
lhs: ArrayView<'_, BitPacked>,
rhs: T,
Expand All @@ -63,42 +72,60 @@ fn compare_constant_typed<T>(
ctx: &mut ExecutionCtx,
) -> VortexResult<ArrayRef>
where
T: NativePType + Copy + crate::unpack_iter::BitPacked,
T: NativePType
+ BitPackedIter
+ FastLanesComparable<Bitpacked = <T as PhysicalPType>::Physical>,
<T as PhysicalPType>::Physical: BitPacking + NativePType + BitPackingCompare,
{
// `NativePType::is_eq` / `is_lt` etc. provide total comparison (matching the primitive
// between kernel's dispatch shape). `NotEq` has no direct method, so use `!is_eq`.
match operator {
CompareOperator::Eq => stream_predicate::<T, _>(lhs, nullability, |v| v.is_eq(rhs), ctx),
CompareOperator::Eq => {
stream_compare_fused::<T, _>(lhs, rhs, nullability, |a, b| a.is_eq(b), ctx)
}
CompareOperator::NotEq => {
stream_predicate::<T, _>(lhs, nullability, |v| !v.is_eq(rhs), ctx)
stream_compare_fused::<T, _>(lhs, rhs, nullability, |a, b| !a.is_eq(b), ctx)
}
CompareOperator::Lt => {
stream_compare_fused::<T, _>(lhs, rhs, nullability, |a, b| a.is_lt(b), ctx)
}
CompareOperator::Lte => {
stream_compare_fused::<T, _>(lhs, rhs, nullability, |a, b| a.is_le(b), ctx)
}
CompareOperator::Gt => {
stream_compare_fused::<T, _>(lhs, rhs, nullability, |a, b| a.is_gt(b), ctx)
}
CompareOperator::Gte => {
stream_compare_fused::<T, _>(lhs, rhs, nullability, |a, b| a.is_ge(b), ctx)
}
CompareOperator::Lt => stream_predicate::<T, _>(lhs, nullability, |v| v.is_lt(rhs), ctx),
CompareOperator::Lte => stream_predicate::<T, _>(lhs, nullability, |v| v.is_le(rhs), ctx),
CompareOperator::Gt => stream_predicate::<T, _>(lhs, nullability, |v| v.is_gt(rhs), ctx),
CompareOperator::Gte => stream_predicate::<T, _>(lhs, nullability, |v| v.is_ge(rhs), ctx),
}
}

#[cfg(test)]
mod tests {
use std::sync::LazyLock;

use rstest::rstest;
use vortex_array::IntoArray;
use vortex_array::LEGACY_SESSION;
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::BoolArray;
use vortex_array::arrays::ConstantArray;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::slice::SliceKernel;
use vortex_array::assert_arrays_eq;
use vortex_array::builtins::ArrayBuiltins;
use vortex_array::scalar_fn::fns::binary::CompareKernel;
use vortex_array::scalar_fn::fns::operators::CompareOperator;
use vortex_array::scalar_fn::fns::operators::Operator;
use vortex_array::session::ArraySession;
use vortex_error::VortexResult;
use vortex_session::VortexSession;

use crate::BitPacked;
use crate::BitPackedArrayExt;
use crate::BitPackedData;

static SESSION: LazyLock<VortexSession> =
LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

/// All six operators on a small in-range input.
#[rstest]
#[case(Operator::Eq, vec![false, false, false, true, false, false, true])]
Expand All @@ -108,7 +135,7 @@ mod tests {
#[case(Operator::Gt, vec![false, false, false, false, true, true, false])]
#[case(Operator::Gte, vec![false, false, false, true, true, true, true])]
fn small(#[case] op: Operator, #[case] expected: Vec<bool>) {
let mut ctx = LEGACY_SESSION.create_execution_ctx();
let mut ctx = SESSION.create_execution_ctx();
let values = PrimitiveArray::from_iter([0u32, 1, 2, 3, 4, 5, 3]);
let packed = BitPackedData::encode(&values.into_array(), 3, &mut ctx).unwrap();
let rhs = ConstantArray::new(3u32, packed.len()).into_array();
Expand All @@ -130,7 +157,7 @@ mod tests {
($name:ident, $T:ty, $($bw:expr),+) => {
#[test]
fn $name() -> VortexResult<()> {
let mut ctx = LEGACY_SESSION.create_execution_ctx();
let mut ctx = SESSION.create_execution_ctx();
for bw in [$($bw),+] {
let cap: u128 = 1u128 << bw;
let values: Vec<$T> = (0..2048u128).map(|i| (i % cap) as $T).collect();
Expand Down Expand Up @@ -171,7 +198,7 @@ mod tests {
/// predicate runs.
#[test]
fn signed_with_patches_matches_primitive() -> VortexResult<()> {
let mut ctx = LEGACY_SESSION.create_execution_ctx();
let mut ctx = SESSION.create_execution_ctx();
let values: Vec<i32> = (0..1500)
.map(|i| if i % 73 == 0 { 100_000 + i } else { i % 100 })
.collect();
Expand All @@ -191,10 +218,91 @@ mod tests {
Ok(())
}

/// Sliced inputs: a non-zero block offset (and a length spanning several blocks) must still go
/// through the fused kernel and agree with the primitive fallback.
///
/// The cases sweep slice starts that land both inside the first block and past it, with lengths
/// that end mid-block and on a block boundary. Every case is checked against all six comparison
/// operators, since `compare_constant_typed` dispatches a separate closure per operator and each
/// one has to honour the slice offset.
#[rstest]
#[case(1, 4000)] // start mid-first-block, multi-block length
#[case(1023, 2)] // start at the last row of the first block
#[case(1024, 1024)] // start exactly on a block boundary, exactly one block long
#[case(1500, 1000)] // start mid-second-block
#[case(3, 1021)] // ends exactly on the first block boundary
fn sliced_matches_primitive(
    #[case] start: usize,
    #[case] slice_len: usize,
) -> VortexResult<()> {
    let mut ctx = SESSION.create_execution_ctx();
    // Values cycle through 0..128, so they fit the 7-bit width used below.
    let values: Vec<u32> = (0..5000u32).map(|i| i % 128).collect();
    let prim = PrimitiveArray::from_iter(values);
    let packed = BitPackedData::encode(&prim.clone().into_array(), 7, &mut ctx)?;

    let sliced = packed.into_array().slice(start..start + slice_len)?;
    let rhs = ConstantArray::new(50u32, slice_len).into_array();
    for op in [
        CompareOperator::Eq,
        CompareOperator::NotEq,
        CompareOperator::Lt,
        CompareOperator::Lte,
        CompareOperator::Gt,
        CompareOperator::Gte,
    ] {
        // Call the kernel directly so a `None` (kernel declined) fails the test instead of
        // silently falling back to another implementation.
        let got = <BitPacked as CompareKernel>::compare(
            sliced.as_::<BitPacked>(),
            &rhs,
            op,
            &mut ctx,
        )?
        .expect("fused compare kernel must engage for sliced arrays")
        .execute::<BoolArray>(&mut ctx)?;
        // Reference result: the same slice and comparison on the unencoded primitive array.
        let want = prim
            .clone()
            .into_array()
            .slice(start..start + slice_len)?
            .binary(rhs.clone(), Operator::from(op))?
            .execute::<BoolArray>(&mut ctx)?;
        assert_arrays_eq!(got, want);
    }
    Ok(())
}

/// Patched values under a non-zero slice offset.
///
/// Encodes data whose outliers do not fit the packed width (so they end up in `Patches`), slices
/// it, and checks that the fused compare agrees with comparing the equivalent primitive slice.
/// This is meant to exercise the `offset + (global - p_off)` patch-position math.
#[test]
fn sliced_with_patches_matches_primitive() -> VortexResult<()> {
    let mut ctx = SESSION.create_execution_ctx();

    // Every 91st entry is a large outlier; everything else stays below 100.
    let prim = PrimitiveArray::from_iter(
        (0..4096)
            .map(|i| if i % 91 == 0 { 100_000 + i } else { i % 100 })
            .collect::<Vec<i32>>(),
    );
    let packed = BitPackedData::encode(&prim.clone().into_array(), 7, &mut ctx)?;
    assert!(packed.patches().is_some(), "test setup expects patches");

    let window = 700usize..3500usize;

    // `ArrayRef::slice` would leave a lazy `SliceArray` over a patched `BitPacked` (the
    // `SliceReduce` path bails when patches are present). Invoking the `SliceKernel` directly
    // reads the buffers and yields a sliced `BitPacked` whose patches are sliced as well.
    let sliced = <BitPacked as SliceKernel>::slice(packed.as_view(), window.clone(), &mut ctx)?
        .expect("slice kernel produces a sliced bitpacked array");

    let rhs = ConstantArray::new(50i32, window.len()).into_array();

    let fused = <BitPacked as CompareKernel>::compare(
        sliced.as_::<BitPacked>(),
        &rhs,
        CompareOperator::Eq,
        &mut ctx,
    )?;
    let got = fused
        .expect("fused compare kernel must engage for sliced arrays with patches")
        .execute::<BoolArray>(&mut ctx)?;

    // Reference result computed on the plain primitive array over the same window.
    let reference = prim.into_array().slice(window)?;
    let want = reference
        .binary(rhs, Operator::Eq)?
        .execute::<BoolArray>(&mut ctx)?;

    assert_arrays_eq!(got, want);
    Ok(())
}

/// Nullable input — the result must carry the array's validity.
#[test]
fn nullable_propagates_validity() -> VortexResult<()> {
let mut ctx = LEGACY_SESSION.create_execution_ctx();
let mut ctx = SESSION.create_execution_ctx();
let prim = PrimitiveArray::from_option_iter([Some(1u32), None, Some(3), Some(4), None]);
let packed = BitPackedData::encode(&prim.clone().into_array(), 3, &mut ctx)?;
let rhs = ConstantArray::new(3u32, packed.len()).into_array();
Expand Down
Loading
Loading