Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 14 additions & 11 deletions encodings/experimental/onpair/benches/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use vortex_array::ArrayRef;
use vortex_array::ExecutionCtx;
use vortex_array::IntoArray;
use vortex_array::VortexSessionExecute;
use vortex_array::array_session;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::VarBinArray;
use vortex_array::arrays::VarBinViewArray;
Expand Down Expand Up @@ -80,7 +81,7 @@ use vortex_onpair::onpair_compress;
use vortex_session::VortexSession;

static SESSION: LazyLock<VortexSession> = LazyLock::new(|| {
let session = vortex_array::array_session();
let session = array_session();
vortex_onpair::initialize(&session);
session
});
Expand Down Expand Up @@ -153,13 +154,13 @@ fn corpus(n: usize, shape: Shape) -> Vec<String> {
out
}

fn compress(n: usize, shape: Shape) -> OnPairArray {
fn compress(n: usize, shape: Shape, ctx: &mut ExecutionCtx) -> OnPairArray {
let strings = corpus(n, shape);
let varbin = VarBinArray::from_iter(
strings.iter().map(|s| Some(s.as_bytes())),
DType::Utf8(Nullability::NonNullable),
);
onpair_compress(&varbin, varbin.len(), varbin.dtype(), DEFAULT_DICT12_CONFIG)
onpair_compress(varbin.as_array(), DEFAULT_DICT12_CONFIG, ctx)
.unwrap_or_else(|e| panic!("onpair_compress failed: {e}"))
}

Expand All @@ -172,13 +173,12 @@ fn widen<T: NativePType>(arr: &ArrayRef, ctx: &mut ExecutionCtx) -> Buffer<T> {
.into_buffer::<T>()
}

fn materialise(arr: &OnPairArray) -> (DecodeInputs, usize) {
let mut ctx = SESSION.create_execution_ctx();
fn materialise(arr: &OnPairArray, ctx: &mut ExecutionCtx) -> (DecodeInputs, usize) {
let view = arr.as_view();
let inputs = DecodeInputs {
dict_bytes: view.dict_bytes().clone(),
dict_offsets: widen::<u32>(view.dict_offsets(), &mut ctx),
codes: widen::<u16>(view.codes(), &mut ctx),
dict_offsets: widen::<u32>(view.dict_offsets(), ctx),
codes: widen::<u16>(view.codes(), ctx),
bits: view.bits(),
};
let total = inputs.decompressed_len();
Expand All @@ -197,9 +197,10 @@ const CASES: &[(Shape, usize)] = &[
/// Hits `onpair::decompress_into` directly.
#[divan::bench(args = CASES)]
fn decompress_into_bench(bencher: Bencher, case: (Shape, usize)) {
let mut ctx = SESSION.create_execution_ctx();
let (shape, n) = case;
let arr = compress(n, shape);
let (inputs, total) = materialise(&arr);
let arr = compress(n, shape, &mut ctx);
let (inputs, total) = materialise(&arr, &mut ctx);
bencher.bench_local(|| {
let mut out: Vec<u8> = Vec::with_capacity(total);
let written = inputs.decompress_into(out.spare_capacity_mut());
Expand All @@ -212,8 +213,9 @@ fn decompress_into_bench(bencher: Bencher, case: (Shape, usize)) {
/// building the view buffer + `BinaryView` list, etc.
#[divan::bench(args = CASES)]
fn canonicalize_to_varbinview(bencher: Bencher, case: (Shape, usize)) {
let mut ctx = SESSION.create_execution_ctx();
let (shape, n) = case;
let arr = compress(n, shape);
let arr = compress(n, shape, &mut ctx);
bencher
.with_inputs(|| (arr.clone().into_array(), SESSION.create_execution_ctx()))
.bench_local_values(|(arr, mut ctx)| {
Expand All @@ -232,8 +234,9 @@ const COMPUTE_CASES: &[(Shape, usize)] = &[(Shape::UrlLog, 100_000), (Shape::Url
/// rows; the cost is dominated by the `codes` segment copy + offsets.
#[divan::bench(args = COMPUTE_CASES)]
fn filter_share_dict(bencher: Bencher, case: (Shape, usize)) {
let mut ctx = SESSION.create_execution_ctx();
let (shape, n) = case;
let arr = compress(n, shape);
let arr = compress(n, shape, &mut ctx);
let mask = Mask::from_iter((0..n).map(|i| i % 7 == 0));
bencher
.with_inputs(|| SESSION.create_execution_ctx())
Expand Down
156 changes: 80 additions & 76 deletions encodings/experimental/onpair/src/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,70 +8,87 @@ use onpair::Offset;
use vortex_array::ArrayRef;
use vortex_array::ExecutionCtx;
use vortex_array::IntoArray;
use vortex_array::LEGACY_SESSION;
use vortex_array::VortexSessionExecute;
use vortex_array::accessor::ArrayAccessor;
use vortex_array::arrays::ConstantArray;
use vortex_array::arrays::VarBinViewArray;
use vortex_array::arrays::varbinview::BinaryView;
use vortex_array::buffer::BufferHandle;
use vortex_array::dtype::DType;
use vortex_array::dtype::Nullability;
use vortex_array::validity::Validity;
use vortex_buffer::Alignment;
use vortex_buffer::Buffer;
use vortex_buffer::BufferMut;
use vortex_buffer::ByteBuffer;
use vortex_buffer::ByteBufferMut;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_err;
use vortex_mask::AllOr;

use crate::OnPair;
use crate::OnPairArray;

/// Default OnPair training configuration: 12-bit codes ("dict-12").
pub const DEFAULT_DICT12_CONFIG: Config = onpair::DEFAULT_CONFIG;

/// Compress an iterable of optional byte strings via the OnPair encoder.
pub fn onpair_compress_iter<'a, I>(
iter: I,
len: usize,
dtype: DType,
config: Config,
) -> VortexResult<OnPairArray>
where
I: Iterator<Item = Option<&'a [u8]>>,
{
onpair_compress_iter_with_offsets::<u64, _>(iter, len, dtype, config)
}

fn onpair_compress_iter_with_offsets<'a, O, I>(
iter: I,
len: usize,
dtype: DType,
fn onpair_compress_varbinview<O>(
array: VarBinViewArray,
config: Config,
ctx: &mut ExecutionCtx,
) -> VortexResult<OnPairArray>
where
O: Offset,
I: Iterator<Item = Option<&'a [u8]>>,
{
let len = array.len();
let mask = array.validity()?.execute_mask(len, ctx)?;
if mask.all_false() {
return OnPair::try_new(
array.dtype().clone(),
BufferHandle::new_host(ByteBuffer::empty()),
ConstantArray::new(0, len).into_array(),
ConstantArray::new(0u16, len).into_array(),
ConstantArray::new(0u32, len + 1).into_array(),
ConstantArray::new(0i32, len).into_array(),
Validity::AllInvalid,
9,
);
}

let mut flat: Vec<u8> = Vec::with_capacity(len * 16);
let mut offsets: Vec<O> = Vec::with_capacity(len + 1);
let mut uncompressed_lengths: BufferMut<i32> = BufferMut::with_capacity(len);
let mut validity_bits: Vec<bool> = Vec::with_capacity(len);
offsets.push(<O as Offset>::from_usize(0));
offsets.push(O::from_usize(0));
let views = array.views();
let buffers = array
.data_buffers()
.as_ref()
.iter()
.map(|b| b.as_host())
.collect::<Vec<_>>();

for item in iter {
match item {
Some(bytes) => {
match mask.bit_buffer() {
AllOr::All => {
for view in views {
let bytes = view_bytes(view, &buffers);
flat.extend_from_slice(bytes);
offsets.push(<O as Offset>::from_usize(flat.len()));
uncompressed_lengths.push(
i32::try_from(bytes.len()).vortex_expect("string length must fit in i32"),
);
validity_bits.push(true);
offsets.push(O::from_usize(flat.len()));
uncompressed_lengths
.push(i32::try_from(view.len()).vortex_expect("must fit in i32"));
}
None => {
offsets.push(<O as Offset>::from_usize(flat.len()));
uncompressed_lengths.push(0);
validity_bits.push(false);
}
AllOr::None => {
unreachable!("all_false() should have been caught earlier");
}
AllOr::Some(validity) => {
for (view, valid) in views.iter().zip(validity.iter()) {
if valid {
let bytes = view_bytes(view, &buffers);
flat.extend_from_slice(bytes);
offsets.push(O::from_usize(flat.len()));
uncompressed_lengths
.push(i32::try_from(view.len()).vortex_expect("must fit in i32"));
} else {
offsets.push(O::from_usize(flat.len()));
uncompressed_lengths.push(0);
}
}
}
}
Expand All @@ -80,44 +97,53 @@ where
.map_err(|e| vortex_err!("OnPair compress failed: {e}"))?;
let bits = column.bits;
let dict_bytes = dict_bytes_to_buffer(column.dict_bytes);
let codes_offsets = build_codes_offsets(&column.codes, &column.dict_offsets, &offsets)?;
let codes_offsets =
build_codes_offsets(&column.codes, &column.dict_offsets, &offsets)?.into_array();
let codes = Buffer::from(column.codes).into_array();
let dict_offsets = Buffer::from(column.dict_offsets).into_array();
let codes_offsets = Buffer::from(codes_offsets).into_array();

let uncompressed_lengths = uncompressed_lengths.into_array();
let validity = match dtype.nullability() {
Nullability::NonNullable => Validity::NonNullable,
Nullability::Nullable => Validity::from_iter(validity_bits),
};

OnPair::try_new(
dtype,
array.dtype().clone(),
dict_bytes,
dict_offsets,
codes,
codes_offsets,
uncompressed_lengths,
validity,
array.validity()?,
bits,
)
}

/// Resolve the raw bytes that `view` refers to.
///
/// An inlined view carries its payload itself, so that payload is returned
/// directly. Any other view names one of `buffers` by index together with a
/// range inside it; the matching sub-slice of that buffer is returned.
///
/// # Panics
///
/// Panics if the view's buffer index or range is out of bounds for `buffers`.
fn view_bytes<'a>(view: &'a BinaryView, buffers: &'a [&ByteBuffer]) -> &'a [u8] {
    if view.is_inlined() {
        return view.as_inlined().value();
    }

    let out_of_line = view.as_view();
    let backing = buffers[out_of_line.buffer_index as usize];
    &backing[out_of_line.as_range()]
}

/// Lift compressed dictionary bytes into the Vortex buffer slot.
///
/// The returned host [`BufferHandle`] holds `dict_bytes` followed by
/// `onpair::MAX_TOKEN_SIZE` zero bytes, in a buffer aligned to 8 bytes.
fn dict_bytes_to_buffer(dict_bytes: Vec<u8>) -> BufferHandle {
// Pad the dictionary blob with MAX_TOKEN_SIZE zero bytes so the
// over-copy decoder can issue a fixed 16-byte load for every token
// without risking an OOB read on the last entry.
let mut padded = Vec::with_capacity(dict_bytes.len() + onpair::MAX_TOKEN_SIZE);
padded.extend_from_slice(&dict_bytes);
padded.resize(dict_bytes.len() + onpair::MAX_TOKEN_SIZE, 0);
//
// Align dict_bytes to 8 bytes so the segment that ultimately holds the
// OnPair tree starts at an 8-aligned in-memory address. Without this
// anchor, the per-buffer padding the serializer inserts is only
// *relative* to the segment start; if the segment lands at a u8-aligned
// heap address, downstream `PrimitiveArray<u32>::deserialize` panics
// with `Misaligned buffer cannot be used to build PrimitiveArray of u32`.
BufferHandle::new_host(ByteBuffer::from(padded).aligned(vortex_buffer::Alignment::new(8)))
let mut padded = ByteBufferMut::with_capacity_aligned(
dict_bytes.len() + onpair::MAX_TOKEN_SIZE,
Alignment::new(8),
);
padded.extend_from_slice(&dict_bytes);
// SAFETY: `padded` was created above with capacity for
// `dict_bytes.len() + onpair::MAX_TOKEN_SIZE` bytes and only the
// `dict_bytes.len()` bytes of the dictionary have been written so far, so
// the requested number of zero bytes fits in the space already reserved.
// NOTE(review): presumably spare capacity is the only precondition of
// `push_n_unchecked` — confirm against the `vortex_buffer` documentation.
unsafe { padded.push_n_unchecked(0, dict_bytes.len() + onpair::MAX_TOKEN_SIZE - padded.len()) };
BufferHandle::new_host(padded.freeze())
}

/// Reconstruct the per-row `codes_offsets` from the flat `codes`, the
Expand All @@ -128,9 +154,9 @@ fn build_codes_offsets<O: Offset>(
codes: &[u16],
dict_offsets: &[u32],
row_byte_offsets: &[O],
) -> VortexResult<Vec<u32>> {
) -> VortexResult<Buffer<u32>> {
let nrows = row_byte_offsets.len() - 1;
let mut codes_offsets = Vec::with_capacity(nrows + 1);
let mut codes_offsets = BufferMut::with_capacity(nrows + 1);
codes_offsets.push(0u32);
let mut decoded_bytes: u64 = 0;
let mut code_idx: usize = 0;
Expand All @@ -149,38 +175,16 @@ fn build_codes_offsets<O: Offset>(
.map_err(|_| vortex_err!("OnPair: code boundary {code_idx} does not fit u32"))?,
);
}
Ok(codes_offsets)
}

/// Compress a byte-string accessor (typically a `VarBinArray` or
/// `VarBinViewArray`).
pub fn onpair_compress<A: ArrayAccessor<[u8]>>(
array: A,
len: usize,
dtype: &DType,
config: Config,
) -> VortexResult<OnPairArray> {
array.with_iterator(|iter| onpair_compress_iter(iter, len, dtype.clone(), config))
Ok(codes_offsets.freeze())
}

/// Compress any [`ArrayRef`] whose canonical form is a string array, by first
/// canonicalising to `VarBinViewArray`.
pub fn onpair_compress_array(
pub fn onpair_compress(
array: &ArrayRef,
config: Config,
ctx: &mut ExecutionCtx,
) -> VortexResult<OnPairArray> {
let view = array.clone().execute::<VarBinViewArray>(ctx)?;
let len = view.len();
let dtype = view.dtype().clone();
onpair_compress(&view, len, &dtype, config)
}

/// Convenience: build a default `ExecutionCtx` from `LEGACY_SESSION`.
pub fn onpair_compress_array_default(
array: &ArrayRef,
config: Config,
) -> VortexResult<OnPairArray> {
let mut ctx = LEGACY_SESSION.create_execution_ctx();
onpair_compress_array(array, config, &mut ctx)
onpair_compress_varbinview::<u64>(view, config, ctx)
}
11 changes: 5 additions & 6 deletions encodings/experimental/onpair/src/compute/compare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ mod tests {
use rstest::rstest;
use vortex_array::IntoArray;
use vortex_array::VortexSessionExecute;
use vortex_array::array_session;
use vortex_array::arrays::BoolArray;
use vortex_array::arrays::ConstantArray;
use vortex_array::arrays::VarBinArray;
Expand All @@ -84,7 +85,7 @@ mod tests {
use crate::compress::DEFAULT_DICT12_CONFIG;
use crate::compress::onpair_compress;

static SESSION: LazyLock<VortexSession> = LazyLock::new(vortex_array::array_session);
static SESSION: LazyLock<VortexSession> = LazyLock::new(array_session);

#[cfg_attr(miri, ignore)]
#[rstest]
Expand All @@ -99,10 +100,9 @@ mod tests {
[Some(""), Some("a"), Some(""), Some("bbb")],
DType::Utf8(Nullability::NonNullable),
);
let arr = onpair_compress(&input, input.len(), input.dtype(), DEFAULT_DICT12_CONFIG)?
.into_array();

let mut ctx = SESSION.create_execution_ctx();
let arr = onpair_compress(input.as_array(), DEFAULT_DICT12_CONFIG, &mut ctx)?.into_array();

let result = arr
.binary(ConstantArray::new("", input.len()).into_array(), op)?
.execute::<BoolArray>(&mut ctx)?;
Expand All @@ -117,9 +117,8 @@ mod tests {
[Some(""), None, Some("x")],
DType::Utf8(Nullability::Nullable),
);
let arr = onpair_compress(&input, input.len(), input.dtype(), DEFAULT_DICT12_CONFIG)?
.into_array();
let mut ctx = SESSION.create_execution_ctx();
let arr = onpair_compress(input.as_array(), DEFAULT_DICT12_CONFIG, &mut ctx)?.into_array();

let eq_empty = arr
.clone()
Expand Down
Loading
Loading