diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 95955d9dfe6..5f2943b311f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -3,8 +3,7 @@ name: CI on: push: branches: [ "develop" ] - pull_request: - branches: [ "develop" ] + pull_request: { } workflow_dispatch: { } permissions: diff --git a/Cargo.lock b/Cargo.lock index b5a99f0b308..66c2075c264 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2569,10 +2569,7 @@ dependencies = [ "allocator-api2", "arrow-array", "arrow-buffer", - "arrow-cast", - "arrow-data", "arrow-schema", - "arrow-select", "dyn-clone", "half", "humansize", diff --git a/bench-vortex/src/lib.rs b/bench-vortex/src/lib.rs index d53c6dd8350..858d0bc3bcb 100644 --- a/bench-vortex/src/lib.rs +++ b/bench-vortex/src/lib.rs @@ -56,6 +56,15 @@ pub fn enumerate_arrays() -> Vec<&'static dyn Encoding> { ] } +pub fn compress_ctx() -> CompressCtx { + let cfg = CompressConfig::new( + HashSet::from_iter(enumerate_arrays().iter().map(|e| (*e).id())), + HashSet::default(), + ); + info!("Compression config {cfg:?}"); + CompressCtx::new(Arc::new(cfg)) +} + pub fn download_taxi_data() -> PathBuf { let download_path = Path::new(env!("CARGO_MANIFEST_DIR")).join("data/yellow-tripdata-2023-11.parquet"); @@ -92,14 +101,7 @@ pub fn compress_taxi_data() -> ArrayRef { .build() .unwrap(); - // let array = ArrayRef::try_from((&mut reader) as &mut dyn RecordBatchReader).unwrap(); - let cfg = CompressConfig::new( - HashSet::from_iter(enumerate_arrays().iter().map(|e| (*e).id())), - HashSet::default(), - ); - info!("Compression config {cfg:?}"); - let ctx = CompressCtx::new(Arc::new(cfg)); - + let ctx = compress_ctx(); let schema = reader.schema(); let mut uncompressed_size = 0; let chunks = reader @@ -151,7 +153,7 @@ mod test { use vortex::compute::as_arrow::as_arrow; use vortex::encode::FromArrow; - use crate::{compress_taxi_data, download_taxi_data}; + use crate::{compress_ctx, compress_taxi_data, download_taxi_data}; #[allow(dead_code)] fn setup_logger(level: LevelFilter) { @@ -171,6 +173,7 @@ mod test { _ = compress_taxi_data(); } + #[ignore] #[test] fn round_trip_arrow() { let file = File::open(download_taxi_data()).unwrap(); @@ -185,4 +188,23 @@ mod test { assert_eq!(vortex_as_arrow.deref(), arrow_array.deref()); } } + + #[ignore] + #[test] + fn round_trip_arrow_compressed() { + let file = File::open(download_taxi_data()).unwrap(); + let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); + let reader = builder.with_limit(1).build().unwrap(); + + let ctx = compress_ctx(); + for record_batch in reader.map(|batch_result| batch_result.unwrap()) { + let struct_arrow: ArrowStructArray = record_batch.into(); + let arrow_array: ArrowArrayRef = Arc::new(struct_arrow); + let vortex_array = ArrayRef::from_arrow(arrow_array.clone(), false); + + let compressed = ctx.clone().compress(vortex_array.as_ref(), None).unwrap(); + let compressed_as_arrow = as_arrow(compressed.as_ref()).unwrap(); + assert_eq!(compressed_as_arrow.deref(), arrow_array.deref()); + } + } } diff --git a/pyvortex/src/dtype.rs b/pyvortex/src/dtype.rs index 14f76243fc3..bbe3b4a3778 100644 --- a/pyvortex/src/dtype.rs +++ b/pyvortex/src/dtype.rs @@ -1,13 +1,10 @@ -use arrow::datatypes::DataType; +use arrow::datatypes::{DataType, Field}; use arrow::pyarrow::FromPyArrow; use pyo3::types::PyType; use pyo3::{pyclass, pymethods, Py, PyAny, PyResult, Python}; -use vortex::arrow::convert::TryIntoDType; use vortex::dtype::DType; -use crate::error::PyVortexError; - #[pyclass(name = "DType", module = "vortex", subclass)] pub struct PyDType { inner: DType, @@ -35,12 +32,7 @@ impl PyDType { #[pyo3(from_py_with = "import_arrow_dtype")] arrow_dtype: DataType, nullable: bool, ) -> PyResult> { - PyDType::wrap( - cls.py(), - arrow_dtype - .try_into_dtype(nullable) - .map_err(PyVortexError::new)?, - ) + PyDType::wrap(cls.py(), (&Field::new("_", arrow_dtype, nullable)).into()) } } diff --git a/pyvortex/src/encode.rs b/pyvortex/src/encode.rs index 7b9873e35f9..53d8168b26b 100644 --- a/pyvortex/src/encode.rs +++ b/pyvortex/src/encode.rs @@ -1,5 +1,5 @@ use arrow::array::{make_array, ArrayData}; -use arrow::datatypes::DataType; +use arrow::datatypes::{DataType, Field}; use arrow::ffi_stream::ArrowArrayStreamReader; use arrow::pyarrow::FromPyArrow; use arrow::record_batch::RecordBatchReader; @@ -8,7 +8,6 @@ use pyo3::prelude::*; use vortex::array::chunked::ChunkedArray; use vortex::array::{Array, ArrayRef}; -use vortex::arrow::convert::TryIntoDType; use vortex::dtype::DType; use vortex::encode::FromArrow; @@ -39,12 +38,10 @@ pub fn encode(obj: &PyAny) -> PyResult> { .map(|a| ArrayRef::from_arrow(a, false)) }) .collect::>>()?; - let null_count: usize = obj.getattr("null_count")?.extract()?; let dtype: DType = obj .getattr("type") - .and_then(DataType::from_pyarrow)? - .try_into_dtype(null_count > 0) - .map_err(PyVortexError::map_err)?; + .and_then(DataType::from_pyarrow) + .map(|dt| (&Field::new("_", dt, false)).into())?; PyArray::wrap(obj.py(), ChunkedArray::new(encoded_chunks, dtype).boxed()) } else if obj.is_instance(table)? { let array_stream = ArrowArrayStreamReader::from_pyarrow(obj)?; diff --git a/pyvortex/test/test_compress.py b/pyvortex/test/test_compress.py index 2c161e23341..2f89ef175db 100644 --- a/pyvortex/test/test_compress.py +++ b/pyvortex/test/test_compress.py @@ -1,5 +1,10 @@ +import os.path +from pathlib import Path + import numpy as np import pyarrow as pa +import pyarrow.parquet as pq +import pytest import vortex @@ -63,3 +68,12 @@ def test_table_encode(): assert encoded.to_pyarrow().combine_chunks() == pa.StructArray.from_arrays( [pa.array([0, 1, 2, 3, 4, 5]), pa.array(["a", "b", "c", "d", "e", "f"])], names=["number", "string"] ) + + +@pytest.mark.xfail(reason="Not yet implemented") +def test_taxi(): + curdir = Path(os.path.dirname(__file__)).parent.parent + table = pq.read_table(curdir / "bench-vortex/data/yellow-tripdata-2023-11.parquet") + compressed = vortex.compress(vortex.encode(table[:100])) + decompressed = compressed.to_pyarrow() + assert not decompressed diff --git a/vortex-alp/src/array.rs b/vortex-alp/src/array.rs index 4da9898b0d1..e80703a7e96 100644 --- a/vortex-alp/src/array.rs +++ b/vortex-alp/src/array.rs @@ -2,7 +2,7 @@ use std::any::Any; use std::sync::{Arc, RwLock}; use crate::alp::Exponents; -use vortex::array::{Array, ArrayKind, ArrayRef, ArrowIterator, Encoding, EncodingId, EncodingRef}; +use vortex::array::{Array, ArrayKind, ArrayRef, Encoding, EncodingId, EncodingRef}; use vortex::compress::EncodingCompression; use vortex::dtype::{DType, IntWidth, Signedness}; use vortex::error::{VortexError, VortexResult}; @@ -104,10 +104,6 @@ impl Array for ALPArray { Stats::new(&self.stats, self) } - fn iter_arrow(&self) -> Box { - todo!() - } - fn slice(&self, start: usize, stop: usize) -> VortexResult { Ok(Self::try_new( self.encoded().slice(start, stop)?, diff --git a/vortex-array/Cargo.toml b/vortex-array/Cargo.toml index c07b1722d2b..8d6a4128748 100644 --- a/vortex-array/Cargo.toml +++ b/vortex-array/Cargo.toml @@ -23,9 +23,6 @@ allocator-api2 = "0.2.16" arrow-array = { version = "50.0.0" } arrow-buffer = { version = "50.0.0" } arrow-schema = { version = "50.0.0" } -arrow-data = { version = "50.0.0" } -arrow-cast = { version = "50.0.0" } -arrow-select = { version = "50.0.0" } dyn-clone = "1.0.16" half = "2.3.1" humansize = "2.1.3" diff --git a/vortex-array/src/accessor.rs b/vortex-array/src/accessor.rs new file mode 100644 index 00000000000..ec36393bcec --- /dev/null +++ b/vortex-array/src/accessor.rs @@ -0,0 +1,5 @@ +use crate::array::Array; + +pub trait ArrayAccessor: Array { + fn value(&self, index: usize) -> Option; +} diff --git a/vortex-array/src/array/bool/mod.rs b/vortex-array/src/array/bool/mod.rs index 9ad86f2dbcf..43c33a72ac3 100644 --- a/vortex-array/src/array/bool/mod.rs +++ b/vortex-array/src/array/bool/mod.rs @@ -1,13 +1,9 @@ use std::any::Any; -use std::iter; use std::sync::{Arc, RwLock}; -use arrow_array::array::{ArrayRef as ArrowArrayRef, BooleanArray}; -use arrow_array::cast::AsArray; -use arrow_buffer::buffer::{BooleanBuffer, NullBuffer}; +use arrow_buffer::buffer::BooleanBuffer; use linkme::distributed_slice; -use crate::arrow::CombineChunks; use crate::compute::scalar_at::scalar_at; use crate::dtype::{DType, Nullability}; use crate::error::VortexResult; @@ -16,8 +12,8 @@ use crate::serde::{ArraySerde, EncodingSerde}; use crate::stats::{Stat, Stats, StatsSet}; use super::{ - check_slice_bounds, check_validity_buffer, Array, ArrayRef, ArrowIterator, Encoding, - EncodingId, EncodingRef, ENCODINGS, + check_slice_bounds, check_validity_buffer, Array, ArrayRef, Encoding, EncodingId, EncodingRef, + ENCODINGS, }; mod compute; @@ -113,21 +109,6 @@ impl Array for BoolArray { Stats::new(&self.stats, self) } - fn iter_arrow(&self) -> Box { - Box::new(iter::once(Arc::new(BooleanArray::new( - self.buffer.clone(), - self.validity().map(|v| { - NullBuffer::new( - v.iter_arrow() - .combine_chunks() - .as_boolean() - .values() - .clone(), - ) - }), - )) as ArrowArrayRef)) - } - fn slice(&self, start: usize, stop: usize) -> VortexResult { check_slice_bounds(self, start, stop)?; diff --git a/vortex-array/src/array/chunked/mod.rs b/vortex-array/src/array/chunked/mod.rs index 87fdeed58d8..4d40a1aa7b7 100644 --- a/vortex-array/src/array/chunked/mod.rs +++ b/vortex-array/src/array/chunked/mod.rs @@ -1,14 +1,11 @@ use std::any::Any; use std::sync::{Arc, RwLock}; -use std::vec::IntoIter; -use arrow_array::array::ArrayRef as ArrowArrayRef; use itertools::Itertools; use linkme::distributed_slice; use crate::array::{ - check_slice_bounds, Array, ArrayRef, ArrowIterator, Encoding, EncodingId, EncodingRef, - ENCODINGS, + check_slice_bounds, Array, ArrayRef, Encoding, EncodingId, EncodingRef, ENCODINGS, }; use crate::dtype::DType; use crate::error::{VortexError, VortexResult}; @@ -124,10 +121,6 @@ impl Array for ChunkedArray { Stats::new(&self.stats, self) } - fn iter_arrow(&self) -> Box { - Box::new(ChunkedArrowIterator::new(self)) - } - fn slice(&self, start: usize, stop: usize) -> VortexResult { check_slice_bounds(self, start, stop)?; @@ -218,49 +211,14 @@ impl Encoding for ChunkedEncoding { } } -struct ChunkedArrowIterator { - chunks_iter: IntoIter, - arrow_iter: Option>, -} - -impl ChunkedArrowIterator { - fn new(array: &ChunkedArray) -> Self { - let mut chunks_iter = array.chunks.clone().into_iter(); - let arrow_iter = chunks_iter.next().map(|c| c.iter_arrow()); - Self { - chunks_iter, - arrow_iter, - } - } -} - -impl Iterator for ChunkedArrowIterator { - type Item = ArrowArrayRef; - - fn next(&mut self) -> Option { - self.arrow_iter - .as_mut() - .and_then(|iter| iter.next()) - .or_else(|| { - self.chunks_iter.next().and_then(|next_chunk| { - self.arrow_iter = Some(next_chunk.iter_arrow()); - self.next() - }) - }) - } -} - #[cfg(test)] mod test { - use arrow_array::array::ArrayRef as ArrowArrayRef; - use arrow_array::array::ArrowPrimitiveType; - use arrow_array::cast::AsArray; - use arrow_array::types::UInt64Type; - use itertools::Itertools; + use crate::array::{Array, ArrayRef}; use crate::array::chunked::ChunkedArray; - use crate::array::Array; + use crate::compute::flatten::{flatten, flatten_primitive, FlattenedArray}; use crate::dtype::{DType, IntWidth, Nullability, Signedness}; + use crate::ptype::NativePType; fn chunked_array() -> ChunkedArray { ChunkedArray::new( @@ -277,74 +235,41 @@ mod test { ) } - fn assert_equal_slices(arr: ArrowArrayRef, slice: &[T::Native]) { - assert_eq!(*arr.as_primitive::().values(), slice); - } - - #[test] - pub fn iter() { - let chunked = ChunkedArray::new( - vec![vec![1u64, 2, 3].into(), vec![4u64, 5, 6].into()], - DType::Int( - IntWidth::_64, - Signedness::Unsigned, - Nullability::NonNullable, - ), - ); - + fn assert_equal_slices(arr: ArrayRef, slice: &[T]) { + let FlattenedArray::Chunked(chunked) = flatten(arr.as_ref()).unwrap() else { + unreachable!() + }; + let mut values = Vec::with_capacity(arr.len()); chunked - .iter_arrow() - .zip_eq([[1u64, 2, 3], [4, 5, 6]]) - .for_each(|(arr, slice)| assert_equal_slices::(arr, &slice)); + .chunks() + .iter() + .map(|a| flatten_primitive(a.as_ref()).unwrap()) + .for_each(|a| values.extend_from_slice(a.typed_data::())); + assert_eq!(values, slice); } #[test] pub fn slice_middle() { - chunked_array() - .slice(2, 5) - .unwrap() - .iter_arrow() - .zip_eq([vec![3u64], vec![4, 5]]) - .for_each(|(arr, slice)| assert_equal_slices::(arr, &slice)); + assert_equal_slices(chunked_array().slice(2, 5).unwrap(), &[3u64, 4, 5]) } #[test] pub fn slice_begin() { - chunked_array() - .slice(1, 3) - .unwrap() - .iter_arrow() - .zip_eq([[2u64, 3]]) - .for_each(|(arr, slice)| assert_equal_slices::(arr, &slice)); + assert_equal_slices(chunked_array().slice(1, 3).unwrap(), &[2u64, 3]); } #[test] pub fn slice_aligned() { - chunked_array() - .slice(3, 6) - .unwrap() - .iter_arrow() - .zip_eq([[4u64, 5, 6]]) - .for_each(|(arr, slice)| assert_equal_slices::(arr, &slice)); + assert_equal_slices(chunked_array().slice(3, 6).unwrap(), &[4u64, 5, 6]); } #[test] pub fn slice_many_aligned() { - chunked_array() - .slice(0, 6) - .unwrap() - .iter_arrow() - .zip_eq([[1u64, 2, 3], [4, 5, 6]]) - .for_each(|(arr, slice)| assert_equal_slices::(arr, &slice)); + assert_equal_slices(chunked_array().slice(0, 6).unwrap(), &[1u64, 2, 3, 4, 5, 6]); } #[test] pub fn slice_end() { - chunked_array() - .slice(7, 8) - .unwrap() - .iter_arrow() - .zip_eq([[8u64]]) - .for_each(|(arr, slice)| assert_equal_slices::(arr, &slice)); + assert_equal_slices(chunked_array().slice(7, 8).unwrap(), &[8u64]); } } diff --git a/vortex-array/src/array/constant/compute.rs b/vortex-array/src/array/constant/compute.rs new file mode 100644 index 00000000000..4052fd22f57 --- /dev/null +++ b/vortex-array/src/array/constant/compute.rs @@ -0,0 +1,89 @@ +use crate::array::bool::BoolArray; +use crate::array::constant::ConstantArray; +use crate::array::downcast::DowncastArrayBuiltin; +use crate::array::primitive::PrimitiveArray; +use crate::array::{Array, ArrayRef}; +use crate::compute::as_contiguous::AsContiguousFn; +use crate::compute::flatten::{FlattenFn, FlattenedArray}; +use crate::compute::scalar_at::ScalarAtFn; +use crate::compute::take::TakeFn; +use crate::compute::ArrayCompute; +use crate::error::VortexResult; +use crate::match_each_native_ptype; +use crate::scalar::Scalar; +use itertools::Itertools; + +impl ArrayCompute for ConstantArray { + fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> { + Some(self) + } + + fn flatten(&self) -> Option<&dyn FlattenFn> { + Some(self) + } + + fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { + Some(self) + } + + fn take(&self) -> Option<&dyn TakeFn> { + Some(self) + } +} + +impl AsContiguousFn for ConstantArray { + fn as_contiguous(&self, arrays: Vec) -> VortexResult { + let chunks = arrays.iter().map(|a| a.as_constant().clone()).collect_vec(); + if chunks.iter().map(|c| c.scalar()).all_equal() { + Ok(ConstantArray::new( + chunks.first().unwrap().scalar().clone(), + chunks.iter().map(|c| c.len()).sum(), + ) + .boxed()) + } else { + // TODO(ngates): we need to flatten the constant arrays and then concatenate them + Err("Cannot concatenate constant arrays with differing scalars".into()) + } + } +} + +impl FlattenFn for ConstantArray { + fn flatten(&self) -> VortexResult { + Ok(match self.scalar() { + Scalar::Bool(b) => { + if let Some(bv) = b.value() { + FlattenedArray::Bool(BoolArray::from(vec![bv; self.len()])) + } else { + FlattenedArray::Bool(BoolArray::null(self.len())) + } + } + Scalar::Primitive(p) => { + if let Some(ps) = p.value() { + match_each_native_ptype!(ps.ptype(), |$P| { + FlattenedArray::Primitive(PrimitiveArray::from_value::<$P>( + $P::try_from(self.scalar())?, + self.len(), + )) + }) + } else { + match_each_native_ptype!(p.ptype(), |$P| { + FlattenedArray::Primitive(PrimitiveArray::null::<$P>(self.len())) + }) + } + } + _ => panic!("Unsupported scalar type {}", self.dtype()), + }) + } +} + +impl ScalarAtFn for ConstantArray { + fn scalar_at(&self, _index: usize) -> VortexResult { + Ok(self.scalar().clone()) + } +} + +impl TakeFn for ConstantArray { + fn take(&self, indices: &dyn Array) -> VortexResult { + Ok(ConstantArray::new(self.scalar().clone(), indices.len()).boxed()) + } +} diff --git a/vortex-array/src/array/constant/compute/mod.rs b/vortex-array/src/array/constant/compute/mod.rs deleted file mode 100644 index 75b8d9f0535..00000000000 --- a/vortex-array/src/array/constant/compute/mod.rs +++ /dev/null @@ -1,29 +0,0 @@ -use crate::array::constant::ConstantArray; -use crate::array::{Array, ArrayRef}; -use crate::compute::scalar_at::ScalarAtFn; -use crate::compute::take::TakeFn; -use crate::compute::ArrayCompute; -use crate::error::VortexResult; -use crate::scalar::Scalar; - -impl ArrayCompute for ConstantArray { - fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { - Some(self) - } - - fn take(&self) -> Option<&dyn TakeFn> { - Some(self) - } -} - -impl ScalarAtFn for ConstantArray { - fn scalar_at(&self, _index: usize) -> VortexResult { - Ok(self.scalar().clone()) - } -} - -impl TakeFn for ConstantArray { - fn take(&self, indices: &dyn Array) -> VortexResult { - Ok(ConstantArray::new(self.scalar().clone(), indices.len()).boxed()) - } -} diff --git a/vortex-array/src/array/constant/mod.rs b/vortex-array/src/array/constant/mod.rs index f91bd79566e..d3065f55318 100644 --- a/vortex-array/src/array/constant/mod.rs +++ b/vortex-array/src/array/constant/mod.rs @@ -3,17 +3,13 @@ use std::sync::{Arc, RwLock}; use linkme::distributed_slice; -use crate::array::bool::BoolArray; -use crate::array::primitive::PrimitiveArray; use crate::array::{ - check_slice_bounds, Array, ArrayRef, ArrowIterator, Encoding, EncodingId, EncodingRef, - ENCODINGS, + check_slice_bounds, Array, ArrayRef, Encoding, EncodingId, EncodingRef, ENCODINGS, }; use crate::dtype::DType; use crate::error::VortexResult; use crate::formatter::{ArrayDisplay, ArrayFormatter}; -use crate::match_each_native_ptype; -use crate::scalar::{PScalar, Scalar}; +use crate::scalar::Scalar; use crate::serde::{ArraySerde, EncodingSerde}; use crate::stats::{Stat, Stats, StatsSet}; @@ -88,41 +84,6 @@ impl Array for ConstantArray { Stats::new(&self.stats, self) } - fn iter_arrow(&self) -> Box { - let plain_array = match self.scalar() { - Scalar::Bool(b) => { - if let Some(bv) = b.value() { - BoolArray::from(vec![bv; self.len()]).boxed() - } else { - BoolArray::null(self.len()).boxed() - } - } - Scalar::Primitive(p) => { - if let Some(ps) = p.value() { - match ps { - PScalar::U8(p) => PrimitiveArray::from_value(p, self.len()).boxed(), - PScalar::U16(p) => PrimitiveArray::from_value(p, self.len()).boxed(), - PScalar::U32(p) => PrimitiveArray::from_value(p, self.len()).boxed(), - PScalar::U64(p) => PrimitiveArray::from_value(p, self.len()).boxed(), - PScalar::I8(p) => PrimitiveArray::from_value(p, self.len()).boxed(), - PScalar::I16(p) => PrimitiveArray::from_value(p, self.len()).boxed(), - PScalar::I32(p) => PrimitiveArray::from_value(p, self.len()).boxed(), - PScalar::I64(p) => PrimitiveArray::from_value(p, self.len()).boxed(), - PScalar::F16(p) => PrimitiveArray::from_value(p, self.len()).boxed(), - PScalar::F32(p) => PrimitiveArray::from_value(p, self.len()).boxed(), - PScalar::F64(p) => PrimitiveArray::from_value(p, self.len()).boxed(), - } - } else { - match_each_native_ptype!(p.ptype(), |$P| { - PrimitiveArray::null::<$P>(self.len()).boxed() - }) - } - } - _ => panic!("Unsupported scalar type {}", self.dtype()), - }; - plain_array.iter_arrow() - } - fn slice(&self, start: usize, stop: usize) -> VortexResult { check_slice_bounds(self, start, stop)?; diff --git a/vortex-array/src/array/mod.rs b/vortex-array/src/array/mod.rs index fa016c97afa..9643327bcd8 100644 --- a/vortex-array/src/array/mod.rs +++ b/vortex-array/src/array/mod.rs @@ -1,7 +1,6 @@ use std::any::Any; use std::fmt::{Debug, Display, Formatter}; -use arrow_array::array::ArrayRef as ArrowArrayRef; use linkme::distributed_slice; use crate::array::bool::{BoolArray, BoolEncoding}; @@ -33,7 +32,6 @@ pub mod typed; pub mod varbin; pub mod varbinview; -pub type ArrowIterator = dyn Iterator; pub type ArrayRef = Box; /// An Enc Array is the base object representing all arrays in enc. @@ -61,8 +59,6 @@ pub trait Array: fn dtype(&self) -> &DType; /// Get statistics for the array fn stats(&self) -> Stats; - /// Produce arrow batches from the encoding - fn iter_arrow(&self) -> Box; /// Limit array to start..stop range fn slice(&self, start: usize, stop: usize) -> VortexResult; /// Encoding kind of the array diff --git a/vortex-array/src/array/primitive/mod.rs b/vortex-array/src/array/primitive/mod.rs index 0ce7660fb2e..ae122cf40e8 100644 --- a/vortex-array/src/array/primitive/mod.rs +++ b/vortex-array/src/array/primitive/mod.rs @@ -6,23 +6,21 @@ use std::panic::RefUnwindSafe; use std::ptr::NonNull; use std::sync::{Arc, RwLock}; +use crate::accessor::ArrayAccessor; use allocator_api2::alloc::Allocator; -use arrow_array::array::make_array; -use arrow_array::cast::AsArray; -use arrow_buffer::buffer::{Buffer, NullBuffer, ScalarBuffer}; -use arrow_data::ArrayData; +use arrow_buffer::buffer::{Buffer, ScalarBuffer}; use linkme::distributed_slice; use crate::array::bool::BoolArray; use crate::array::{ - check_slice_bounds, check_validity_buffer, Array, ArrayRef, ArrowIterator, Encoding, - EncodingId, EncodingRef, ENCODINGS, + check_slice_bounds, check_validity_buffer, Array, ArrayRef, Encoding, EncodingId, EncodingRef, + ENCODINGS, }; -use crate::arrow::CombineChunks; use crate::compute::scalar_at::scalar_at; use crate::dtype::DType; use crate::error::VortexResult; use crate::formatter::{ArrayDisplay, ArrayFormatter}; +use crate::iterator::ArrayIter; use crate::ptype::{match_each_native_ptype, NativePType, PType}; use crate::serde::{ArraySerde, EncodingSerde}; use crate::stats::{Stats, StatsSet}; @@ -175,25 +173,6 @@ impl Array for PrimitiveArray { Stats::new(&self.stats, self) } - fn iter_arrow(&self) -> Box { - Box::new(iter::once(make_array( - ArrayData::builder(self.dtype().into()) - .len(self.len()) - .nulls(self.validity().map(|v| { - NullBuffer::new( - v.iter_arrow() - .combine_chunks() - .as_boolean() - .values() - .clone(), - ) - })) - .add_buffer(self.buffer.clone()) - .build() - .unwrap(), - ))) - } - fn slice(&self, start: usize, stop: usize) -> VortexResult { check_slice_bounds(self, start, stop)?; @@ -235,6 +214,24 @@ impl<'arr> AsRef<(dyn Array + 'arr)> for PrimitiveArray { } } +impl ArrayAccessor for PrimitiveArray { + fn value(&self, index: usize) -> Option { + if self.is_valid(index) { + Some(self.typed_data::()[index]) + } else { + None + } + } +} + +impl PrimitiveArray { + pub fn iter(&self) -> ArrayIter { + ArrayIter::new(self.clone()) + } +} + +pub type PrimitiveIter<'a, T> = ArrayIter, T>; + #[derive(Debug)] pub struct PrimitiveEncoding; diff --git a/vortex-array/src/array/sparse/compute.rs b/vortex-array/src/array/sparse/compute.rs index 98330c3480c..07f65a29694 100644 --- a/vortex-array/src/array/sparse/compute.rs +++ b/vortex-array/src/array/sparse/compute.rs @@ -1,13 +1,18 @@ +use arrow_buffer::BooleanBufferBuilder; use itertools::Itertools; +use crate::array::bool::BoolArray; use crate::array::downcast::DowncastArrayBuiltin; +use crate::array::primitive::PrimitiveArray; use crate::array::sparse::SparseArray; use crate::array::{Array, ArrayRef}; use crate::compute::as_contiguous::{as_contiguous, AsContiguousFn}; +use crate::compute::flatten::{flatten, FlattenFn, FlattenedArray}; use crate::compute::scalar_at::{scalar_at, ScalarAtFn}; use crate::compute::search_sorted::{search_sorted, SearchSortedSide}; use crate::compute::ArrayCompute; -use crate::error::VortexResult; +use crate::error::{VortexError, VortexResult}; +use crate::match_each_native_ptype; use crate::scalar::Scalar; impl ArrayCompute for SparseArray { @@ -15,6 +20,10 @@ impl ArrayCompute for SparseArray { Some(self) } + fn flatten(&self) -> Option<&dyn FlattenFn> { + Some(self) + } + fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { Some(self) } @@ -43,6 +52,42 @@ impl AsContiguousFn for SparseArray { } } +impl FlattenFn for SparseArray { + fn flatten(&self) -> VortexResult { + // Resolve our indices into a vector of usize applying the offset + let indices = self.resolved_indices(); + + let mut validity = BooleanBufferBuilder::new(self.len()); + validity.append_n(self.len(), false); + + let values = flatten(self.values())?; + if let FlattenedArray::Primitive(parray) = values { + match_each_native_ptype!(parray.ptype(), |$P| { + let mut values = vec![$P::default(); self.len()]; + let mut offset = 0; + + for v in parray.typed_data::<$P>() { + let idx = indices[offset]; + values[idx] = *v; + validity.set_bit(idx, true); + offset += 1; + } + + let validity = BoolArray::new(validity.finish(), None); + + Ok(FlattenedArray::Primitive(PrimitiveArray::from_nullable( + values, + Some(validity.boxed()), + ))) + }) + } else { + Err(VortexError::InvalidArgument( + "Cannot flatten SparseArray with non-primitive values".into(), + )) + } + } +} + impl ScalarAtFn for SparseArray { fn scalar_at(&self, index: usize) -> VortexResult { // Check whether `true_patch_index` exists in the patch index array @@ -50,7 +95,7 @@ impl ScalarAtFn for SparseArray { // greater than or equal to the true index let true_patch_index = index + self.indices_offset; search_sorted(self.indices(), true_patch_index, SearchSortedSide::Left).and_then(|idx| { - // If the value at this index is equal to the true index, then it exists in the patch index array + // If the value at this index is equal to the true index, then it exists in the patch index array, // and we should return the value at the corresponding index in the patch values array scalar_at(self.indices(), idx) .or_else(|_| Ok(Scalar::null(self.values().dtype()))) diff --git a/vortex-array/src/array/sparse/mod.rs b/vortex-array/src/array/sparse/mod.rs index c473108be55..9410a4abced 100644 --- a/vortex-array/src/array/sparse/mod.rs +++ b/vortex-array/src/array/sparse/mod.rs @@ -1,24 +1,19 @@ use std::any::Any; -use std::iter; use std::sync::{Arc, RwLock}; -use arrow_array::array::{ArrayRef as ArrowArrayRef, PrimitiveArray as ArrowPrimitiveArray}; -use arrow_array::cast::AsArray; -use arrow_array::types::UInt64Type; -use arrow_buffer::buffer::{NullBuffer, ScalarBuffer}; -use arrow_buffer::BooleanBufferBuilder; +use itertools::Itertools; use linkme::distributed_slice; use crate::array::ENCODINGS; -use crate::array::{ - check_slice_bounds, Array, ArrayRef, ArrowIterator, Encoding, EncodingId, EncodingRef, -}; +use crate::array::{check_slice_bounds, Array, ArrayRef, Encoding, EncodingId, EncodingRef}; use crate::compress::EncodingCompression; +use crate::compute::cast::cast; +use crate::compute::flatten::flatten_primitive; use crate::compute::search_sorted::{search_sorted, SearchSortedSide}; use crate::dtype::DType; use crate::error::{VortexError, VortexResult}; use crate::formatter::{ArrayDisplay, ArrayFormatter}; -use crate::match_arrow_numeric_type; +use crate::ptype::PType; use crate::serde::{ArraySerde, EncodingSerde}; use crate::stats::{Stats, StatsCompute, StatsSet}; @@ -83,18 +78,12 @@ impl SparseArray { /// Return indices as a vector of usize with the indices_offset applied. pub fn resolved_indices(&self) -> Vec { - let mut indices = Vec::with_capacity(self.len()); - self.indices().iter_arrow().for_each(|c| { - indices.extend( - arrow_cast::cast(c.as_ref(), &arrow_schema::DataType::UInt64) - .unwrap() - .as_primitive::() - .values() - .into_iter() - .map(|v| (*v as usize) - self.indices_offset), - ) - }); - indices + flatten_primitive(cast(self.indices(), &PType::U64.into()).unwrap().as_ref()) + .unwrap() + .typed_data::() + .iter() + .map(|v| (*v as usize) - self.indices_offset) + .collect_vec() } } @@ -134,30 +123,6 @@ impl Array for SparseArray { Stats::new(&self.stats, self) } - fn iter_arrow(&self) -> Box { - // Resolve our indices into a vector of usize applying the offset - let indices = self.resolved_indices(); - let array: ArrowArrayRef = match_arrow_numeric_type!(self.values().dtype(), |$E| { - let mut validity = BooleanBufferBuilder::new(self.len()); - validity.append_n(self.len(), false); - let mut values = vec![<$E as ArrowPrimitiveType>::Native::default(); self.len()]; - let mut offset = 0; - for values_array in self.values().iter_arrow() { - for v in values_array.as_primitive::<$E>().values() { - let idx = indices[offset]; - values[idx] = *v; - validity.set_bit(idx, true); - offset += 1; - } - } - Arc::new(ArrowPrimitiveArray::<$E>::new( - ScalarBuffer::from(values), - Some(NullBuffer::from(validity.finish())), - )) - }); - Box::new(iter::once(array)) - } - fn slice(&self, start: usize, stop: usize) -> VortexResult { check_slice_bounds(self, start, stop)?; @@ -232,12 +197,11 @@ impl Encoding for SparseEncoding { #[cfg(test)] mod test { - use arrow_array::cast::AsArray; - use arrow_array::types::Int32Type; use itertools::Itertools; use crate::array::sparse::SparseArray; use crate::array::Array; + use crate::compute::flatten::flatten_primitive; use crate::compute::scalar_at::scalar_at; use crate::error::VortexError; @@ -247,13 +211,9 @@ mod test { } fn assert_sparse_array(sparse: &dyn Array, values: &[Option]) { - let sparse_arrow = sparse - .as_ref() - .iter_arrow() - .next() + let sparse_arrow = flatten_primitive(sparse) .unwrap() - .as_primitive::() - .into_iter() + .iter::() .collect_vec(); assert_eq!(sparse_arrow, values); } diff --git a/vortex-array/src/array/struct_/mod.rs b/vortex-array/src/array/struct_/mod.rs index 514c1d64708..a68bea1e51d 100644 --- a/vortex-array/src/array/struct_/mod.rs +++ b/vortex-array/src/array/struct_/mod.rs @@ -4,11 +4,6 @@ use std::sync::{Arc, RwLock}; use itertools::Itertools; use linkme::distributed_slice; -use arrow_array::array::StructArray as ArrowStructArray; -use arrow_array::array::{Array as ArrowArray, ArrayRef as ArrowArrayRef}; -use arrow_schema::{Field, Fields}; - -use crate::arrow::aligned_iter::AlignedArrowArrayIterator; use crate::compress::EncodingCompression; use crate::dtype::{DType, FieldNames}; use crate::error::VortexResult; @@ -16,10 +11,7 @@ use crate::formatter::{ArrayDisplay, ArrayFormatter}; use crate::serde::{ArraySerde, EncodingSerde}; use crate::stats::{Stats, StatsCompute, StatsSet}; -use super::{ - check_slice_bounds, Array, ArrayRef, ArrowIterator, Encoding, EncodingId, EncodingRef, - ENCODINGS, -}; +use super::{check_slice_bounds, Array, ArrayRef, Encoding, EncodingId, EncodingRef, ENCODINGS}; mod compress; mod compute; @@ -66,15 +58,6 @@ impl StructArray { panic!("dtype is not a struct") } } - - fn arrow_fields(&self) -> Fields { - self.names() - .iter() - .zip(self.field_dtypes()) - .map(|(name, dtype)| Field::new(name.as_str(), dtype.into(), dtype.is_nullable())) - .map(Arc::new) - .collect() - } } impl Array for StructArray { @@ -112,25 +95,6 @@ impl Array for StructArray { Stats::new(&self.stats, self) } - fn iter_arrow(&self) -> Box { - let fields = self.arrow_fields(); - Box::new( - AlignedArrowArrayIterator::new( - self.fields - .iter() - .map(|f| f.iter_arrow()) - .collect::>(), - ) - .map(move |items| { - Arc::new(ArrowStructArray::new( - fields.clone(), - items.into_iter().map(ArrowArrayRef::from).collect(), - None, - )) as Arc - }), - ) - } - fn slice(&self, start: usize, stop: usize) -> VortexResult { check_slice_bounds(self, start, stop)?; diff --git a/vortex-array/src/array/typed/mod.rs b/vortex-array/src/array/typed/mod.rs index 06d6382902d..3c47f901aa1 100644 --- a/vortex-array/src/array/typed/mod.rs +++ b/vortex-array/src/array/typed/mod.rs @@ -1,10 +1,9 @@ use std::any::Any; use std::sync::{Arc, RwLock}; -use arrow_schema::DataType; use linkme::distributed_slice; -use crate::array::{Array, ArrayRef, ArrowIterator, Encoding, EncodingId, EncodingRef, ENCODINGS}; +use crate::array::{Array, ArrayRef, Encoding, EncodingId, EncodingRef, ENCODINGS}; use crate::compress::EncodingCompression; use crate::dtype::DType; use crate::error::VortexResult; @@ -75,16 +74,6 @@ impl Array for TypedArray { Stats::new(&self.stats, self) } - // TODO(robert): Have cast happen in enc space and not in arrow space - fn iter_arrow(&self) -> Box { - let datatype: DataType = self.dtype().into(); - Box::new( - self.array - .iter_arrow() - .map(move |arr| arrow_cast::cast(arr.as_ref(), &datatype).unwrap()), - ) - } - fn slice(&self, start: usize, stop: usize) -> VortexResult { Ok(Self::new(self.array.slice(start, stop)?, self.dtype.clone()).boxed()) } @@ -144,15 +133,7 @@ impl ArrayDisplay for TypedArray { #[cfg(test)] mod test { - use std::iter; - - use arrow_array::cast::AsArray; - use arrow_array::types::Time64MicrosecondType; - use arrow_array::Time64MicrosecondArray; - use itertools::Itertools; - use crate::array::typed::TypedArray; - use crate::array::Array; use crate::composite_dtypes::{localtime, TimeUnit}; use crate::compute::scalar_at::scalar_at; use crate::dtype::{IntWidth, Nullability}; @@ -182,22 +163,4 @@ mod test { .into() ); } - - #[test] - pub fn iter() { - let dtype = localtime(TimeUnit::Us, IntWidth::_64, Nullability::NonNullable); - - let arr = TypedArray::new(vec![64_799_000_000_i64, 43_000_000_000].into(), dtype); - arr.iter_arrow() - .zip_eq(iter::once(Box::new(Time64MicrosecondArray::from(vec![ - 64_799_000_000i64, - 43_000_000_000, - ])))) - .for_each(|(enc, arrow)| { - assert_eq!( - *enc.as_primitive::().values(), - *arrow.values() - ) - }); - } } diff --git a/vortex-array/src/array/varbin/compute.rs b/vortex-array/src/array/varbin/compute.rs index a72e7042692..b00bbf85426 100644 --- a/vortex-array/src/array/varbin/compute.rs +++ b/vortex-array/src/array/varbin/compute.rs @@ -65,8 +65,7 @@ impl AsContiguousFn for VarBinArray { offsets.push(0); for a in arrays.iter().map(|a| a.as_varbin()) { let first_offset: u64 = a.first_offset()?; - // FIXME(ngates): cast to u64, or iterate over the offsets as any? - let offsets_array = flatten_primitive(a.offsets())?; + let offsets_array = flatten_primitive(cast(a.offsets(), &PType::U64.into())?.as_ref())?; let shift = offsets.last().copied().unwrap_or(0); offsets.extend( offsets_array diff --git a/vortex-array/src/array/varbin/mod.rs b/vortex-array/src/array/varbin/mod.rs index c50832a2a56..35d543eb3ef 100644 --- a/vortex-array/src/array/varbin/mod.rs +++ b/vortex-array/src/array/varbin/mod.rs @@ -1,12 +1,6 @@ use std::any::Any; -use std::iter; use std::sync::{Arc, RwLock}; -use arrow_array::array::{make_array, Array as ArrowArray}; -use arrow_array::cast::AsArray; -use arrow_array::types::UInt8Type; -use arrow_buffer::buffer::NullBuffer; -use arrow_data::ArrayData; use linkme::distributed_slice; use num_traits::{FromPrimitive, Unsigned}; @@ -15,11 +9,11 @@ use crate::array::downcast::DowncastArrayBuiltin; use crate::array::primitive::PrimitiveArray; use crate::array::varbin::values_iter::{VarBinIter, VarBinPrimitiveIter}; use crate::array::{ - check_slice_bounds, check_validity_buffer, Array, ArrayRef, ArrowIterator, Encoding, - EncodingId, EncodingRef, ENCODINGS, + check_slice_bounds, check_validity_buffer, Array, ArrayRef, Encoding, EncodingId, EncodingRef, + ENCODINGS, }; -use crate::arrow::CombineChunks; use crate::compress::EncodingCompression; +use crate::compute::flatten::flatten_primitive; use crate::compute::scalar_at::scalar_at; use crate::dtype::{DType, IntWidth, Nullability, Signedness}; use crate::error::{VortexError, VortexResult}; @@ -212,8 +206,9 @@ impl VarBinArray { let start = scalar_at(self.offsets(), index)?.try_into()?; let end = scalar_at(self.offsets(), index + 1)?.try_into()?; let sliced = self.bytes().slice(start, end)?; - let arr_ref = sliced.iter_arrow().combine_chunks(); - Ok(arr_ref.as_primitive::().values().to_vec()) + Ok(flatten_primitive(sliced.as_ref())? + .typed_data::() + .to_vec()) } } @@ -253,29 +248,6 @@ impl Array for VarBinArray { Stats::new(&self.stats, self) } - fn iter_arrow(&self) -> Box { - let offsets_data = self.offsets.iter_arrow().combine_chunks().into_data(); - let bytes_data = self.bytes.iter_arrow().combine_chunks().into_data(); - - let data = ArrayData::builder(self.dtype.clone().into()) - .len(self.len()) - .nulls(self.validity().map(|v| { - NullBuffer::new( - v.iter_arrow() - .combine_chunks() - .as_boolean() - .values() - .clone(), - ) - })) - .add_buffer(offsets_data.buffers()[0].to_owned()) - .add_buffer(bytes_data.buffers()[0].to_owned()) - .build() - .unwrap(); - - Box::new(iter::once(make_array(data))) - } - fn slice(&self, start: usize, stop: usize) -> VortexResult { check_slice_bounds(self, start, stop)?; @@ -394,13 +366,9 @@ impl<'a> FromIterator> for VarBinArray { #[cfg(test)] mod test { - use arrow_array::array::GenericStringArray as ArrowStringArray; - use arrow_array::cast::AsArray; - use crate::array::primitive::PrimitiveArray; use crate::array::varbin::VarBinArray; use crate::array::Array; - use crate::arrow::CombineChunks; use crate::compute::scalar_at::scalar_at; use crate::dtype::{DType, Nullability}; @@ -439,19 +407,4 @@ mod test { Ok("hello world this is a long string".into()) ); } - - #[test] - pub fn iter() { - let binary_array = binary_array(); - assert_eq!( - binary_array - .iter_arrow() - .combine_chunks() - .as_string::(), - &ArrowStringArray::::from(vec![ - "hello world", - "hello world this is a long string", - ]) - ); - } } diff --git a/vortex-array/src/array/varbin/values_iter.rs b/vortex-array/src/array/varbin/values_iter.rs index 5c61055af0f..6c8595a7b5a 100644 --- a/vortex-array/src/array/varbin/values_iter.rs +++ b/vortex-array/src/array/varbin/values_iter.rs @@ -1,10 +1,8 @@ use crate::array::primitive::PrimitiveArray; use crate::array::Array; -use crate::arrow::CombineChunks; +use crate::compute::flatten::flatten_primitive; use crate::compute::scalar_at::scalar_at; use crate::match_each_native_ptype; -use arrow_array::cast::AsArray; -use arrow_array::types::UInt8Type; use num_traits::AsPrimitive; #[derive(Debug)] @@ -84,16 +82,12 @@ impl<'a> Iterator for VarBinIter<'a> { .try_into() .unwrap(); let slice_bytes = self.bytes.slice(self.last_offset, next_offset).unwrap(); + let slice_bytes = flatten_primitive(slice_bytes.as_ref()) + .unwrap() + .typed_data::() + .to_vec(); self.last_offset = next_offset; self.idx += 1; - // TODO(robert): iter as primitive vs arrow - Some( - slice_bytes - .iter_arrow() - .combine_chunks() - .as_primitive::() - .values() - .to_vec(), - ) + Some(slice_bytes) } } diff --git a/vortex-array/src/array/varbinview/mod.rs b/vortex-array/src/array/varbinview/mod.rs index 21bd98405bd..4cdac253570 100644 --- a/vortex-array/src/array/varbinview/mod.rs +++ b/vortex-array/src/array/varbinview/mod.rs @@ -1,22 +1,14 @@ -mod compute; -mod serde; - use std::any::Any; -use std::str::from_utf8_unchecked; +use std::mem; use std::sync::{Arc, RwLock}; -use std::{iter, mem}; -use arrow_array::array::ArrayRef as ArrowArrayRef; -use arrow_array::builder::{BinaryBuilder, StringBuilder}; -use arrow_array::cast::AsArray; -use arrow_array::types::UInt8Type; use linkme::distributed_slice; use crate::array::{ - check_slice_bounds, check_validity_buffer, Array, ArrayRef, ArrowIterator, Encoding, - EncodingId, EncodingRef, ENCODINGS, + check_slice_bounds, check_validity_buffer, Array, ArrayRef, Encoding, EncodingId, EncodingRef, + ENCODINGS, }; -use crate::arrow::CombineChunks; +use crate::compute::flatten::flatten_primitive; use crate::compute::scalar_at::scalar_at; use crate::dtype::{DType, IntWidth, Nullability, Signedness}; use crate::error::{VortexError, VortexResult}; @@ -24,6 +16,9 @@ use crate::formatter::{ArrayDisplay, ArrayFormatter}; use crate::serde::{ArraySerde, EncodingSerde}; use crate::stats::{Stats, StatsSet}; +mod compute; +mod serde; + #[derive(Clone, Copy)] #[repr(C, align(8))] struct Inlined { @@ -158,15 +153,14 @@ impl VarBinViewArray { } pub(self) fn view_at(&self, index: usize) -> BinaryView { - let view_slice = self - .views - .slice(index * VIEW_SIZE, (index + 1) * VIEW_SIZE) - .unwrap() - .iter_arrow() - .next() - .unwrap(); - let view_vec: &[u8] = view_slice.as_primitive::().values(); - BinaryView::from_le_bytes(view_vec.try_into().unwrap()) + let view_vec = flatten_primitive( + self.views + .slice(index * VIEW_SIZE, (index + 1) * VIEW_SIZE) + .unwrap() + .as_ref(), + ) + .unwrap(); + BinaryView::from_le_bytes(view_vec.typed_data::().try_into().unwrap()) } #[inline] @@ -188,21 +182,18 @@ impl VarBinViewArray { let view = self.view_at(index); unsafe { if view.inlined.size > 12 { - let arrow_data_buffer = self - .data - .get(view._ref.buffer_index as usize) - .unwrap() - .slice( - view._ref.offset as usize, - (view._ref.size + view._ref.offset) as usize, - )? - .iter_arrow() - .combine_chunks(); - - Ok(arrow_data_buffer - .as_primitive::() - .values() - .to_vec()) + let arrow_data_buffer = flatten_primitive( + self.data + .get(view._ref.buffer_index as usize) + .unwrap() + .slice( + view._ref.offset as usize, + (view._ref.size + view._ref.offset) as usize, + )? + .as_ref(), + )?; + // TODO(ngates): can we avoid returning a copy? + Ok(arrow_data_buffer.typed_data::().to_vec()) } else { Ok(view.inlined.data[..view.inlined.size as usize].to_vec()) } @@ -246,35 +237,6 @@ impl Array for VarBinViewArray { Stats::new(&self.stats, self) } - fn iter_arrow(&self) -> Box { - let data_arr: ArrowArrayRef = if matches!(self.dtype, DType::Utf8(_)) { - let mut data_buf = StringBuilder::with_capacity(self.len(), self.plain_size()); - for i in 0..self.views.len() / VIEW_SIZE { - if !self.is_valid(i) { - data_buf.append_null() - } else { - unsafe { - data_buf.append_value(from_utf8_unchecked( - self.bytes_at(i).unwrap().as_slice(), - )); - } - } - } - Arc::new(data_buf.finish()) - } else { - let mut data_buf = BinaryBuilder::with_capacity(self.len(), self.plain_size()); - for i in 0..self.views.len() / VIEW_SIZE { - if !self.is_valid(i) { - data_buf.append_null() - } else { - data_buf.append_value(self.bytes_at(i).unwrap()) - } - } - Arc::new(data_buf.finish()) - }; - Box::new(iter::once(data_arr)) - } - fn slice(&self, start: usize, stop: usize) -> VortexResult { check_slice_bounds(self, start, stop)?; @@ -344,8 +306,6 @@ impl ArrayDisplay for VarBinViewArray { #[cfg(test)] mod test { - use arrow_array::array::GenericStringArray as ArrowStringArray; - use crate::array::primitive::PrimitiveArray; use super::*; @@ -397,19 +357,4 @@ mod test { Ok("hello world this is a long string".into()) ); } - - #[test] - pub fn iter() { - let binary_array = binary_array(); - assert_eq!( - binary_array - .iter_arrow() - .combine_chunks() - .as_string::(), - &ArrowStringArray::::from(vec![ - "hello world", - "hello world this is a long string", - ]) - ); - } } diff --git a/vortex-array/src/arrow/aligned_iter.rs b/vortex-array/src/arrow/aligned_iter.rs deleted file mode 100644 index 34d4342a848..00000000000 --- a/vortex-array/src/arrow/aligned_iter.rs +++ /dev/null @@ -1,88 +0,0 @@ -use arrow_array::array::{Array as ArrowArray, ArrayRef}; - -pub struct AlignedArray { - iter: Box>, - current_chunk: Option, - offset: usize, -} - -impl AlignedArray { - pub fn new(mut iter: Box>) -> Self { - let current_chunk = iter.next(); - Self { - iter, - current_chunk, - offset: 0, - } - } - - pub fn length(&self) -> usize { - self.current_chunk.as_ref().unwrap().len() - self.offset - } -} - -pub struct AlignedArrowArrayIterator { - items: Vec, -} - -impl AlignedArrowArrayIterator { - pub fn new(iterators: Vec>>) -> Self { - let items = iterators.into_iter().map(AlignedArray::new).collect(); - Self { items } - } -} - -impl Iterator for AlignedArrowArrayIterator { - type Item = Vec; - - fn next(&mut self) -> Option { - let missing_chunks: usize = self - .items - .iter_mut() - .map(|v| { - if v.length() == 0 { - v.current_chunk = v.iter.next(); - v.offset = 0; - if v.current_chunk.is_none() { - 1 - } else { - 0 - } - } else { - 0 - } - }) - .sum(); - - if missing_chunks == self.items.len() { - return None; - } else if missing_chunks > 0 { - panic!( - "Misaligned arrays, {} arrays didn't return a next chunk", - missing_chunks - ); - } - - let smallest_chunk = self.items.iter().map(|v| v.length()).min().unwrap(); - - Some( - self.items - .iter_mut() - .map(|v| { - let len = v.length(); - let offset = v.offset; - v.offset += smallest_chunk; - - if len == smallest_chunk { - v.current_chunk.clone().unwrap() - } else { - v.current_chunk - .as_ref() - .unwrap() - .slice(offset, smallest_chunk) - } - }) - .collect::>(), - ) - } -} diff --git a/vortex-array/src/arrow/convert.rs b/vortex-array/src/arrow/convert.rs index ff3b60ec333..f0e3b3f0b2f 100644 --- a/vortex-array/src/arrow/convert.rs +++ b/vortex-array/src/arrow/convert.rs @@ -1,14 +1,12 @@ -use std::iter::zip; use std::sync::Arc; use arrow_array::RecordBatch; -use arrow_schema::{DataType, Field, FieldRef, SchemaRef, TimeUnit as ArrowTimeUnit}; +use arrow_schema::{DataType, Field, SchemaRef, TimeUnit as ArrowTimeUnit}; +use itertools::Itertools; use crate::array::struct_::StructArray; use crate::array::{Array, ArrayRef}; -use crate::composite_dtypes::{ - localdate, localtime, map, zoneddatetime, TimeUnit, TimeUnitSerializer, -}; +use crate::composite_dtypes::{localdate, localtime, zoneddatetime, TimeUnit}; use crate::compute::cast::cast; use crate::dtype::DType::*; use crate::dtype::{DType, FloatWidth, IntWidth, Nullability}; @@ -35,7 +33,7 @@ impl From for ArrayRef { // The dtype of the child arrays infer their nullability from the array itself. // In case the schema says something different, we cast into the schema's dtype. let vortex_array = ArrayRef::from_arrow(array.clone(), field.is_nullable()); - cast(vortex_array.as_ref(), &field.try_into().unwrap()).unwrap() + cast(vortex_array.as_ref(), &field.as_ref().into()).unwrap() }) .collect(), ) @@ -56,8 +54,8 @@ impl TryFrom for DType { value .fields() .iter() - .map(|f| f.data_type().try_into_dtype(f.is_nullable())) - .collect::>>()?, + .map(|f| f.as_ref().into()) + .collect_vec(), )) } } @@ -89,76 +87,47 @@ impl TryFrom<&DataType> for PType { } } -pub trait TryIntoDType { - fn try_into_dtype(self, is_nullable: bool) -> VortexResult; -} - -impl TryIntoDType for &DataType { - fn try_into_dtype(self, is_nullable: bool) -> VortexResult { +impl From<&Field> for DType { + fn from(field: &Field) -> Self { use crate::dtype::Signedness::*; - let nullability: Nullability = is_nullable.into(); - - match self { - DataType::Null => Ok(Null), - DataType::Boolean => Ok(Bool(nullability)), - DataType::Int8 => Ok(Int(IntWidth::_8, Signed, nullability)), - DataType::Int16 => Ok(Int(IntWidth::_16, Signed, nullability)), - DataType::Int32 => Ok(Int(IntWidth::_32, Signed, nullability)), - DataType::Int64 => Ok(Int(IntWidth::_64, Signed, nullability)), - DataType::UInt8 => Ok(Int(IntWidth::_8, Unsigned, nullability)), - DataType::UInt16 => Ok(Int(IntWidth::_16, Unsigned, nullability)), - DataType::UInt32 => Ok(Int(IntWidth::_32, Unsigned, nullability)), - DataType::UInt64 => Ok(Int(IntWidth::_64, Unsigned, nullability)), - DataType::Float16 => Ok(Float(FloatWidth::_16, nullability)), - DataType::Float32 => Ok(Float(FloatWidth::_32, nullability)), - DataType::Float64 => Ok(Float(FloatWidth::_64, nullability)), - DataType::Utf8 | DataType::LargeUtf8 => Ok(Utf8(nullability)), - DataType::Binary | DataType::LargeBinary | DataType::FixedSizeBinary(_) => { - Ok(Binary(nullability)) - } + let nullability: Nullability = field.is_nullable().into(); + + match field.data_type() { + DataType::Null => Null, + DataType::Boolean => Bool(nullability), + DataType::Int8 => Int(IntWidth::_8, Signed, nullability), + DataType::Int16 => Int(IntWidth::_16, Signed, nullability), + DataType::Int32 => Int(IntWidth::_32, Signed, nullability), + DataType::Int64 => Int(IntWidth::_64, Signed, nullability), + DataType::UInt8 => Int(IntWidth::_8, Unsigned, nullability), + DataType::UInt16 => Int(IntWidth::_16, Unsigned, nullability), + DataType::UInt32 => Int(IntWidth::_32, Unsigned, nullability), + DataType::UInt64 => Int(IntWidth::_64, Unsigned, nullability), + DataType::Float16 => Float(FloatWidth::_16, nullability), + DataType::Float32 => Float(FloatWidth::_32, nullability), + DataType::Float64 => Float(FloatWidth::_64, nullability), + DataType::Utf8 | DataType::LargeUtf8 => Utf8(nullability), + DataType::Binary | DataType::LargeBinary => Binary(nullability), // TODO(robert): what to do about this timezone? - DataType::Timestamp(u, _) => Ok(zoneddatetime(u.into(), nullability)), - DataType::Date32 => Ok(localdate(IntWidth::_32, nullability)), - DataType::Date64 => Ok(localdate(IntWidth::_64, nullability)), - DataType::Time32(u) => Ok(localtime(u.into(), IntWidth::_32, nullability)), - DataType::Time64(u) => Ok(localtime(u.into(), IntWidth::_64, nullability)), - DataType::List(e) | DataType::FixedSizeList(e, _) | DataType::LargeList(e) => { - Ok(List(Box::new(e.try_into()?), nullability)) + DataType::Timestamp(u, _) => zoneddatetime(u.into(), nullability), + DataType::Date32 => localdate(IntWidth::_32, nullability), + DataType::Date64 => localdate(IntWidth::_64, nullability), + DataType::Time32(u) => localtime(u.into(), IntWidth::_32, nullability), + DataType::Time64(u) => localtime(u.into(), IntWidth::_64, nullability), + DataType::List(e) | DataType::LargeList(e) => { + List(Box::new(e.as_ref().into()), nullability) } - DataType::Struct(f) => Ok(Struct( + DataType::Struct(f) => Struct( f.iter().map(|f| Arc::new(f.name().clone())).collect(), - f.iter() - .map(|f| f.data_type().try_into_dtype(f.is_nullable())) - .collect::>>()?, - )), - DataType::Dictionary(_, v) => v.as_ref().try_into_dtype(is_nullable), - DataType::Decimal128(p, s) | DataType::Decimal256(p, s) => { - Ok(Decimal(*p, *s, nullability)) - } - DataType::Map(e, _) => match e.data_type() { - DataType::Struct(f) => Ok(map( - f.first().unwrap().try_into()?, - f.get(1).unwrap().try_into()?, - )), - _ => Err(VortexError::InvalidArrowDataType(e.data_type().clone())), - }, - DataType::RunEndEncoded(_, v) => v.try_into(), - DataType::Duration(_) | DataType::Interval(_) | DataType::Union(_, _) => { - Err(VortexError::InvalidArrowDataType(self.clone())) - } + f.iter().map(|f| f.as_ref().into()).collect_vec(), + ), + DataType::Decimal128(p, s) | DataType::Decimal256(p, s) => Decimal(*p, *s, nullability), + _ => unimplemented!("Arrow data type not yet supported: {:?}", field.data_type()), } } } -impl TryFrom<&FieldRef> for DType { - type Error = VortexError; - - fn try_from(value: &FieldRef) -> VortexResult { - value.data_type().try_into_dtype(value.is_nullable()) - } -} - impl From<&ArrowTimeUnit> for TimeUnit { fn from(value: &ArrowTimeUnit) -> Self { match value { @@ -170,94 +139,6 @@ impl From<&ArrowTimeUnit> for TimeUnit { } } -impl From for DataType { - fn from(value: DType) -> Self { - (&value).into() - } -} - -// TODO(ngates): we probably want to implement this for an arrow Field not a DataType? -impl From<&DType> for DataType { - fn from(value: &DType) -> Self { - use crate::dtype::Signedness::*; - match value { - Null => DataType::Null, - Bool(_) => DataType::Boolean, - Int(w, s, _) => match w { - IntWidth::Unknown => match s { - Unknown => DataType::Int64, - Unsigned => DataType::UInt64, - Signed => DataType::Int64, - }, - IntWidth::_8 => match s { - Unknown => DataType::Int8, - Unsigned => DataType::UInt8, - Signed => DataType::Int8, - }, - IntWidth::_16 => match s { - Unknown => DataType::Int16, - Unsigned => DataType::UInt16, - Signed => DataType::Int16, - }, - IntWidth::_32 => match s { - Unknown => DataType::Int32, - Unsigned => DataType::UInt32, - Signed => DataType::Int32, - }, - IntWidth::_64 => match s { - Unknown => DataType::Int64, - Unsigned => DataType::UInt64, - Signed => DataType::Int64, - }, - }, - Decimal(p, w, _) => DataType::Decimal128(*p, *w), - Float(w, _) => match w { - FloatWidth::Unknown => DataType::Float64, - FloatWidth::_16 => DataType::Float16, - FloatWidth::_32 => DataType::Float32, - FloatWidth::_64 => DataType::Float64, - }, - Utf8(_) => DataType::Utf8, - Binary(_) => DataType::Binary, - Struct(names, dtypes) => DataType::Struct( - zip(names, dtypes) - .map(|(n, dt)| Field::new((**n).clone(), dt.into(), dt.is_nullable())) - .collect(), - ), - List(c, _) => DataType::List(Arc::new(Field::new( - "element", - c.as_ref().into(), - c.is_nullable(), - ))), - Composite(n, d, m) => match n.as_str() { - "instant" => DataType::Timestamp(TimeUnitSerializer::deserialize(m).into(), None), - "localtime" => match d.as_ref() { - Int(IntWidth::_32, _, _) => { - DataType::Time32(TimeUnitSerializer::deserialize(m).into()) - } - Int(IntWidth::_64, _, _) => { - DataType::Time64(TimeUnitSerializer::deserialize(m).into()) - } - _ => panic!("unexpected storage type"), - }, - "localdate" => match d.as_ref() { - Int(IntWidth::_32, _, _) => DataType::Date32, - Int(IntWidth::_64, _, _) => DataType::Date64, - _ => panic!("unexpected storage type"), - }, - "zoneddatetime" => { - DataType::Timestamp(TimeUnitSerializer::deserialize(m).into(), None) - } - "map" => DataType::Map( - Arc::new(Field::new("entries", d.as_ref().into(), false)), - false, - ), - _ => panic!("unknown composite type"), - }, - } - } -} - impl From for ArrowTimeUnit { fn from(value: TimeUnit) -> Self { match value { @@ -268,17 +149,3 @@ impl From for ArrowTimeUnit { } } } - -#[cfg(test)] -mod tests { - use crate::dtype::*; - - use super::*; - - #[test] - fn test_dtype_to_datatype() { - let dtype = Int(IntWidth::_32, Signedness::Signed, Nullability::Nullable); - let data_type: DataType = dtype.into(); - assert_eq!(data_type, DataType::Int32); - } -} diff --git a/vortex-array/src/arrow/mod.rs b/vortex-array/src/arrow/mod.rs index a5fb1aa24d0..ad0c08b3d8f 100644 --- a/vortex-array/src/arrow/mod.rs +++ b/vortex-array/src/arrow/mod.rs @@ -1,49 +1,2 @@ -use arrow_array::array::ArrayRef; -use arrow_select::concat::concat; -use itertools::Itertools; - -use crate::array::ArrowIterator; - -pub mod aligned_iter; pub mod convert; pub mod wrappers; - -pub trait CombineChunks { - fn combine_chunks(self) -> ArrayRef; -} - -impl CombineChunks for Box { - fn combine_chunks(self) -> ArrayRef { - let chunks = self.collect_vec(); - let chunk_refs = chunks.iter().map(|a| a.as_ref()).collect_vec(); - concat(&chunk_refs).unwrap() - } -} - -#[macro_export] -macro_rules! match_arrow_numeric_type { - ($self:expr, | $_:tt $enc:ident | $($body:tt)*) => ({ - macro_rules! __with__ {( $_ $enc:ident ) => ( $($body)* )} - use $crate::dtype::DType::*; - use $crate::dtype::IntWidth::*; - use $crate::dtype::Signedness::*; - use $crate::dtype::FloatWidth; - use arrow_array::types::*; - match $self { - Int(_8, Unsigned, _) => __with__! {UInt8Type}, - Int(_16, Unsigned, _) => __with__!{UInt16Type}, - Int(_32, Unsigned, _) => __with__!{UInt32Type}, - Int(_64, Unsigned, _) => __with__!{UInt64Type}, - Int(_8, Signed, _) => __with__! {Int8Type}, - Int(_16, Signed, _) => __with__!{Int16Type}, - Int(_32, Signed, _) => __with__!{Int32Type}, - Int(_64, Signed, _) => __with__!{Int64Type}, - Float(FloatWidth::_16, _) => __with__!{Float16Type}, - Float(FloatWidth::_32, _) => __with__!{Float32Type}, - Float(FloatWidth::_64, _) => __with__!{Float64Type}, - _ => unimplemented!("Convert this DType to ArrowPrimitiveType") - } - }) -} - -pub use match_arrow_numeric_type; diff --git a/vortex-array/src/composite_dtypes.rs b/vortex-array/src/composite_dtypes.rs index cf3547d4ea1..08ab92de9f9 100644 --- a/vortex-array/src/composite_dtypes.rs +++ b/vortex-array/src/composite_dtypes.rs @@ -88,16 +88,3 @@ pub fn zoneddatetime(unit: TimeUnit, nullability: Nullability) -> DType { TimeUnitSerializer::serialize(unit), ) } - -const MAP_DTYPE: &str = "map"; - -pub fn map(key_type: DType, value_type: DType) -> DType { - DType::Composite( - Arc::new(MAP_DTYPE.to_string()), - Box::new(DType::Struct( - vec![Arc::new("key".to_string()), Arc::new("value".to_string())], - vec![key_type, value_type], - )), - vec![], - ) -} diff --git a/vortex-array/src/encode.rs b/vortex-array/src/encode.rs index abcec43ec5b..634e290ca92 100644 --- a/vortex-array/src/encode.rs +++ b/vortex-array/src/encode.rs @@ -19,7 +19,7 @@ use arrow_array::types::{ }; use arrow_buffer::buffer::{NullBuffer, OffsetBuffer}; use arrow_buffer::Buffer; -use arrow_schema::{DataType, TimeUnit}; +use arrow_schema::{DataType, Field, TimeUnit}; use crate::array::bool::BoolArray; use crate::array::constant::ConstantArray; @@ -28,7 +28,7 @@ use crate::array::struct_::StructArray; use crate::array::typed::TypedArray; use crate::array::varbin::VarBinArray; use crate::array::{Array, ArrayRef}; -use crate::arrow::convert::TryIntoDType; +use crate::dtype::DType; use crate::ptype::PType; use crate::scalar::NullScalar; @@ -67,17 +67,22 @@ impl FromArrow<&ArrowPrimitiveArray> for ArrayRef { if T::DATA_TYPE.is_numeric() { arr } else { - TypedArray::new(arr, T::DATA_TYPE.try_into_dtype(nullable).unwrap()).boxed() + TypedArray::new(arr, (&Field::new("_", T::DATA_TYPE, false)).into()).boxed() } } } impl FromArrow<&GenericByteArray> for ArrayRef { fn from_arrow(value: &GenericByteArray, nullable: bool) -> Self { + let dtype = match T::DATA_TYPE { + DataType::Binary | DataType::LargeBinary => DType::Binary(nullable.into()), + DataType::Utf8 | DataType::LargeUtf8 => DType::Utf8(nullable.into()), + _ => panic!("Invalid data type for ByteArray"), + }; VarBinArray::new( value.offsets().into(), value.values().into(), - T::DATA_TYPE.try_into_dtype(value.is_nullable()).unwrap(), + dtype, nulls(value.nulls(), nullable, value.len()), ) .boxed() diff --git a/vortex-array/src/error.rs b/vortex-array/src/error.rs index c964b82f185..391a7b35ffb 100644 --- a/vortex-array/src/error.rs +++ b/vortex-array/src/error.rs @@ -87,6 +87,12 @@ pub enum VortexError { pub type VortexResult = Result; +impl From<&str> for VortexError { + fn from(value: &str) -> Self { + VortexError::InvalidArgument(value.to_string().into()) + } +} + macro_rules! wrapped_error { ($E:ty, $e:ident) => { #[derive(Debug)] diff --git a/vortex-array/src/iterator.rs b/vortex-array/src/iterator.rs new file mode 100644 index 00000000000..25d6469cd0b --- /dev/null +++ b/vortex-array/src/iterator.rs @@ -0,0 +1,56 @@ +use crate::accessor::ArrayAccessor; +use std::marker::PhantomData; + +pub struct ArrayIter, T> { + array: A, + current: usize, + end: usize, + phantom: PhantomData, +} + +impl, T> ArrayIter { + pub fn new(array: A) -> Self { + let len = array.len(); + ArrayIter { + array, + current: 0, + end: len, + phantom: PhantomData, + } + } +} + +impl, T> Iterator for ArrayIter { + type Item = Option; + + #[inline] + fn next(&mut self) -> Option { + if self.current == self.end { + None + } else { + let old = self.current; + self.current += 1; + Some(self.array.value(old)) + } + } + + fn size_hint(&self) -> (usize, Option) { + ( + self.array.len() - self.current, + Some(self.array.len() - self.current), + ) + } +} + +impl, T> DoubleEndedIterator for ArrayIter { + fn next_back(&mut self) -> Option { + if self.end == self.current { + None + } else { + self.end -= 1; + Some(self.array.value(self.end)) + } + } +} + +impl, T> ExactSizeIterator for ArrayIter {} diff --git a/vortex-array/src/lib.rs b/vortex-array/src/lib.rs index 8d2f7200913..fa84743448c 100644 --- a/vortex-array/src/lib.rs +++ b/vortex-array/src/lib.rs @@ -2,6 +2,7 @@ pub mod array; pub mod arrow; pub mod scalar; +pub mod accessor; pub mod composite_dtypes; pub mod compress; pub mod compute; @@ -9,6 +10,7 @@ pub mod dtype; pub mod encode; pub mod error; pub mod formatter; +pub mod iterator; pub mod ptype; mod sampling; pub mod serde; diff --git a/vortex-dict/src/dict.rs b/vortex-dict/src/dict.rs index 3ca7cd1a617..ff09dab4d8f 100644 --- a/vortex-dict/src/dict.rs +++ b/vortex-dict/src/dict.rs @@ -1,7 +1,7 @@ use std::any::Any; use std::sync::{Arc, RwLock}; -use vortex::array::{check_slice_bounds, Array, ArrayRef, ArrowIterator, Encoding, EncodingId}; +use vortex::array::{check_slice_bounds, Array, ArrayRef, Encoding, EncodingId}; use vortex::compress::EncodingCompression; use vortex::dtype::{DType, Signedness}; use vortex::error::{VortexError, VortexResult}; @@ -72,10 +72,6 @@ impl Array for DictArray { Stats::new(&self.stats, self) } - fn iter_arrow(&self) -> Box { - todo!() - } - // TODO(robert): Add function to trim the dictionary fn slice(&self, start: usize, stop: usize) -> VortexResult { check_slice_bounds(self, start, stop)?; diff --git a/vortex-fastlanes/src/bitpacking/mod.rs b/vortex-fastlanes/src/bitpacking/mod.rs index e238b21a993..b54d43ab620 100644 --- a/vortex-fastlanes/src/bitpacking/mod.rs +++ b/vortex-fastlanes/src/bitpacking/mod.rs @@ -1,9 +1,7 @@ use std::any::Any; use std::sync::{Arc, RwLock}; -use vortex::array::{ - check_validity_buffer, Array, ArrayRef, ArrowIterator, Encoding, EncodingId, EncodingRef, -}; +use vortex::array::{check_validity_buffer, Array, ArrayRef, Encoding, EncodingId, EncodingRef}; use vortex::compress::EncodingCompression; use vortex::compute::scalar_at::scalar_at; use vortex::compute::ArrayCompute; @@ -115,10 +113,6 @@ impl Array for BitPackedArray { Stats::new(&self.stats, self) } - fn iter_arrow(&self) -> Box { - todo!() - } - fn slice(&self, _start: usize, _stop: usize) -> VortexResult { unimplemented!("BitPackedArray::slice") } diff --git a/vortex-fastlanes/src/for/mod.rs b/vortex-fastlanes/src/for/mod.rs index 443ba4ddfd0..f69922199ea 100644 --- a/vortex-fastlanes/src/for/mod.rs +++ b/vortex-fastlanes/src/for/mod.rs @@ -1,7 +1,7 @@ use std::any::Any; use std::sync::{Arc, RwLock}; -use vortex::array::{Array, ArrayRef, ArrowIterator, Encoding, EncodingId, EncodingRef}; +use vortex::array::{Array, ArrayRef, Encoding, EncodingId, EncodingRef}; use vortex::compress::EncodingCompression; use vortex::compute::ArrayCompute; use vortex::dtype::DType; @@ -85,10 +85,6 @@ impl Array for FoRArray { Stats::new(&self.stats, self) } - fn iter_arrow(&self) -> Box { - todo!() - } - fn slice(&self, start: usize, stop: usize) -> VortexResult { Ok(Self { child: self.child.slice(start, stop)?, diff --git a/vortex-ree/src/compress.rs b/vortex-ree/src/compress.rs index 112e7921cc3..f02ef538877 100644 --- a/vortex-ree/src/compress.rs +++ b/vortex-ree/src/compress.rs @@ -111,7 +111,6 @@ fn ree_encode_primitive(elements: &[T]) -> (Vec, Vec) { (ends, values) } -#[allow(dead_code)] pub fn ree_decode( ends: &PrimitiveArray, values: &PrimitiveArray, diff --git a/vortex-ree/src/compute.rs b/vortex-ree/src/compute.rs index c77272e547e..b59e25a1eba 100644 --- a/vortex-ree/src/compute.rs +++ b/vortex-ree/src/compute.rs @@ -1,16 +1,51 @@ +use std::cmp::min; +use vortex::array::primitive::PrimitiveArray; +use vortex::array::{Array, CloneOptionalArray}; +use vortex::compute::cast::cast; +use vortex::compute::flatten::{flatten, flatten_primitive, FlattenFn, FlattenedArray}; use vortex::compute::scalar_at::{scalar_at, ScalarAtFn}; use vortex::compute::ArrayCompute; -use vortex::error::VortexResult; +use vortex::error::{VortexError, VortexResult}; +use vortex::ptype::PType; use vortex::scalar::Scalar; +use crate::compress::ree_decode; use crate::REEArray; impl ArrayCompute for REEArray { + fn flatten(&self) -> Option<&dyn FlattenFn> { + Some(self) + } + fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { Some(self) } } +impl FlattenFn for REEArray { + fn flatten(&self) -> VortexResult { + let ends: PrimitiveArray = + flatten_primitive(cast(self.ends(), &PType::U64.into())?.as_ref())? + .typed_data::() + .iter() + .map(|v| v - self.offset() as u64) + .map(|v| min(v, self.len() as u64)) + .take_while(|v| *v <= (self.len() as u64)) + .collect::>() + .into(); + + let values = flatten(self.values())?; + if let FlattenedArray::Primitive(pvalues) = values { + ree_decode(&ends, &pvalues, self.validity().clone_optional()) + .map(FlattenedArray::Primitive) + } else { + Err(VortexError::InvalidArgument( + "Cannot yet flatten non-primitive REE array".into(), + )) + } + } +} + impl ScalarAtFn for REEArray { fn scalar_at(&self, index: usize) -> VortexResult { scalar_at(self.values(), self.find_physical_index(index)?) diff --git a/vortex-ree/src/ree.rs b/vortex-ree/src/ree.rs index a40214e0a3a..f2fe6a9dcb0 100644 --- a/vortex-ree/src/ree.rs +++ b/vortex-ree/src/ree.rs @@ -1,31 +1,20 @@ use std::any::Any; -use std::cmp::min; -use std::marker::PhantomData; use std::sync::{Arc, RwLock}; -use arrow_array::array::ArrowPrimitiveType; -use arrow_array::array::{Array as ArrowArray, ArrayRef as ArrowArrayRef}; -use arrow_array::cast::AsArray; -use num_traits::AsPrimitive; - -use vortex::array::primitive::PrimitiveArray; use vortex::array::{ - check_slice_bounds, check_validity_buffer, Array, ArrayKind, ArrayRef, ArrowIterator, - CloneOptionalArray, Encoding, EncodingId, EncodingRef, + check_slice_bounds, check_validity_buffer, Array, ArrayKind, ArrayRef, CloneOptionalArray, + Encoding, EncodingId, EncodingRef, }; -use vortex::arrow::match_arrow_numeric_type; use vortex::compress::EncodingCompression; use vortex::compute; -use vortex::compute::scalar_at::scalar_at; use vortex::compute::search_sorted::SearchSortedSide; -use vortex::dtype::{DType, Nullability, Signedness}; +use vortex::dtype::DType; use vortex::error::{VortexError, VortexResult}; use vortex::formatter::{ArrayDisplay, ArrayFormatter}; -use vortex::ptype::NativePType; use vortex::serde::{ArraySerde, EncodingSerde}; use vortex::stats::{Stat, Stats, StatsCompute, StatsSet}; -use crate::compress::{ree_decode_primitive, ree_encode}; +use crate::compress::ree_encode; #[derive(Debug, Clone)] pub struct REEArray { @@ -55,13 +44,6 @@ impl REEArray { ) -> VortexResult { check_validity_buffer(validity.as_deref(), length)?; - if !matches!( - ends.dtype(), - DType::Int(_, Signedness::Unsigned, Nullability::NonNullable) - ) { - return Err(VortexError::InvalidDType(ends.dtype().clone())); - } - if !ends .stats() .get_as::(&Stat::IsStrictSorted) @@ -70,8 +52,7 @@ impl REEArray { return Err(VortexError::IndexArrayMustBeStrictSorted); } - // see https://github.com/fulcrum-so/spiral/issues/873 - // let length = run_ends_logical_length(&ends); + // TODO(ngates): https://github.com/fulcrum-so/spiral/issues/873 Ok(Self { ends, values, @@ -106,6 +87,11 @@ impl REEArray { } } + #[inline] + pub fn offset(&self) -> usize { + self.offset + } + #[inline] pub fn ends(&self) -> &dyn Array { self.ends.as_ref() @@ -158,30 +144,6 @@ impl Array for REEArray { Stats::new(&self.stats, self) } - fn iter_arrow(&self) -> Box { - let ends: Vec = self - .ends - .iter_arrow() - .flat_map(|c| { - match_arrow_numeric_type!(self.ends.dtype(), |$E| { - let ends = c.as_primitive::<$E>() - .values() - .iter() - .map(|v| AsPrimitive::::as_(*v)) - .map(|v| v - self.offset as u64) - .map(|v| min(v, self.length as u64)) - .take_while(|v| *v <= (self.length as u64)) - .collect::>(); - ends.into_iter() - }) - }) - .collect(); - - match_arrow_numeric_type!(self.values.dtype(), |$N| { - Box::new(REEArrowIterator::<$N>::new(ends, self.values.iter_arrow())) - }) - } - fn slice(&self, start: usize, stop: usize) -> VortexResult { check_slice_bounds(self, start, stop)?; let slice_begin = self.find_physical_index(start)?; @@ -253,67 +215,10 @@ impl ArrayDisplay for REEArray { } } -pub struct REEArrowIterator -where - T::Native: NativePType, -{ - ends: Vec, - values: Box, - current_idx: usize, - _marker: PhantomData, -} - -impl REEArrowIterator -where - T::Native: NativePType, -{ - pub fn new(ends: Vec, values: Box) -> Self { - Self { - ends, - values, - current_idx: 0, - _marker: PhantomData, - } - } -} - -impl Iterator for REEArrowIterator -where - T::Native: NativePType, -{ - type Item = ArrowArrayRef; - - fn next(&mut self) -> Option { - self.values.next().and_then(|vs| { - let batch_ends = &self.ends[self.current_idx..self.current_idx + vs.len()]; - self.current_idx += vs.len(); - let decoded = - ree_decode_primitive(batch_ends, vs.as_primitive::().values().as_ref()); - // TODO(ngates): avoid going back into PrimitiveArray? - PrimitiveArray::from(decoded).iter_arrow().next() - }) - } -} - -/// Gets the logical end from the ends array. -#[allow(dead_code)] -fn run_ends_logical_length>(ends: &T) -> usize { - if ends.as_ref().is_empty() { - 0 - } else { - scalar_at(ends.as_ref(), ends.as_ref().len() - 1) - .and_then(|end| end.try_into()) - .unwrap_or_else(|_| panic!("Couldn't convert ends to usize")) - } -} - #[cfg(test)] mod test { - use arrow_array::cast::AsArray; - use arrow_array::types::Int32Type; - use itertools::Itertools; - use vortex::array::Array; + use vortex::compute::flatten::flatten_primitive; use vortex::compute::scalar_at::scalar_at; use vortex::dtype::{DType, IntWidth, Nullability, Signedness}; @@ -321,7 +226,7 @@ mod test { #[test] fn new() { - let arr = REEArray::new(vec![2u32, 5, 10].into(), vec![1, 2, 3].into(), None, 10); + let arr = REEArray::new(vec![2u32, 5, 10].into(), vec![1i32, 2, 3].into(), None, 10); assert_eq!(arr.len(), 10); assert_eq!( arr.dtype(), @@ -339,7 +244,7 @@ mod test { #[test] fn slice() { - let arr = REEArray::new(vec![2u32, 5, 10].into(), vec![1, 2, 3].into(), None, 10) + let arr = REEArray::new(vec![2u32, 5, 10].into(), vec![1i32, 2, 3].into(), None, 10) .slice(3, 8) .unwrap(); assert_eq!( @@ -348,20 +253,18 @@ mod test { ); assert_eq!(arr.len(), 5); - arr.iter_arrow() - .zip_eq([vec![2, 2, 3, 3, 3]]) - .for_each(|(from_iter, orig)| { - assert_eq!(*from_iter.as_primitive::().values(), orig); - }); + assert_eq!( + flatten_primitive(arr.as_ref()).unwrap().typed_data::(), + vec![2, 2, 3, 3, 3] + ); } #[test] - fn iter_arrow() { - let arr = REEArray::new(vec![2u32, 5, 10].into(), vec![1, 2, 3].into(), None, 10); - arr.iter_arrow() - .zip_eq([vec![1, 1, 2, 2, 2, 3, 3, 3, 3, 3]]) - .for_each(|(from_iter, orig)| { - assert_eq!(*from_iter.as_primitive::().values(), orig); - }); + fn flatten() { + let arr = REEArray::new(vec![2u32, 5, 10].into(), vec![1i32, 2, 3].into(), None, 10); + assert_eq!( + flatten_primitive(arr.as_ref()).unwrap().typed_data::(), + vec![1, 1, 2, 2, 2, 3, 3, 3, 3, 3] + ); } } diff --git a/vortex-roaring/src/boolean/mod.rs b/vortex-roaring/src/boolean/mod.rs index 822433488ff..99230571036 100644 --- a/vortex-roaring/src/boolean/mod.rs +++ b/vortex-roaring/src/boolean/mod.rs @@ -5,8 +5,7 @@ use croaring::{Bitmap, Native}; use compress::roaring_encode; use vortex::array::{ - check_slice_bounds, Array, ArrayKind, ArrayRef, ArrowIterator, Encoding, EncodingId, - EncodingRef, + check_slice_bounds, Array, ArrayKind, ArrayRef, Encoding, EncodingId, EncodingRef, }; use vortex::compress::EncodingCompression; use vortex::dtype::DType; @@ -84,10 +83,6 @@ impl Array for RoaringBoolArray { Stats::new(&self.stats, self) } - fn iter_arrow(&self) -> Box { - todo!() - } - fn slice(&self, start: usize, stop: usize) -> VortexResult { check_slice_bounds(self, start, stop)?; diff --git a/vortex-roaring/src/integer/mod.rs b/vortex-roaring/src/integer/mod.rs index dc74b484f7a..1a6b71e36a8 100644 --- a/vortex-roaring/src/integer/mod.rs +++ b/vortex-roaring/src/integer/mod.rs @@ -5,8 +5,7 @@ use croaring::{Bitmap, Native}; use compress::roaring_encode; use vortex::array::{ - check_slice_bounds, Array, ArrayKind, ArrayRef, ArrowIterator, Encoding, EncodingId, - EncodingRef, + check_slice_bounds, Array, ArrayKind, ArrayRef, Encoding, EncodingId, EncodingRef, }; use vortex::compress::EncodingCompression; use vortex::dtype::DType; @@ -96,10 +95,6 @@ impl Array for RoaringIntArray { Stats::new(&self.stats, self) } - fn iter_arrow(&self) -> Box { - todo!() - } - fn slice(&self, start: usize, stop: usize) -> VortexResult { check_slice_bounds(self, start, stop)?; todo!() diff --git a/vortex-zigzag/src/zigzag.rs b/vortex-zigzag/src/zigzag.rs index 107679a86ee..2e046579085 100644 --- a/vortex-zigzag/src/zigzag.rs +++ b/vortex-zigzag/src/zigzag.rs @@ -1,7 +1,7 @@ use std::any::Any; use std::sync::{Arc, RwLock}; -use vortex::array::{Array, ArrayKind, ArrayRef, ArrowIterator, Encoding, EncodingId, EncodingRef}; +use vortex::array::{Array, ArrayKind, ArrayRef, Encoding, EncodingId, EncodingRef}; use vortex::compress::EncodingCompression; use vortex::dtype::{DType, Signedness}; use vortex::error::{VortexError, VortexResult}; @@ -85,10 +85,6 @@ impl Array for ZigZagArray { Stats::new(&self.stats, self) } - fn iter_arrow(&self) -> Box { - todo!() - } - fn slice(&self, start: usize, stop: usize) -> VortexResult { Ok(Self::try_new(self.encoded.slice(start, stop)?)?.boxed()) }