From 6467a1ebe4244ff35f8c9535eca047f64fc0c96e Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Sat, 2 Mar 2024 23:31:44 +0000 Subject: [PATCH 1/7] datetime --- Cargo.lock | 9 ++ Cargo.toml | 1 + vortex-datetime/Cargo.toml | 9 ++ vortex-datetime/src/compress.rs | 13 +++ vortex-datetime/src/datetime.rs | 169 ++++++++++++++++++++++++++++++++ vortex-datetime/src/lib.rs | 24 +++++ 6 files changed, 225 insertions(+) create mode 100644 vortex-datetime/Cargo.toml create mode 100644 vortex-datetime/src/compress.rs create mode 100644 vortex-datetime/src/datetime.rs create mode 100644 vortex-datetime/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 49fafdbf0a6..987a7ce0e42 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2832,6 +2832,15 @@ dependencies = [ "vortex-zigzag", ] +[[package]] +name = "vortex-datetime" +version = "0.1.0" +dependencies = [ + "linkme", + "log", + "vortex", +] + [[package]] name = "vortex-dict" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 62b7edd0bd3..bc14d9e142a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ members = [ "vortex", "vortex-alloc", "vortex-alp", + "vortex-datetime", "vortex-dict", "vortex-ffor", "vortex-ree", diff --git a/vortex-datetime/Cargo.toml b/vortex-datetime/Cargo.toml new file mode 100644 index 00000000000..a09dfc5a605 --- /dev/null +++ b/vortex-datetime/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "vortex-datetime" +version = "0.1.0" +edition = "2021" + +[dependencies] +vortex = { "path" = "../vortex" } +linkme = "0.3.22" +log = "0.4.20" diff --git a/vortex-datetime/src/compress.rs b/vortex-datetime/src/compress.rs new file mode 100644 index 00000000000..3f6411a0fe2 --- /dev/null +++ b/vortex-datetime/src/compress.rs @@ -0,0 +1,13 @@ +// (c) Copyright 2024 Fulcrum Technologies, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. diff --git a/vortex-datetime/src/datetime.rs b/vortex-datetime/src/datetime.rs new file mode 100644 index 00000000000..1f50b3abbc8 --- /dev/null +++ b/vortex-datetime/src/datetime.rs @@ -0,0 +1,169 @@ +// (c) Copyright 2024 Fulcrum Technologies, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::sync::{Arc, RwLock}; + +use vortex::array::{ + check_index_bounds, check_slice_bounds, Array, ArrayRef, ArrowIterator, Encoding, EncodingId, +}; +use vortex::compress::EncodingCompression; +use vortex::dtype::DType; +use vortex::error::{VortexError, VortexResult}; +use vortex::formatter::{ArrayDisplay, ArrayFormatter}; +use vortex::ptype::PType; +use vortex::scalar::Scalar; +use vortex::serde::{ArraySerde, EncodingSerde}; +use vortex::stats::{Stats, StatsSet}; + +/// An array that decomposes a datetime into days, seconds, and nanoseconds. +#[derive(Debug, Clone)] +pub struct DateTimeArray { + days: ArrayRef, + seconds: ArrayRef, + nanoseconds: ArrayRef, + stats: Arc>, +} + +impl DateTimeArray { + pub fn new(days: ArrayRef, seconds: ArrayRef, nanoseconds: ArrayRef) -> Self { + Self::try_new(days, seconds, nanoseconds).unwrap() + } + + pub fn try_new(days: ArrayRef, seconds: ArrayRef, nanoseconds: ArrayRef) -> VortexResult { + if !matches!(days.dtype(), DType::Int(_, _, _)) { + return Err(VortexError::InvalidDType(days.dtype().clone())); + } + if !matches!(seconds.dtype(), DType::Int(_, _, _)) { + return Err(VortexError::InvalidDType(seconds.dtype().clone())); + } + if !matches!(nanoseconds.dtype(), DType::Int(_, _, _)) { + return Err(VortexError::InvalidDType(nanoseconds.dtype().clone())); + } + + Ok(Self { + days, + seconds, + nanoseconds, + stats: Arc::new(RwLock::new(StatsSet::new())), + }) + } + + #[inline] + pub fn days(&self) -> &dyn Array { + self.days.as_ref() + } + + #[inline] + pub fn seconds(&self) -> &dyn Array { + self.seconds.as_ref() + } + + #[inline] + pub fn nanoseconds(&self) -> &dyn Array { + self.nanoseconds.as_ref() + } +} + +impl Array for DateTimeArray { + fn as_any(&self) -> &dyn Any { + self + } + + fn boxed(self) -> ArrayRef { + Box::new(self) + } + + fn into_any(self: Box) -> Box { + self + } + + fn len(&self) -> usize { + self.days.len() + } + + fn is_empty(&self) -> bool { + self.days.is_empty() && self.seconds.is_empty() && self.nanoseconds.is_empty() + } + + fn dtype(&self) -> &DType { + PType::I64.dtype() + } + + fn stats(&self) -> Stats { + Stats::new(&self.stats, self) + } + + fn scalar_at(&self, index: usize) -> VortexResult> { + check_index_bounds(self, index)?; + let dict_index: usize = self.codes().scalar_at(index)?.try_into()?; + self.dict().scalar_at(dict_index) + } + + fn iter_arrow(&self) -> Box { + todo!() + } + + // TODO(robert): Add function to trim the dictionary + fn slice(&self, start: usize, stop: usize) -> VortexResult { + check_slice_bounds(self, start, stop)?; + Ok(Self::new(self.codes().slice(start, stop)?, self.dict.clone()).boxed()) + } + + fn encoding(&self) -> &'static dyn Encoding { + &DateTimeEncoding + } + + fn nbytes(&self) -> usize { + self.codes().nbytes() + self.dict().nbytes() + } + + fn serde(&self) -> &dyn ArraySerde { + self + } +} + +impl<'arr> AsRef<(dyn Array + 'arr)> for DateTimeArray { + fn as_ref(&self) -> &(dyn Array + 'arr) { + self + } +} + +impl ArrayDisplay for DateTimeArray { + fn fmt(&self, f: &mut ArrayFormatter) -> std::fmt::Result { + f.writeln("dict:")?; + f.indent(|indent| indent.array(self.dict()))?; + f.writeln("codes:")?; + f.indent(|indent| indent.array(self.codes())) + } +} + +#[derive(Debug)] +pub struct DateTimeEncoding; + +pub const DATETIME_ENCODING: EncodingId = EncodingId::new("vortex.datetime"); + +impl Encoding for DateTimeEncoding { + fn id(&self) -> &EncodingId { + &DATETIME_ENCODING + } + + fn compression(&self) -> Option<&dyn EncodingCompression> { + Some(self) + } + + fn serde(&self) -> Option<&dyn EncodingSerde> { + Some(self) + } +} diff --git a/vortex-datetime/src/lib.rs b/vortex-datetime/src/lib.rs new file mode 100644 index 00000000000..54c95a1504d --- /dev/null +++ b/vortex-datetime/src/lib.rs @@ -0,0 +1,24 @@ +// (c) Copyright 2024 Fulcrum Technologies, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use linkme::distributed_slice; + +pub use datetime::*; +use vortex::array::{EncodingRef, ENCODINGS}; + +mod compress; +mod datetime; + +#[distributed_slice(ENCODINGS)] +static ENCODINGS_DICT: EncodingRef = &DictEncoding; From af726774f8bee368a8240901ab82b5cf62a5043c Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Fri, 8 Mar 2024 22:36:32 +0000 Subject: [PATCH 2/7] Bit shift to FOR --- Cargo.lock | 1 + bench-vortex/Cargo.toml | 1 + bench-vortex/src/lib.rs | 4 +- vortex-datetime/src/compress.rs | 95 ++++++++++++++++++++++++++++----- vortex-datetime/src/datetime.rs | 90 +++++++++++++++---------------- vortex-datetime/src/lib.rs | 16 +----- 6 files changed, 133 insertions(+), 74 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 46436c48abe..5ce634986a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -371,6 +371,7 @@ dependencies = [ "simplelog", "vortex-alp", "vortex-array", + "vortex-datetime", "vortex-dict", "vortex-fastlanes", "vortex-ree", diff --git a/bench-vortex/Cargo.toml b/bench-vortex/Cargo.toml index 8bdc8ed2323..4415835f4aa 100644 --- a/bench-vortex/Cargo.toml +++ b/bench-vortex/Cargo.toml @@ -18,6 +18,7 @@ workspace = true arrow-array = "50.0.0" vortex-array = { path = "../vortex-array" } vortex-alp = { path = "../vortex-alp" } +vortex-datetime = { path = "../vortex-datetime" } vortex-dict = { path = "../vortex-dict" } vortex-fastlanes = { path = "../vortex-fastlanes" } vortex-ree = { path = "../vortex-ree" } diff --git a/bench-vortex/src/lib.rs b/bench-vortex/src/lib.rs index aff0ad0745e..753363501f9 100644 --- a/bench-vortex/src/lib.rs +++ b/bench-vortex/src/lib.rs @@ -23,6 +23,7 @@ use vortex::compress::{CompressConfig, CompressCtx}; use vortex::dtype::DType; use vortex::formatter::display_tree; use vortex_alp::ALPEncoding; +use vortex_datetime::DateTimeEncoding; use vortex_dict::DictEncoding; use vortex_fastlanes::{BitPackedEncoding, FoREncoding}; use vortex_ree::REEEncoding; @@ -46,6 +47,7 @@ pub fn enumerate_arrays() -> Vec<&'static dyn Encoding> { &DictEncoding, &BitPackedEncoding, &FoREncoding, + &DateTimeEncoding, // &DeltaEncoding, // &FFoREncoding, &REEEncoding, @@ -156,7 +158,7 @@ mod test { .unwrap(); } - #[ignore] + //#[ignore] #[test] fn compression_ratio() { setup_logger(LevelFilter::Warn); diff --git a/vortex-datetime/src/compress.rs b/vortex-datetime/src/compress.rs index 3f6411a0fe2..47a372398c9 100644 --- a/vortex-datetime/src/compress.rs +++ b/vortex-datetime/src/compress.rs @@ -1,13 +1,82 @@ -// (c) Copyright 2024 Fulcrum Technologies, Inc. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +use crate::{DateTimeArray, DateTimeEncoding}; +use vortex::array::downcast::DowncastArrayBuiltin; +use vortex::array::primitive::{PrimitiveArray, PrimitiveEncoding}; +use vortex::array::typed::{TypedArray, TypedEncoding}; +use vortex::array::{Array, ArrayRef, Encoding}; +use vortex::compress::{CompressConfig, CompressCtx, EncodingCompression}; +use vortex::dtype::{DType, TimeUnit}; +use vortex::error::{VortexError, VortexResult}; + +impl EncodingCompression for DateTimeEncoding { + fn can_compress( + &self, + array: &dyn Array, + _config: &CompressConfig, + ) -> Option<&dyn EncodingCompression> { + if array.encoding().id() != TypedEncoding.id() { + return None; + } + + if array.as_typed().untyped_array().encoding().id() != PrimitiveEncoding.id() { + return None; + } + + if !matches!(array.dtype(), DType::ZonedDateTime(_, _)) { + return None; + } + + Some(self) + } + + fn compress( + &self, + array: &dyn Array, + like: Option<&dyn Array>, + ctx: CompressCtx, + ) -> VortexResult { + match array.dtype() { + DType::ZonedDateTime(unit, nullability) => { + let tarray = array.as_any().downcast_ref::().unwrap(); + let parray = tarray + .untyped_array() + .as_any() + .downcast_ref::() + .unwrap(); + // Eh, it's fine for now. + let ts = parray.typed_data::(); + + let ld = like.map(|l| l.as_any().downcast_ref::().unwrap()); + + match unit { + TimeUnit::Us => { + let mut days = Vec::with_capacity(ts.len()); + let mut seconds = Vec::with_capacity(ts.len()); + let mut subsecond = Vec::with_capacity(ts.len()); + for &t in ts.iter() { + days.push(t / 86_400_000_000); + seconds.push((t % 86_400_000_000) / 1_000_000); + subsecond.push((t % 86_400_000_000) % 1_000_000); + } + + let days_array = PrimitiveArray::from(days); + let seconds_array = PrimitiveArray::from(seconds); + let subsecond_array = PrimitiveArray::from(subsecond); + + Ok(DateTimeArray::new( + ctx.named("days") + .compress(days_array.as_ref(), ld.map(|l| l.days()))?, + ctx.named("seconds") + .compress(seconds_array.as_ref(), ld.map(|l| l.seconds()))?, + ctx.named("subsecond") + .compress(subsecond_array.as_ref(), ld.map(|l| l.subsecond()))?, + array.dtype().clone(), + ) + .boxed()) + } + _ => todo!("Unit {:?}", unit), + } + } + _ => Err(VortexError::InvalidDType(array.dtype().clone())), + } + } +} diff --git a/vortex-datetime/src/datetime.rs b/vortex-datetime/src/datetime.rs index 1f50b3abbc8..027ff07b8fa 100644 --- a/vortex-datetime/src/datetime.rs +++ b/vortex-datetime/src/datetime.rs @@ -1,61 +1,54 @@ -// (c) Copyright 2024 Fulcrum Technologies, Inc. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - use std::any::Any; use std::sync::{Arc, RwLock}; -use vortex::array::{ - check_index_bounds, check_slice_bounds, Array, ArrayRef, ArrowIterator, Encoding, EncodingId, -}; +use vortex::array::{check_slice_bounds, Array, ArrayRef, ArrowIterator, Encoding, EncodingId}; use vortex::compress::EncodingCompression; -use vortex::dtype::DType; +use vortex::compute::ArrayCompute; +use vortex::dtype::Nullability::NonNullable; +use vortex::dtype::{DType, Nullability}; use vortex::error::{VortexError, VortexResult}; use vortex::formatter::{ArrayDisplay, ArrayFormatter}; use vortex::ptype::PType; use vortex::scalar::Scalar; -use vortex::serde::{ArraySerde, EncodingSerde}; -use vortex::stats::{Stats, StatsSet}; +use vortex::serde::{ArraySerde, EncodingSerde, ReadCtx, WriteCtx}; +use vortex::stats::{Stats, StatsCompute, StatsSet}; /// An array that decomposes a datetime into days, seconds, and nanoseconds. #[derive(Debug, Clone)] pub struct DateTimeArray { days: ArrayRef, seconds: ArrayRef, - nanoseconds: ArrayRef, + subsecond: ArrayRef, + dtype: DType, stats: Arc>, } impl DateTimeArray { - pub fn new(days: ArrayRef, seconds: ArrayRef, nanoseconds: ArrayRef) -> Self { - Self::try_new(days, seconds, nanoseconds).unwrap() + pub fn new(days: ArrayRef, seconds: ArrayRef, subsecond: ArrayRef, dtype: DType) -> Self { + Self::try_new(days, seconds, subsecond, dtype).unwrap() } - pub fn try_new(days: ArrayRef, seconds: ArrayRef, nanoseconds: ArrayRef) -> VortexResult { + pub fn try_new( + days: ArrayRef, + seconds: ArrayRef, + subsecond: ArrayRef, + dtype: DType, + ) -> VortexResult { if !matches!(days.dtype(), DType::Int(_, _, _)) { return Err(VortexError::InvalidDType(days.dtype().clone())); } if !matches!(seconds.dtype(), DType::Int(_, _, _)) { return Err(VortexError::InvalidDType(seconds.dtype().clone())); } - if !matches!(nanoseconds.dtype(), DType::Int(_, _, _)) { - return Err(VortexError::InvalidDType(nanoseconds.dtype().clone())); + if !matches!(subsecond.dtype(), DType::Int(_, _, _)) { + return Err(VortexError::InvalidDType(subsecond.dtype().clone())); } Ok(Self { days, seconds, - nanoseconds, + subsecond, + dtype, stats: Arc::new(RwLock::new(StatsSet::new())), }) } @@ -71,8 +64,8 @@ impl DateTimeArray { } #[inline] - pub fn nanoseconds(&self) -> &dyn Array { - self.nanoseconds.as_ref() + pub fn subsecond(&self) -> &dyn Array { + self.subsecond.as_ref() } } @@ -94,31 +87,23 @@ impl Array for DateTimeArray { } fn is_empty(&self) -> bool { - self.days.is_empty() && self.seconds.is_empty() && self.nanoseconds.is_empty() + self.days.is_empty() } fn dtype(&self) -> &DType { - PType::I64.dtype() + &DType::LocalDate(NonNullable) } fn stats(&self) -> Stats { Stats::new(&self.stats, self) } - fn scalar_at(&self, index: usize) -> VortexResult> { - check_index_bounds(self, index)?; - let dict_index: usize = self.codes().scalar_at(index)?.try_into()?; - self.dict().scalar_at(dict_index) - } - fn iter_arrow(&self) -> Box { todo!() } - // TODO(robert): Add function to trim the dictionary fn slice(&self, start: usize, stop: usize) -> VortexResult { - check_slice_bounds(self, start, stop)?; - Ok(Self::new(self.codes().slice(start, stop)?, self.dict.clone()).boxed()) + todo!() } fn encoding(&self) -> &'static dyn Encoding { @@ -126,7 +111,7 @@ impl Array for DateTimeArray { } fn nbytes(&self) -> usize { - self.codes().nbytes() + self.dict().nbytes() + self.days().nbytes() + self.seconds().nbytes() + self.subsecond().nbytes() } fn serde(&self) -> &dyn ArraySerde { @@ -134,6 +119,22 @@ impl Array for DateTimeArray { } } +impl StatsCompute for DateTimeArray {} + +impl ArrayCompute for DateTimeArray {} + +impl ArraySerde for DateTimeArray { + fn write(&self, ctx: &mut WriteCtx) -> std::io::Result<()> { + todo!() + } +} + +impl EncodingSerde for DateTimeEncoding { + fn read(&self, ctx: &mut ReadCtx) -> std::io::Result { + todo!() + } +} + impl<'arr> AsRef<(dyn Array + 'arr)> for DateTimeArray { fn as_ref(&self) -> &(dyn Array + 'arr) { self @@ -142,10 +143,9 @@ impl<'arr> AsRef<(dyn Array + 'arr)> for DateTimeArray { impl ArrayDisplay for DateTimeArray { fn fmt(&self, f: &mut ArrayFormatter) -> std::fmt::Result { - f.writeln("dict:")?; - f.indent(|indent| indent.array(self.dict()))?; - f.writeln("codes:")?; - f.indent(|indent| indent.array(self.codes())) + f.child("days", self.days())?; + f.child("seconds", self.seconds())?; + f.child("subsecond", self.subsecond()) } } diff --git a/vortex-datetime/src/lib.rs b/vortex-datetime/src/lib.rs index 54c95a1504d..89250003052 100644 --- a/vortex-datetime/src/lib.rs +++ b/vortex-datetime/src/lib.rs @@ -1,17 +1,3 @@ -// (c) Copyright 2024 Fulcrum Technologies, Inc. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - use linkme::distributed_slice; pub use datetime::*; @@ -21,4 +7,4 @@ mod compress; mod datetime; #[distributed_slice(ENCODINGS)] -static ENCODINGS_DICT: EncodingRef = &DictEncoding; +static ENCODINGS_DATETIME: EncodingRef = &DateTimeEncoding; From cd0959c4c4a8e06afac6d25feb60dc9448429131 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Sat, 9 Mar 2024 08:54:14 +0000 Subject: [PATCH 3/7] Bit shift to FOR --- bench-vortex/src/lib.rs | 4 ++-- vortex-datetime/src/compress.rs | 18 +++++++++--------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/bench-vortex/src/lib.rs b/bench-vortex/src/lib.rs index 753363501f9..0b532dde67d 100644 --- a/bench-vortex/src/lib.rs +++ b/bench-vortex/src/lib.rs @@ -1,6 +1,6 @@ use arrow_array::RecordBatchReader; use itertools::Itertools; -use log::info; +use log::{info, warn}; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use parquet::arrow::ProjectionMask; use std::collections::HashSet; @@ -119,7 +119,7 @@ pub fn compress_taxi_data() -> ArrayRef { let dtype: DType = schema.clone().try_into().unwrap(); let compressed = ChunkedArray::new(chunks.clone(), dtype).boxed(); - info!("Compressed array {}", display_tree(compressed.as_ref())); + warn!("Compressed array {}", display_tree(compressed.as_ref())); let mut field_bytes = vec![0; schema.fields().len()]; for chunk in chunks { diff --git a/vortex-datetime/src/compress.rs b/vortex-datetime/src/compress.rs index 47a372398c9..b38b8c27f32 100644 --- a/vortex-datetime/src/compress.rs +++ b/vortex-datetime/src/compress.rs @@ -58,17 +58,17 @@ impl EncodingCompression for DateTimeEncoding { subsecond.push((t % 86_400_000_000) % 1_000_000); } - let days_array = PrimitiveArray::from(days); - let seconds_array = PrimitiveArray::from(seconds); - let subsecond_array = PrimitiveArray::from(subsecond); - Ok(DateTimeArray::new( ctx.named("days") - .compress(days_array.as_ref(), ld.map(|l| l.days()))?, - ctx.named("seconds") - .compress(seconds_array.as_ref(), ld.map(|l| l.seconds()))?, - ctx.named("subsecond") - .compress(subsecond_array.as_ref(), ld.map(|l| l.subsecond()))?, + .compress(&PrimitiveArray::from(days), ld.map(|l| l.days()))?, + ctx.named("seconds").compress( + &PrimitiveArray::from(seconds).as_ref(), + ld.map(|l| l.seconds()), + )?, + ctx.named("subsecond").compress( + &PrimitiveArray::from(subsecond).as_ref(), + ld.map(|l| l.subsecond()), + )?, array.dtype().clone(), ) .boxed()) From 563d31e4db729cdcc18e125ce161ddc6fd4ea076 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Wed, 20 Mar 2024 13:34:49 +0000 Subject: [PATCH 4/7] Composite --- vortex-array/src/array/composite/typed.rs | 9 ++ vortex-array/src/datetime/localdatetime.rs | 5 + vortex-datetime/Cargo.toml | 3 + vortex-datetime/src/compress.rs | 113 +++++++++++---------- vortex-datetime/src/datetime.rs | 58 +++++------ 5 files changed, 106 insertions(+), 82 deletions(-) diff --git a/vortex-array/src/array/composite/typed.rs b/vortex-array/src/array/composite/typed.rs index 084c5c064a7..41a99dff2bc 100644 --- a/vortex-array/src/array/composite/typed.rs +++ b/vortex-array/src/array/composite/typed.rs @@ -5,6 +5,7 @@ use crate::array::composite::array::CompositeArray; use crate::array::composite::{CompositeID, CompositeMetadata}; use crate::array::{Array, ArrayRef}; use crate::compute::ArrayCompute; +use crate::dtype::DType; pub trait CompositeExtension: Debug + Send + Sync + 'static { fn id(&self) -> CompositeID; @@ -18,13 +19,16 @@ pub type CompositeExtensionRef = &'static dyn CompositeExtension; pub struct TypedCompositeArray { metadata: M, underlying: ArrayRef, + dtype: DType, } impl TypedCompositeArray { pub fn new(metadata: M, underlying: ArrayRef) -> Self { + let dtype = DType::Composite(metadata.id(), underlying.dtype().is_nullable().into()); Self { metadata, underlying, + dtype, } } @@ -38,6 +42,11 @@ impl TypedCompositeArray { self.underlying.as_ref() } + #[inline] + pub fn dtype(&self) -> &DType { + &self.dtype + } + pub fn as_composite(&self) -> CompositeArray { CompositeArray::new( self.metadata().id(), diff --git a/vortex-array/src/datetime/localdatetime.rs b/vortex-array/src/datetime/localdatetime.rs index d297100c90f..c11d21a2dba 100644 --- a/vortex-array/src/datetime/localdatetime.rs +++ b/vortex-array/src/datetime/localdatetime.rs @@ -29,6 +29,11 @@ impl LocalDateTime { pub fn new(time_unit: TimeUnit) -> Self { Self { time_unit } } + + #[inline] + pub fn time_unit(&self) -> TimeUnit { + self.time_unit + } } impl Display for LocalDateTime { diff --git a/vortex-datetime/Cargo.toml b/vortex-datetime/Cargo.toml index 8559799afcd..77cd5f52882 100644 --- a/vortex-datetime/Cargo.toml +++ b/vortex-datetime/Cargo.toml @@ -3,6 +3,9 @@ name = "vortex-datetime" version = "0.1.0" edition = "2021" +[lints] +workspace = true + [dependencies] vortex-array = { "path" = "../vortex-array" } linkme = "0.3.22" diff --git a/vortex-datetime/src/compress.rs b/vortex-datetime/src/compress.rs index b38b8c27f32..879f3980da0 100644 --- a/vortex-datetime/src/compress.rs +++ b/vortex-datetime/src/compress.rs @@ -1,11 +1,15 @@ -use crate::{DateTimeArray, DateTimeEncoding}; +use vortex::array::composite::CompositeEncoding; use vortex::array::downcast::DowncastArrayBuiltin; -use vortex::array::primitive::{PrimitiveArray, PrimitiveEncoding}; -use vortex::array::typed::{TypedArray, TypedEncoding}; -use vortex::array::{Array, ArrayRef, Encoding}; +use vortex::array::primitive::PrimitiveArray; +use vortex::array::{Array, ArrayRef, CloneOptionalArray}; use vortex::compress::{CompressConfig, CompressCtx, EncodingCompression}; -use vortex::dtype::{DType, TimeUnit}; -use vortex::error::{VortexError, VortexResult}; +use vortex::compute::cast::cast; +use vortex::compute::flatten::flatten_primitive; +use vortex::datetime::{LocalDateTime, LocalDateTimeArray, LocalDateTimeExtension, TimeUnit}; +use vortex::error::VortexResult; +use vortex::ptype::PType; + +use crate::{DateTimeArray, DateTimeEncoding}; impl EncodingCompression for DateTimeEncoding { fn can_compress( @@ -13,15 +17,12 @@ impl EncodingCompression for DateTimeEncoding { array: &dyn Array, _config: &CompressConfig, ) -> Option<&dyn EncodingCompression> { - if array.encoding().id() != TypedEncoding.id() { - return None; - } - - if array.as_typed().untyped_array().encoding().id() != PrimitiveEncoding.id() { + if array.encoding().id() != &CompositeEncoding::ID { return None; } - if !matches!(array.dtype(), DType::ZonedDateTime(_, _)) { + let composite = array.as_composite(); + if !matches!(composite.id(), LocalDateTimeExtension::ID) { return None; } @@ -34,49 +35,55 @@ impl EncodingCompression for DateTimeEncoding { like: Option<&dyn Array>, ctx: CompressCtx, ) -> VortexResult { - match array.dtype() { - DType::ZonedDateTime(unit, nullability) => { - let tarray = array.as_any().downcast_ref::().unwrap(); - let parray = tarray - .untyped_array() - .as_any() - .downcast_ref::() - .unwrap(); - // Eh, it's fine for now. - let ts = parray.typed_data::(); + let array = array.as_composite(); + match array.id() { + LocalDateTimeExtension::ID => compress_localdatetime( + array.as_typed::(), + like.map(|l| l.as_any().downcast_ref::().unwrap()), + ctx, + ), + _ => panic!("Unsupported composite ID {}", array.id()), + } + } +} - let ld = like.map(|l| l.as_any().downcast_ref::().unwrap()); +fn compress_localdatetime( + array: LocalDateTimeArray, + like: Option<&DateTimeArray>, + ctx: CompressCtx, +) -> VortexResult { + let underlying = flatten_primitive(cast(array.underlying(), &PType::I64.into())?.as_ref())?; - match unit { - TimeUnit::Us => { - let mut days = Vec::with_capacity(ts.len()); - let mut seconds = Vec::with_capacity(ts.len()); - let mut subsecond = Vec::with_capacity(ts.len()); - for &t in ts.iter() { - days.push(t / 86_400_000_000); - seconds.push((t % 86_400_000_000) / 1_000_000); - subsecond.push((t % 86_400_000_000) % 1_000_000); - } + let divisor = match array.metadata().time_unit() { + TimeUnit::Ns => 1_000_000_000, + TimeUnit::Us => 1_000_000, + TimeUnit::Ms => 1_000, + TimeUnit::S => 1, + }; - Ok(DateTimeArray::new( - ctx.named("days") - .compress(&PrimitiveArray::from(days), ld.map(|l| l.days()))?, - ctx.named("seconds").compress( - &PrimitiveArray::from(seconds).as_ref(), - ld.map(|l| l.seconds()), - )?, - ctx.named("subsecond").compress( - &PrimitiveArray::from(subsecond).as_ref(), - ld.map(|l| l.subsecond()), - )?, - array.dtype().clone(), - ) - .boxed()) - } - _ => todo!("Unit {:?}", unit), - } - } - _ => Err(VortexError::InvalidDType(array.dtype().clone())), - } + let mut days = Vec::with_capacity(underlying.len()); + let mut seconds = Vec::with_capacity(underlying.len()); + let mut subsecond = Vec::with_capacity(underlying.len()); + + for &t in underlying.typed_data::().iter() { + days.push(t / (86_400 * divisor)); + seconds.push((t % (86_400 * divisor)) / divisor); + subsecond.push((t % (86_400 * divisor)) % divisor); } + + Ok(DateTimeArray::new( + ctx.named("days") + .compress(&PrimitiveArray::from(days), like.map(|l| l.days()))?, + ctx.named("seconds").compress( + PrimitiveArray::from(seconds).as_ref(), + like.map(|l| l.seconds()), + )?, + ctx.named("subsecond").compress( + PrimitiveArray::from(subsecond).as_ref(), + like.map(|l| l.subsecond()), + )?, + underlying.validity().clone_optional(), + LocalDateTimeExtension::dtype(underlying.validity().is_some().into()), + ) + .boxed()) } diff --git a/vortex-datetime/src/datetime.rs b/vortex-datetime/src/datetime.rs index 027ff07b8fa..62e3a9da9fc 100644 --- a/vortex-datetime/src/datetime.rs +++ b/vortex-datetime/src/datetime.rs @@ -1,16 +1,13 @@ use std::any::Any; use std::sync::{Arc, RwLock}; -use vortex::array::{check_slice_bounds, Array, ArrayRef, ArrowIterator, Encoding, EncodingId}; +use vortex::array::{Array, ArrayRef, Encoding, EncodingId}; use vortex::compress::EncodingCompression; use vortex::compute::ArrayCompute; -use vortex::dtype::Nullability::NonNullable; -use vortex::dtype::{DType, Nullability}; +use vortex::dtype::DType; use vortex::error::{VortexError, VortexResult}; use vortex::formatter::{ArrayDisplay, ArrayFormatter}; -use vortex::ptype::PType; -use vortex::scalar::Scalar; -use vortex::serde::{ArraySerde, EncodingSerde, ReadCtx, WriteCtx}; +use vortex::serde::{ArraySerde, EncodingSerde}; use vortex::stats::{Stats, StatsCompute, StatsSet}; /// An array that decomposes a datetime into days, seconds, and nanoseconds. @@ -19,19 +16,27 @@ pub struct DateTimeArray { days: ArrayRef, seconds: ArrayRef, subsecond: ArrayRef, + validity: Option, dtype: DType, stats: Arc>, } impl DateTimeArray { - pub fn new(days: ArrayRef, seconds: ArrayRef, subsecond: ArrayRef, dtype: DType) -> Self { - Self::try_new(days, seconds, subsecond, dtype).unwrap() + pub fn new( + days: ArrayRef, + seconds: ArrayRef, + subsecond: ArrayRef, + validity: Option, + dtype: DType, + ) -> Self { + Self::try_new(days, seconds, subsecond, validity, dtype).unwrap() } pub fn try_new( days: ArrayRef, seconds: ArrayRef, subsecond: ArrayRef, + validity: Option, dtype: DType, ) -> VortexResult { if !matches!(days.dtype(), DType::Int(_, _, _)) { @@ -48,6 +53,7 @@ impl DateTimeArray { days, seconds, subsecond, + validity, dtype, stats: Arc::new(RwLock::new(StatsSet::new())), }) @@ -91,19 +97,25 @@ impl Array for DateTimeArray { } fn dtype(&self) -> &DType { - &DType::LocalDate(NonNullable) + &self.dtype } fn stats(&self) -> Stats { Stats::new(&self.stats, self) } - fn iter_arrow(&self) -> Box { - todo!() - } - fn slice(&self, start: usize, stop: usize) -> VortexResult { - todo!() + Ok(Self::new( + self.days.slice(start, stop)?, + self.seconds.slice(start, stop)?, + self.subsecond.slice(start, stop)?, + self.validity + .as_ref() + .map(|v| v.slice(start, stop)) + .transpose()?, + self.dtype.clone(), + ) + .boxed()) } fn encoding(&self) -> &'static dyn Encoding { @@ -114,8 +126,8 @@ impl Array for DateTimeArray { self.days().nbytes() + self.seconds().nbytes() + self.subsecond().nbytes() } - fn serde(&self) -> &dyn ArraySerde { - self + fn serde(&self) -> Option<&dyn ArraySerde> { + None } } @@ -123,18 +135,6 @@ impl StatsCompute for DateTimeArray {} impl ArrayCompute for DateTimeArray {} -impl ArraySerde for DateTimeArray { - fn write(&self, ctx: &mut WriteCtx) -> std::io::Result<()> { - todo!() - } -} - -impl EncodingSerde for DateTimeEncoding { - fn read(&self, ctx: &mut ReadCtx) -> std::io::Result { - todo!() - } -} - impl<'arr> AsRef<(dyn Array + 'arr)> for DateTimeArray { fn as_ref(&self) -> &(dyn Array + 'arr) { self @@ -164,6 +164,6 @@ impl Encoding for DateTimeEncoding { } fn serde(&self) -> Option<&dyn EncodingSerde> { - Some(self) + None } } From 41905ac5619ce82201f52be783e91c33b46b3b3c Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Wed, 20 Mar 2024 13:36:50 +0000 Subject: [PATCH 5/7] Composite --- bench-vortex/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bench-vortex/src/lib.rs b/bench-vortex/src/lib.rs index ff351cfa578..f2f911ceeae 100644 --- a/bench-vortex/src/lib.rs +++ b/bench-vortex/src/lib.rs @@ -169,7 +169,7 @@ mod test { .unwrap(); } - //#[ignore] + #[ignore] #[test] fn compression_ratio() { setup_logger(LevelFilter::Info); From 85d817a959577cfbd61c6d5d7e25674f2693ad61 Mon Sep 17 00:00:00 2001 From: Will Manning Date: Wed, 20 Mar 2024 18:09:09 +0000 Subject: [PATCH 6/7] fix semantic merge conflict --- Cargo.lock | 1 + vortex-array/src/array/composite/typed.rs | 4 ++-- vortex-datetime/Cargo.toml | 1 + vortex-datetime/src/datetime.rs | 2 +- 4 files changed, 5 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ebcd550013c..b8daebb1107 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2588,6 +2588,7 @@ dependencies = [ "linkme", "log", "vortex-array", + "vortex-schema", ] [[package]] diff --git a/vortex-array/src/array/composite/typed.rs b/vortex-array/src/array/composite/typed.rs index 69956f055ae..47659485252 100644 --- a/vortex-array/src/array/composite/typed.rs +++ b/vortex-array/src/array/composite/typed.rs @@ -2,12 +2,12 @@ use std::fmt::Debug; use std::sync::Arc; use vortex_schema::CompositeID; +use vortex_schema::DType; +use crate::array::{Array, ArrayRef}; use crate::array::composite::array::CompositeArray; use crate::array::composite::CompositeMetadata; -use crate::array::{Array, ArrayRef}; use crate::compute::ArrayCompute; -use crate::dtype::DType; pub trait CompositeExtension: Debug + Send + Sync + 'static { fn id(&self) -> CompositeID; diff --git a/vortex-datetime/Cargo.toml b/vortex-datetime/Cargo.toml index 77cd5f52882..3d048a24e54 100644 --- a/vortex-datetime/Cargo.toml +++ b/vortex-datetime/Cargo.toml @@ -8,5 +8,6 @@ workspace = true [dependencies] vortex-array = { "path" = "../vortex-array" } +vortex-schema = { "path" = "../vortex-schema" } linkme = "0.3.22" log = "0.4.20" diff --git a/vortex-datetime/src/datetime.rs b/vortex-datetime/src/datetime.rs index 62e3a9da9fc..5631fa1e771 100644 --- a/vortex-datetime/src/datetime.rs +++ b/vortex-datetime/src/datetime.rs @@ -4,7 +4,7 @@ use std::sync::{Arc, RwLock}; use vortex::array::{Array, ArrayRef, Encoding, EncodingId}; use vortex::compress::EncodingCompression; use vortex::compute::ArrayCompute; -use vortex::dtype::DType; +use vortex_schema::DType; use vortex::error::{VortexError, VortexResult}; use vortex::formatter::{ArrayDisplay, ArrayFormatter}; use vortex::serde::{ArraySerde, EncodingSerde}; From 71a564d4929797684d80a89af7d4400701afa6c9 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Thu, 21 Mar 2024 08:54:02 +0000 Subject: [PATCH 7/7] Composite --- vortex-array/src/array/composite/typed.rs | 2 +- vortex-datetime/src/datetime.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/vortex-array/src/array/composite/typed.rs b/vortex-array/src/array/composite/typed.rs index 47659485252..85f95b7e523 100644 --- a/vortex-array/src/array/composite/typed.rs +++ b/vortex-array/src/array/composite/typed.rs @@ -4,9 +4,9 @@ use std::sync::Arc; use vortex_schema::CompositeID; use vortex_schema::DType; -use crate::array::{Array, ArrayRef}; use crate::array::composite::array::CompositeArray; use crate::array::composite::CompositeMetadata; +use crate::array::{Array, ArrayRef}; use crate::compute::ArrayCompute; pub trait CompositeExtension: Debug + Send + Sync + 'static { diff --git a/vortex-datetime/src/datetime.rs b/vortex-datetime/src/datetime.rs index 5631fa1e771..77571a47aeb 100644 --- a/vortex-datetime/src/datetime.rs +++ b/vortex-datetime/src/datetime.rs @@ -4,11 +4,11 @@ use std::sync::{Arc, RwLock}; use vortex::array::{Array, ArrayRef, Encoding, EncodingId}; use vortex::compress::EncodingCompression; use vortex::compute::ArrayCompute; -use vortex_schema::DType; use vortex::error::{VortexError, VortexResult}; use vortex::formatter::{ArrayDisplay, ArrayFormatter}; use vortex::serde::{ArraySerde, EncodingSerde}; use vortex::stats::{Stats, StatsCompute, StatsSet}; +use vortex_schema::DType; /// An array that decomposes a datetime into days, seconds, and nanoseconds. #[derive(Debug, Clone)]