diff --git a/encodings/bytebool/src/array.rs b/encodings/bytebool/src/array.rs index 9f4c5f4fea9..98e28d0d9b8 100644 --- a/encodings/bytebool/src/array.rs +++ b/encodings/bytebool/src/array.rs @@ -10,7 +10,9 @@ use vortex_array::stats::StatsSet; use vortex_array::validity::{LogicalValidity, Validity, ValidityMetadata, ValidityVTable}; use vortex_array::variants::{BoolArrayTrait, VariantsVTable}; use vortex_array::visitor::{ArrayVisitor, VisitorVTable}; -use vortex_array::{impl_encoding, ArrayData, ArrayLen, ArrayTrait, Canonical, IntoCanonical}; +use vortex_array::{ + impl_encoding, ArrayBuffer, ArrayData, ArrayLen, ArrayTrait, Canonical, IntoCanonical, +}; use vortex_buffer::Buffer; use vortex_dtype::DType; use vortex_error::{VortexExpect as _, VortexResult}; @@ -47,7 +49,7 @@ impl ByteBoolArray { Arc::new(ByteBoolMetadata { validity: validity.to_metadata(length)?, }), - Some(buffer), + Some(ArrayBuffer::new::(buffer)), validity.into_array().into_iter().collect::>().into(), StatsSet::default(), )? @@ -70,7 +72,7 @@ impl ByteBoolArray { Self::try_new(buffer, validity) } - pub fn buffer(&self) -> &Buffer { + pub fn buffer(&self) -> &ArrayBuffer { self.as_ref() .buffer() .vortex_expect("ByteBoolArray is missing the underlying buffer") diff --git a/encodings/bytebool/src/compute.rs b/encodings/bytebool/src/compute.rs index 6c9a31d0cfc..23e69da084c 100644 --- a/encodings/bytebool/src/compute.rs +++ b/encodings/bytebool/src/compute.rs @@ -99,9 +99,11 @@ impl FillForwardFn for ByteBoolEncoding { } // all valid, but we need to convert to non-nullable if validity.all_valid() { - return Ok( - ByteBoolArray::try_new(array.buffer().clone(), Validity::AllValid)?.into_array(), - ); + return Ok(ByteBoolArray::try_new( + array.buffer().clone().into_inner(), + Validity::AllValid, + )? + .into_array()); } // all invalid => fill with default value (false) if validity.all_invalid() { diff --git a/encodings/fastlanes/src/bitpacking/compute/scalar_at.rs b/encodings/fastlanes/src/bitpacking/compute/scalar_at.rs index 91d02294648..c2defd38637 100644 --- a/encodings/fastlanes/src/bitpacking/compute/scalar_at.rs +++ b/encodings/fastlanes/src/bitpacking/compute/scalar_at.rs @@ -34,7 +34,7 @@ mod test { #[test] fn invalid_patches() { let packed_array = BitPackedArray::try_new( - Buffer::from(vec![0u8; 128]), + Buffer::from(vec![0u32; 32]), PType::U32, Validity::AllInvalid, Some(Patches::new( diff --git a/encodings/fastlanes/src/bitpacking/mod.rs b/encodings/fastlanes/src/bitpacking/mod.rs index b869ff85d7b..ba4cb38356d 100644 --- a/encodings/fastlanes/src/bitpacking/mod.rs +++ b/encodings/fastlanes/src/bitpacking/mod.rs @@ -12,7 +12,8 @@ use vortex_array::validity::{LogicalValidity, Validity, ValidityMetadata, Validi use vortex_array::variants::{PrimitiveArrayTrait, VariantsVTable}; use vortex_array::visitor::{ArrayVisitor, VisitorVTable}; use vortex_array::{ - impl_encoding, ArrayDType, ArrayData, ArrayLen, ArrayTrait, Canonical, IntoCanonical, + impl_encoding, ArrayBuffer, ArrayDType, ArrayData, ArrayLen, ArrayTrait, Canonical, + IntoCanonical, }; use vortex_buffer::Buffer; use vortex_dtype::{DType, NativePType, PType}; @@ -88,6 +89,11 @@ impl BitPackedArray { )); } + // TODO(ngates): enforce 128 byte alignment once we have a ScalarBufferBuilder that can + // enforce custom alignments. + // let packed = ArrayBuffer::new_with_alignment(packed, FASTLANES_ALIGNMENT); + let packed = ArrayBuffer::new_with_alignment(packed, ptype.byte_width()); + let metadata = BitPackedMetadata { validity: validity.to_metadata(length)?, offset, @@ -120,7 +126,7 @@ impl BitPackedArray { } #[inline] - pub fn packed(&self) -> &Buffer { + pub fn packed(&self) -> &ArrayBuffer { self.as_ref() .buffer() .vortex_expect("BitPackedArray must contain packed buffer") diff --git a/encodings/roaring/src/boolean/mod.rs b/encodings/roaring/src/boolean/mod.rs index 7f191d4880e..5932b8544b4 100644 --- a/encodings/roaring/src/boolean/mod.rs +++ b/encodings/roaring/src/boolean/mod.rs @@ -13,7 +13,8 @@ use vortex_array::validity::{LogicalValidity, ValidityVTable}; use vortex_array::variants::{BoolArrayTrait, VariantsVTable}; use vortex_array::visitor::{ArrayVisitor, VisitorVTable}; use vortex_array::{ - impl_encoding, ArrayData, ArrayLen, ArrayTrait, Canonical, IntoArrayData, IntoCanonical, + impl_encoding, ArrayBuffer, ArrayData, ArrayLen, ArrayTrait, Canonical, IntoArrayData, + IntoCanonical, }; use vortex_buffer::Buffer; use vortex_dtype::{DType, Nullability}; @@ -55,7 +56,9 @@ impl RoaringBoolArray { DType::Bool(Nullability::NonNullable), length, Arc::new(RoaringBoolMetadata), - Some(Buffer::from(bitmap.serialize::())), + Some(ArrayBuffer::new::(Buffer::from( + bitmap.serialize::(), + ))), vec![].into(), stats, )? @@ -75,7 +78,7 @@ impl RoaringBoolArray { } } - pub fn buffer(&self) -> &Buffer { + pub fn buffer(&self) -> &ArrayBuffer { self.as_ref() .buffer() .vortex_expect("Missing buffer in PrimitiveArray") diff --git a/encodings/roaring/src/integer/mod.rs b/encodings/roaring/src/integer/mod.rs index b40ffd3f430..09a9d0b8a59 100644 --- a/encodings/roaring/src/integer/mod.rs +++ b/encodings/roaring/src/integer/mod.rs @@ -12,8 +12,8 @@ use vortex_array::validity::{LogicalValidity, Validity, ValidityVTable}; use vortex_array::variants::{PrimitiveArrayTrait, VariantsVTable}; use vortex_array::visitor::{ArrayVisitor, VisitorVTable}; use vortex_array::{ - impl_encoding, ArrayDType as _, ArrayData, ArrayLen, ArrayTrait, Canonical, IntoArrayData, - IntoCanonical, + impl_encoding, ArrayBuffer, ArrayDType as _, ArrayData, ArrayLen, ArrayTrait, Canonical, + IntoArrayData, IntoCanonical, }; use vortex_buffer::Buffer; use vortex_dtype::Nullability::NonNullable; @@ -68,7 +68,9 @@ impl RoaringIntArray { DType::Primitive(ptype, NonNullable), length, Arc::new(RoaringIntMetadata { ptype }), - Some(Buffer::from(bitmap.serialize::())), + Some(ArrayBuffer::new::(Buffer::from( + bitmap.serialize::(), + ))), vec![].into(), stats, )? diff --git a/pyvortex/src/array.rs b/pyvortex/src/array.rs index 9035adf017c..7d81a349069 100644 --- a/pyvortex/src/array.rs +++ b/pyvortex/src/array.rs @@ -516,10 +516,10 @@ impl PyArray { /// >>> print(arr.tree_display()) /// root: vortex.primitive(0x03)(i64?, len=4) nbytes=36 B (100.00%) /// metadata: PrimitiveMetadata { validity: Array } - /// buffer: 32 B + /// buffer (align=8): 32 B /// validity: vortex.bool(0x02)(bool, len=4) nbytes=3 B (8.33%) /// metadata: BoolMetadata { validity: NonNullable, first_byte_bit_offset: 0 } - /// buffer: 1 B + /// buffer (align=1): 1 B /// /// /// Compressed arrays often have more complex, deeply nested encoding trees. diff --git a/vortex-array/src/array/bool/mod.rs b/vortex-array/src/array/bool/mod.rs index bf8eca07a04..8c540ce4305 100644 --- a/vortex-array/src/array/bool/mod.rs +++ b/vortex-array/src/array/bool/mod.rs @@ -14,7 +14,8 @@ use crate::validity::{LogicalValidity, Validity, ValidityMetadata, ValidityVTabl use crate::variants::{BoolArrayTrait, VariantsVTable}; use crate::visitor::{ArrayVisitor, VisitorVTable}; use crate::{ - impl_encoding, ArrayData, ArrayLen, ArrayTrait, Canonical, IntoArrayData, IntoCanonical, + impl_encoding, ArrayBuffer, ArrayData, ArrayLen, ArrayTrait, Canonical, IntoArrayData, + IntoCanonical, }; pub mod compute; @@ -40,14 +41,14 @@ impl Display for BoolMetadata { impl BoolArray { /// Access internal array buffer - pub fn buffer(&self) -> &Buffer { + pub fn buffer(&self) -> &ArrayBuffer { self.as_ref() .buffer() .vortex_expect("Missing buffer in BoolArray") } /// Convert array into its internal buffer - pub fn into_buffer(self) -> Buffer { + pub fn into_buffer(self) -> ArrayBuffer { self.into_array() .into_buffer() .vortex_expect("BoolArray must have a buffer") @@ -56,7 +57,7 @@ impl BoolArray { /// Get array values as an arrow [BooleanBuffer] pub fn boolean_buffer(&self) -> BooleanBuffer { BooleanBuffer::new( - self.buffer().clone().into_arrow(), + self.buffer().clone().into_inner().into_arrow(), self.metadata().first_byte_bit_offset as usize, self.len(), ) @@ -71,7 +72,7 @@ impl BoolArray { pub fn into_boolean_builder(self) -> (BooleanBufferBuilder, usize) { let first_byte_bit_offset = self.metadata().first_byte_bit_offset as usize; let len = self.len(); - let arrow_buffer = self.into_buffer().into_arrow(); + let arrow_buffer = self.into_buffer().into_inner().into_arrow(); let mutable_buf = if arrow_buffer.ptr_offset() == 0 { arrow_buffer.into_mutable().unwrap_or_else(|b| { let mut buf = MutableBuffer::with_capacity(b.len()); @@ -127,7 +128,7 @@ impl BoolArray { validity: validity.to_metadata(buffer_len)?, first_byte_bit_offset, }), - Some(Buffer::from(inner)), + Some(ArrayBuffer::new::(Buffer::from(inner))), validity.into_array().into_iter().collect(), StatsSet::default(), )? diff --git a/vortex-array/src/array/primitive/compute/cast.rs b/vortex-array/src/array/primitive/compute/cast.rs index e716f1a5ce6..619e65a20b0 100644 --- a/vortex-array/src/array/primitive/compute/cast.rs +++ b/vortex-array/src/array/primitive/compute/cast.rs @@ -32,10 +32,12 @@ impl CastFn for PrimitiveEncoding { // If the bit width is the same, we can short-circuit and simply update the validity if array.ptype() == new_ptype { - return Ok( - PrimitiveArray::new(array.buffer().clone(), array.ptype(), new_validity) - .into_array(), - ); + return Ok(PrimitiveArray::new( + array.buffer().clone().into_inner(), + array.ptype(), + new_validity, + ) + .into_array()); } // Otherwise, we need to cast the values one-by-one diff --git a/vortex-array/src/array/primitive/compute/fill.rs b/vortex-array/src/array/primitive/compute/fill.rs index b80cd3d02f7..2f6e84b4a66 100644 --- a/vortex-array/src/array/primitive/compute/fill.rs +++ b/vortex-array/src/array/primitive/compute/fill.rs @@ -17,7 +17,7 @@ impl FillForwardFn for PrimitiveEncoding { let validity = array.logical_validity(); if validity.all_valid() { return Ok(PrimitiveArray::new( - array.buffer().clone(), + array.buffer().clone().into_inner(), array.ptype(), Validity::AllValid, ) diff --git a/vortex-array/src/array/primitive/mod.rs b/vortex-array/src/array/primitive/mod.rs index b8dd2adfc6d..d3c9cd85806 100644 --- a/vortex-array/src/array/primitive/mod.rs +++ b/vortex-array/src/array/primitive/mod.rs @@ -19,7 +19,8 @@ use crate::validity::{ArrayValidity, LogicalValidity, Validity, ValidityMetadata use crate::variants::{PrimitiveArrayTrait, VariantsVTable}; use crate::visitor::{ArrayVisitor, VisitorVTable}; use crate::{ - impl_encoding, ArrayData, ArrayLen, ArrayTrait, Canonical, IntoArrayData, IntoCanonical, + impl_encoding, ArrayBuffer, ArrayData, ArrayLen, ArrayTrait, Canonical, IntoArrayData, + IntoCanonical, }; mod compute; @@ -60,7 +61,7 @@ impl PrimitiveArray { .to_metadata(length) .vortex_expect("Invalid validity"), }), - Some(buffer), + Some(ArrayBuffer::new_with_alignment(buffer, ptype.byte_width())), validity.into_array().into_iter().collect_vec().into(), StatsSet::default(), ) @@ -101,7 +102,7 @@ impl PrimitiveArray { }) } - pub fn buffer(&self) -> &Buffer { + pub fn buffer(&self) -> &ArrayBuffer { self.as_ref() .buffer() .vortex_expect("Missing buffer in PrimitiveArray") @@ -132,11 +133,14 @@ impl PrimitiveArray { T::PTYPE, self.ptype(), ); - self.into_buffer().into_vec::().unwrap_or_else(|b| { - let (prefix, values, suffix) = unsafe { b.as_ref().align_to::() }; - assert!(prefix.is_empty() && suffix.is_empty()); - Vec::from(values) - }) + self.into_buffer() + .into_inner() + .into_vec::() + .unwrap_or_else(|b| { + let (prefix, values, suffix) = unsafe { b.as_ref().align_to::() }; + assert!(prefix.is_empty() && suffix.is_empty()); + Vec::from(values) + }) } pub fn get_as_cast(&self, idx: usize) -> T { @@ -156,10 +160,10 @@ impl PrimitiveArray { "can't reinterpret cast between integers of two different widths" ); - PrimitiveArray::new(self.buffer().clone(), ptype, self.validity()) + PrimitiveArray::new(self.buffer().clone().into_inner(), ptype, self.validity()) } - pub fn into_buffer(self) -> Buffer { + pub fn into_buffer(self) -> ArrayBuffer { self.into_array() .into_buffer() .vortex_expect("PrimitiveArray must have a buffer") diff --git a/vortex-array/src/array/varbin/arrow.rs b/vortex-array/src/array/varbin/arrow.rs index 270444bb1a9..046959f61d5 100644 --- a/vortex-array/src/array/varbin/arrow.rs +++ b/vortex-array/src/array/varbin/arrow.rs @@ -47,14 +47,14 @@ pub(crate) fn varbin_to_arrow(varbin_array: &VarBinArray) -> VortexResult Arc::new(unsafe { BinaryArray::new_unchecked( as_offset_buffer::(offsets), - data.clone().into_arrow(), + data.clone().into_inner().into_arrow(), nulls, ) }), PType::I64 => Arc::new(unsafe { LargeBinaryArray::new_unchecked( as_offset_buffer::(offsets), - data.clone().into_arrow(), + data.clone().into_inner().into_arrow(), nulls, ) }), @@ -64,14 +64,14 @@ pub(crate) fn varbin_to_arrow(varbin_array: &VarBinArray) -> VortexResult Arc::new(unsafe { StringArray::new_unchecked( as_offset_buffer::(offsets), - data.clone().into_arrow(), + data.clone().into_inner().into_arrow(), nulls, ) }), PType::I64 => Arc::new(unsafe { LargeStringArray::new_unchecked( as_offset_buffer::(offsets), - data.clone().into_arrow(), + data.clone().into_inner().into_arrow(), nulls, ) }), diff --git a/vortex-array/src/array/varbin/mod.rs b/vortex-array/src/array/varbin/mod.rs index 5c5726b655b..1a8981f5773 100644 --- a/vortex-array/src/array/varbin/mod.rs +++ b/vortex-array/src/array/varbin/mod.rs @@ -210,7 +210,7 @@ impl VarBinArray { let start = self.offset_at(index); let end = self.offset_at(index + 1); let sliced = slice(self.bytes(), start, end)?; - Ok(sliced.into_primitive()?.buffer().clone()) + Ok(sliced.into_primitive()?.buffer().clone().into_inner()) } /// Consumes self, returning a tuple containing the `DType`, the `bytes` array, diff --git a/vortex-array/src/array/varbinview/compute/mod.rs b/vortex-array/src/array/varbinview/compute/mod.rs index 53d5a1ca2e1..ebb1b735f89 100644 --- a/vortex-array/src/array/varbinview/compute/mod.rs +++ b/vortex-array/src/array/varbinview/compute/mod.rs @@ -63,8 +63,14 @@ impl TakeFn for VarBinViewEncoding { let validity = array.validity().take(indices)?; // Convert our views array into an Arrow u128 ScalarBuffer (16 bytes per view) - let views_buffer = - ScalarBuffer::::from(array.views().into_primitive()?.into_buffer().into_arrow()); + let views_buffer = ScalarBuffer::::from( + array + .views() + .into_primitive()? + .into_buffer() + .into_inner() + .into_arrow(), + ); let indices = indices.clone().into_primitive()?; @@ -97,8 +103,14 @@ impl TakeFn for VarBinViewEncoding { let validity = array.validity().take(indices)?; // Convert our views array into an Arrow u128 ScalarBuffer (16 bytes per view) - let views_buffer = - ScalarBuffer::::from(array.views().into_primitive()?.into_buffer().into_arrow()); + let views_buffer = ScalarBuffer::::from( + array + .views() + .into_primitive()? + .into_buffer() + .into_inner() + .into_arrow(), + ); let indices = indices.clone().into_primitive()?; diff --git a/vortex-array/src/array/varbinview/mod.rs b/vortex-array/src/array/varbinview/mod.rs index 4aceaf70cf6..dae201a5673 100644 --- a/vortex-array/src/array/varbinview/mod.rs +++ b/vortex-array/src/array/varbinview/mod.rs @@ -297,7 +297,7 @@ impl VarBinViewArray { /// only require hitting the prefixes or inline strings. pub fn binary_views(&self) -> VortexResult { Ok(Views { - inner: self.views().into_primitive()?.into_buffer(), + inner: self.views().into_primitive()?.into_buffer().into_inner(), idx: 0, len: self.len(), }) @@ -567,21 +567,21 @@ pub(crate) fn varbinview_as_arrow(var_bin_view: &VarBinViewArray) -> ArrayRef { let data = data .iter() - .map(|p| p.buffer().clone().into_arrow()) + .map(|p| p.buffer().clone().into_inner().into_arrow()) .collect::>(); // Switch on Arrow DType. match var_bin_view.dtype() { DType::Binary(_) => Arc::new(unsafe { BinaryViewArray::new_unchecked( - ScalarBuffer::::from(views.buffer().clone().into_arrow()), + ScalarBuffer::::from(views.buffer().clone().into_inner().into_arrow()), data, nulls, ) }), DType::Utf8(_) => Arc::new(unsafe { StringViewArray::new_unchecked( - ScalarBuffer::::from(views.buffer().clone().into_arrow()), + ScalarBuffer::::from(views.buffer().clone().into_inner().into_arrow()), data, nulls, ) diff --git a/vortex-array/src/arrow/wrappers.rs b/vortex-array/src/arrow/wrappers.rs index 1918aea82aa..7b4ae528d54 100644 --- a/vortex-array/src/arrow/wrappers.rs +++ b/vortex-array/src/arrow/wrappers.rs @@ -8,7 +8,7 @@ pub fn as_scalar_buffer( array: PrimitiveArray, ) -> ScalarBuffer { assert_eq!(array.ptype(), T::PTYPE); - ScalarBuffer::from(array.buffer().clone().into_arrow()) + ScalarBuffer::from(array.buffer().clone().into_inner().into_arrow()) } pub fn as_offset_buffer( diff --git a/vortex-array/src/buffer.rs b/vortex-array/src/buffer.rs new file mode 100644 index 00000000000..cf56699222e --- /dev/null +++ b/vortex-array/src/buffer.rs @@ -0,0 +1,58 @@ +use std::ops::Deref; + +use vortex_buffer::Buffer; +use vortex_error::{vortex_panic, VortexExpect}; + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ArrayBuffer { + /// The underlying buffer holding the data. + buffer: Buffer, + /// The minimum alignment required for this buffer when (de)serialized. + alignment: usize, +} + +impl ArrayBuffer { + /// Create a new `ArrayBuffer` from the provided buffer and alignment. + /// + /// ## Panics + /// + /// Panics if `alignment` is greater than `u16::MAX`, is not a power of 2, or the buffer + /// is not aligned to `alignment`. + pub fn new_with_alignment(buffer: Buffer, alignment: usize) -> Self { + u16::try_from(alignment).vortex_expect("Alignment must fit into u16"); + if !alignment.is_power_of_two() { + vortex_panic!("Alignment must be a power of 2"); + } + if buffer.as_ptr().align_offset(alignment) != 0 { + vortex_panic!("Buffer must be aligned to {}", alignment); + } + Self { buffer, alignment } + } + + /// Create a new `ArrayBuffer` from the provided buffer with alignment derived from `T`. + pub fn new(buffer: Buffer) -> Self { + Self::new_with_alignment(buffer, align_of::()) + } + + #[inline] + pub fn alignment(&self) -> usize { + self.alignment + } + + #[inline] + pub fn alignment_u16(&self) -> u16 { + u16::try_from(self.alignment).vortex_expect("Alignment must fit into u16") + } + + pub fn into_inner(self) -> Buffer { + self.buffer + } +} + +impl Deref for ArrayBuffer { + type Target = Buffer; + + fn deref(&self) -> &Self::Target { + &self.buffer + } +} diff --git a/vortex-array/src/canonical.rs b/vortex-array/src/canonical.rs index f4aaa350da4..8c015021f63 100644 --- a/vortex-array/src/canonical.rs +++ b/vortex-array/src/canonical.rs @@ -172,7 +172,11 @@ fn primitive_to_arrow(primitive_array: PrimitiveArray) -> VortexResult array: &PrimitiveArray, ) -> VortexResult>> { Ok(Arc::new(ArrowPrimitiveArray::new( - ScalarBuffer::::new(array.buffer().clone().into_arrow(), 0, array.len()), + ScalarBuffer::::new( + array.buffer().clone().into_inner().into_arrow(), + 0, + array.len(), + ), array.logical_validity().to_null_buffer()?, ))) } @@ -288,8 +292,11 @@ fn temporal_to_arrow(temporal_array: TemporalArray) -> VortexResult { .into_primitive()?; let len = temporal_values.len(); let nulls = temporal_values.logical_validity().to_null_buffer()?; - let scalars = - ScalarBuffer::<$prim>::new(temporal_values.into_buffer().into_arrow(), 0, len); + let scalars = ScalarBuffer::<$prim>::new( + temporal_values.into_buffer().into_inner().into_arrow(), + 0, + len, + ); (scalars, nulls) }}; diff --git a/vortex-array/src/data/mod.rs b/vortex-array/src/data/mod.rs index 4baad3707dc..a5ac9f92c2b 100644 --- a/vortex-array/src/data/mod.rs +++ b/vortex-array/src/data/mod.rs @@ -22,8 +22,8 @@ use crate::stats::{ArrayStatistics, Stat, Statistics, StatsSet}; use crate::stream::{ArrayStream, ArrayStreamAdapter}; use crate::validity::{ArrayValidity, LogicalValidity, ValidityVTable}; use crate::{ - ArrayChildrenIterator, ArrayDType, ArrayLen, ArrayMetadata, Context, NamedChildrenCollector, - TryDeserializeArrayMetadata, + ArrayBuffer, ArrayChildrenIterator, ArrayDType, ArrayLen, ArrayMetadata, Context, + NamedChildrenCollector, TryDeserializeArrayMetadata, }; mod owned; @@ -61,7 +61,7 @@ impl ArrayData { dtype: DType, len: usize, metadata: Arc, - buffer: Option, + buffer: Option, children: Arc<[ArrayData]>, statistics: StatsSet, ) -> VortexResult { @@ -84,7 +84,7 @@ impl ArrayData { len: usize, flatbuffer: Buffer, flatbuffer_init: F, - buffers: Vec, + buffers: Vec, ) -> VortexResult where F: FnOnce(&[u8]) -> VortexResult, @@ -319,14 +319,14 @@ impl ArrayData { } } - pub fn buffer(&self) -> Option<&Buffer> { + pub fn buffer(&self) -> Option<&ArrayBuffer> { match &self.0 { InnerArrayData::Owned(d) => d.buffer(), InnerArrayData::Viewed(v) => v.buffer(), } } - pub fn into_buffer(self) -> Option { + pub fn into_buffer(self) -> Option { match self.0 { InnerArrayData::Owned(d) => d.into_buffer(), InnerArrayData::Viewed(v) => v.buffer().cloned(), diff --git a/vortex-array/src/data/owned.rs b/vortex-array/src/data/owned.rs index c0b89511089..c64ed8bf13e 100644 --- a/vortex-array/src/data/owned.rs +++ b/vortex-array/src/data/owned.rs @@ -1,13 +1,12 @@ use std::sync::{Arc, RwLock}; -use vortex_buffer::Buffer; use vortex_dtype::DType; use vortex_error::{vortex_bail, vortex_panic, VortexResult}; use vortex_scalar::Scalar; use crate::encoding::EncodingRef; use crate::stats::{Stat, Statistics, StatsSet}; -use crate::{ArrayDType, ArrayData, ArrayMetadata}; +use crate::{ArrayBuffer, ArrayDType, ArrayData, ArrayMetadata}; /// Owned [`ArrayData`] with serialized metadata, backed by heap-allocated memory. #[derive(Clone, Debug)] @@ -16,7 +15,7 @@ pub(super) struct OwnedArrayData { pub(super) dtype: DType, // FIXME(ngates): Arc? pub(super) len: usize, pub(super) metadata: Arc, - pub(super) buffer: Option, + pub(super) buffer: Option, pub(super) children: Arc<[ArrayData]>, pub(super) stats_set: Arc>, #[cfg(feature = "canonical_counter")] @@ -28,11 +27,11 @@ impl OwnedArrayData { &self.metadata } - pub fn buffer(&self) -> Option<&Buffer> { + pub fn buffer(&self) -> Option<&ArrayBuffer> { self.buffer.as_ref() } - pub fn into_buffer(self) -> Option { + pub fn into_buffer(self) -> Option { self.buffer } diff --git a/vortex-array/src/data/viewed.rs b/vortex-array/src/data/viewed.rs index 5130c666c60..5652c77ad48 100644 --- a/vortex-array/src/data/viewed.rs +++ b/vortex-array/src/data/viewed.rs @@ -11,7 +11,7 @@ use vortex_scalar::{Scalar, ScalarValue}; use crate::encoding::opaque::OpaqueEncoding; use crate::encoding::EncodingRef; use crate::stats::{Stat, Statistics, StatsSet}; -use crate::{flatbuffers as fb, ArrayData, ArrayMetadata, ChildrenCollector, Context}; +use crate::{flatbuffers as fb, ArrayBuffer, ArrayData, ArrayMetadata, ChildrenCollector, Context}; /// Zero-copy view over flatbuffer-encoded array data, created without eager serialization. #[derive(Clone)] @@ -22,7 +22,7 @@ pub(super) struct ViewedArrayData { pub(super) metadata: Arc, pub(super) flatbuffer: Buffer, pub(super) flatbuffer_loc: usize, - pub(super) buffers: Arc<[Buffer]>, + pub(super) buffers: Arc<[ArrayBuffer]>, pub(super) ctx: Arc, #[cfg(feature = "canonical_counter")] pub(super) canonical_counter: Arc, @@ -102,7 +102,7 @@ impl ViewedArrayData { collector.children() } - pub fn buffer(&self) -> Option<&Buffer> { + pub fn buffer(&self) -> Option<&ArrayBuffer> { self.flatbuffer() .buffers() .and_then(|buffers| { diff --git a/vortex-array/src/lib.rs b/vortex-array/src/lib.rs index 783ef075536..85b5503024b 100644 --- a/vortex-array/src/lib.rs +++ b/vortex-array/src/lib.rs @@ -11,6 +11,7 @@ //! arrays can be [canonicalized](Canonical) into for ease of access in compute functions. //! +pub use buffer::*; pub use canonical::*; pub use children::*; pub use context::*; @@ -28,6 +29,7 @@ pub mod accessor; pub mod aliases; pub mod array; pub mod arrow; +mod buffer; pub mod builders; mod canonical; mod children; diff --git a/vortex-array/src/nbytes.rs b/vortex-array/src/nbytes.rs index da639c52948..8a6c6ad57f5 100644 --- a/vortex-array/src/nbytes.rs +++ b/vortex-array/src/nbytes.rs @@ -1,8 +1,7 @@ -use vortex_buffer::Buffer; use vortex_error::{VortexExpect, VortexResult}; use crate::visitor::ArrayVisitor; -use crate::ArrayData; +use crate::{ArrayBuffer, ArrayData}; impl ArrayData { /// Total size of the array in bytes, including all children and buffers. @@ -36,7 +35,7 @@ impl ArrayVisitor for NBytesVisitor { Ok(()) } - fn visit_buffer(&mut self, buffer: &Buffer) -> VortexResult<()> { + fn visit_buffer(&mut self, buffer: &ArrayBuffer) -> VortexResult<()> { self.0 += buffer.len(); Ok(()) } diff --git a/vortex-array/src/tree.rs b/vortex-array/src/tree.rs index 0f67cc423d7..71a90222f53 100644 --- a/vortex-array/src/tree.rs +++ b/vortex-array/src/tree.rs @@ -2,13 +2,12 @@ use std::fmt; use humansize::{format_size, DECIMAL}; use serde::ser::Error; -use vortex_buffer::Buffer; use vortex_error::{VortexError, VortexResult}; use crate::array::ChunkedEncoding; use crate::encoding::EncodingVTable; use crate::visitor::ArrayVisitor; -use crate::ArrayData; +use crate::{ArrayBuffer, ArrayData}; impl ArrayData { pub fn tree_display(&self) -> TreeDisplayWrapper { @@ -76,11 +75,12 @@ impl<'a, 'b: 'a> ArrayVisitor for TreeFormatter<'a, 'b> { Ok(()) } - fn visit_buffer(&mut self, buffer: &Buffer) -> VortexResult<()> { + fn visit_buffer(&mut self, buffer: &ArrayBuffer) -> VortexResult<()> { Ok(writeln!( self.fmt, - "{}buffer: {}", + "{}buffer (align={}): {}", self.indent, + buffer.alignment(), format_size(buffer.len(), DECIMAL) )?) } diff --git a/vortex-array/src/visitor.rs b/vortex-array/src/visitor.rs index 16bc1f8248b..12652f1502a 100644 --- a/vortex-array/src/visitor.rs +++ b/vortex-array/src/visitor.rs @@ -1,10 +1,9 @@ -use vortex_buffer::Buffer; use vortex_error::{vortex_err, VortexError, VortexResult}; use crate::encoding::Encoding; use crate::patches::Patches; use crate::validity::Validity; -use crate::ArrayData; +use crate::{ArrayBuffer, ArrayData}; pub trait VisitorVTable { fn accept(&self, array: &Array, visitor: &mut dyn ArrayVisitor) -> VortexResult<()>; @@ -47,7 +46,7 @@ pub trait ArrayVisitor { self.visit_child("patch_values", patches.values()) } - fn visit_buffer(&mut self, _buffer: &Buffer) -> VortexResult<()> { + fn visit_buffer(&mut self, _buffer: &ArrayBuffer) -> VortexResult<()> { Ok(()) } } diff --git a/vortex-flatbuffers/flatbuffers/vortex-array/array.fbs b/vortex-flatbuffers/flatbuffers/vortex-array/array.fbs index 425b1aabdf5..a8cc9a3dbb0 100644 --- a/vortex-flatbuffers/flatbuffers/vortex-array/array.fbs +++ b/vortex-flatbuffers/flatbuffers/vortex-array/array.fbs @@ -38,6 +38,8 @@ table Buffer { length: uint64; /// The length of any padding bytes written immediately following the buffer. padding: uint16; + /// The minimum alignment of the buffer. + alignment: uint16; } root_type Array; diff --git a/vortex-flatbuffers/src/generated/array.rs b/vortex-flatbuffers/src/generated/array.rs index 3e841ab312a..3420bb8e6c0 100644 --- a/vortex-flatbuffers/src/generated/array.rs +++ b/vortex-flatbuffers/src/generated/array.rs @@ -597,6 +597,7 @@ impl<'a> flatbuffers::Follow<'a> for Buffer<'a> { impl<'a> Buffer<'a> { pub const VT_LENGTH: flatbuffers::VOffsetT = 4; pub const VT_PADDING: flatbuffers::VOffsetT = 6; + pub const VT_ALIGNMENT_: flatbuffers::VOffsetT = 8; #[inline] pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self { @@ -609,6 +610,7 @@ impl<'a> Buffer<'a> { ) -> flatbuffers::WIPOffset> { let mut builder = BufferBuilder::new(_fbb); builder.add_length(args.length); + builder.add_alignment_(args.alignment_); builder.add_padding(args.padding); builder.finish() } @@ -630,6 +632,14 @@ impl<'a> Buffer<'a> { // which contains a valid value in this slot unsafe { self._tab.get::(Buffer::VT_PADDING, Some(0)).unwrap()} } + /// The minimum alignment of the buffer. + #[inline] + pub fn alignment_(&self) -> u16 { + // Safety: + // Created from valid Table for this object + // which contains a valid value in this slot + unsafe { self._tab.get::(Buffer::VT_ALIGNMENT_, Some(0)).unwrap()} + } } impl flatbuffers::Verifiable for Buffer<'_> { @@ -641,6 +651,7 @@ impl flatbuffers::Verifiable for Buffer<'_> { v.visit_table(pos)? .visit_field::("length", Self::VT_LENGTH, false)? .visit_field::("padding", Self::VT_PADDING, false)? + .visit_field::("alignment_", Self::VT_ALIGNMENT_, false)? .finish(); Ok(()) } @@ -648,6 +659,7 @@ impl flatbuffers::Verifiable for Buffer<'_> { pub struct BufferArgs { pub length: u64, pub padding: u16, + pub alignment_: u16, } impl<'a> Default for BufferArgs { #[inline] @@ -655,6 +667,7 @@ impl<'a> Default for BufferArgs { BufferArgs { length: 0, padding: 0, + alignment_: 0, } } } @@ -673,6 +686,10 @@ impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> BufferBuilder<'a, 'b, A> { self.fbb_.push_slot::(Buffer::VT_PADDING, padding, 0); } #[inline] + pub fn add_alignment_(&mut self, alignment_: u16) { + self.fbb_.push_slot::(Buffer::VT_ALIGNMENT_, alignment_, 0); + } + #[inline] pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> BufferBuilder<'a, 'b, A> { let start = _fbb.start_table(); BufferBuilder { @@ -692,6 +709,7 @@ impl core::fmt::Debug for Buffer<'_> { let mut ds = f.debug_struct("Buffer"); ds.field("length", &self.length()); ds.field("padding", &self.padding()); + ds.field("alignment_", &self.alignment_()); ds.finish() } } diff --git a/vortex-ipc/src/messages/decoder.rs b/vortex-ipc/src/messages/decoder.rs index 821bcdc0404..acf1d9c87d9 100644 --- a/vortex-ipc/src/messages/decoder.rs +++ b/vortex-ipc/src/messages/decoder.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use bytes::{Buf, BytesMut}; use flatbuffers::{root, root_unchecked, Follow}; use itertools::Itertools; -use vortex_array::{flatbuffers as fba, ArrayData, Context}; +use vortex_array::{flatbuffers as fba, ArrayBuffer, ArrayData, Context}; use vortex_buffer::Buffer; use vortex_dtype::DType; use vortex_error::{vortex_bail, vortex_err, VortexExpect, VortexResult}; @@ -32,7 +32,7 @@ pub struct ArrayParts { // Typed as fb::Array array_flatbuffer: Buffer, array_flatbuffer_loc: usize, - buffers: Vec, + buffers: Vec, } impl Debug for ArrayParts { @@ -229,9 +229,16 @@ impl MessageDecoder { .map(|buffer_msg| { let buffer_len = usize::try_from(buffer_msg.length()) .vortex_expect("buffer length is too large for usize"); - let buffer = bytes.split_to_aligned(buffer_len, self.alignment); + let alignment = match buffer_msg.alignment_() { + 0 => self.alignment, + align => (align as usize).min(self.alignment), + }; + let buffer = bytes.split_to_aligned(buffer_len, alignment); let _padding = bytes.split_to(buffer_msg.padding() as usize); - Buffer::from(buffer.freeze()) + ArrayBuffer::new_with_alignment( + Buffer::from(buffer.freeze()), + alignment, + ) }) .collect_vec(); diff --git a/vortex-ipc/src/messages/encoder.rs b/vortex-ipc/src/messages/encoder.rs index 319b9810d3d..23a86c4e1e3 100644 --- a/vortex-ipc/src/messages/encoder.rs +++ b/vortex-ipc/src/messages/encoder.rs @@ -95,9 +95,10 @@ impl MessageEncoder { &fba::BufferArgs { length: buffer.len() as u64, padding, + alignment_: buffer.alignment_u16(), }, )); - buffers.push(buffer.clone()); + buffers.push(buffer.clone().into_inner()); if padding > 0 { buffers.push(self.zeros.slice(0..usize::from(padding))); } @@ -128,6 +129,8 @@ impl MessageEncoder { &fba::BufferArgs { length: buffer.len() as u64, padding, + // Buffer messages have no minimum alignment, the reader decides. + alignment_: 0, }, ) .as_union_value()