diff --git a/Cargo.lock b/Cargo.lock index c7c50f5047e..ec2fa196e98 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10185,6 +10185,7 @@ dependencies = [ "bit-vec", "flatbuffers", "futures", + "insta", "itertools 0.14.0", "kanal", "moka", diff --git a/vortex-file/src/strategy.rs b/vortex-file/src/strategy.rs index 804218779c8..777846c553e 100644 --- a/vortex-file/src/strategy.rs +++ b/vortex-file/src/strategy.rs @@ -48,6 +48,8 @@ use vortex_layout::layouts::compressed::CompressingStrategy; use vortex_layout::layouts::compressed::CompressorPlugin; use vortex_layout::layouts::dict::writer::DictStrategy; use vortex_layout::layouts::flat::writer::FlatLayoutStrategy; +#[cfg(feature = "unstable_encodings")] +use vortex_layout::layouts::list::writer::ListLayoutStrategy; use vortex_layout::layouts::repartition::RepartitionStrategy; use vortex_layout::layouts::repartition::RepartitionWriterOptions; use vortex_layout::layouts::table::TableStrategy; @@ -240,8 +242,25 @@ impl WriteStrategyBuilder { Arc::new(FlatLayoutStrategy::default()) }; - // 7. for each chunk create a flat layout - let chunked = ChunkedLayoutStrategy::new(Arc::clone(&flat)); + // 7. for each chunk create a layout. Under the `unstable_encodings` feature, list-typed + // chunks route through `ListLayoutStrategy` (separately-addressable elements/offsets/ + // validity sub-layouts; non-list chunks fall through its built-in fallback to `flat`). + // Nested lists (`list>`) recurse, shredding each level into its own + // `ListLayout`. Otherwise everything goes through the flat strategy. + #[cfg(feature = "unstable_encodings")] + let leaf: Arc = Arc::new( + // Thread the configured `flat` (which carries `allow_encodings` / any custom flat + // override) through every child; list elements still recurse into a nested ListLayout. 
+ ListLayoutStrategy::default() + .with_elements(Arc::clone(&flat)) + .with_offsets(Arc::clone(&flat)) + .with_validity(Arc::clone(&flat)) + .with_fallback(Arc::clone(&flat)), + ); + #[cfg(not(feature = "unstable_encodings"))] + let leaf: Arc = Arc::clone(&flat); + + let chunked = ChunkedLayoutStrategy::new(leaf); // 6. buffer chunks so they end up with closer segment ids physically let buffered = BufferedStrategy::new(chunked, 2 * ONE_MEG); // 2MB diff --git a/vortex-layout/Cargo.toml b/vortex-layout/Cargo.toml index 61b1253ef43..daa96d4e138 100644 --- a/vortex-layout/Cargo.toml +++ b/vortex-layout/Cargo.toml @@ -55,6 +55,7 @@ vortex-utils = { workspace = true, features = ["dashmap"] } [dev-dependencies] futures = { workspace = true, features = ["executor"] } +insta = { workspace = true } rstest = { workspace = true } temp-env = { workspace = true } tokio = { workspace = true, features = ["rt", "macros"] } diff --git a/vortex-layout/src/layouts/list/mod.rs b/vortex-layout/src/layouts/list/mod.rs new file mode 100644 index 00000000000..ba926e173f6 --- /dev/null +++ b/vortex-layout/src/layouts/list/mod.rs @@ -0,0 +1,268 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +mod reader; +pub mod writer; + +use std::sync::Arc; + +use reader::ListReader; +use vortex_array::DeserializeMetadata; +use vortex_array::ProstMetadata; +use vortex_array::dtype::DType; +use vortex_array::dtype::Nullability; +use vortex_array::dtype::PType; +use vortex_error::VortexExpect; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_ensure_eq; +use vortex_error::vortex_err; +use vortex_error::vortex_panic; +use vortex_session::VortexSession; + +use crate::LayoutBuildContext; +use crate::LayoutChildType; +use crate::LayoutEncodingRef; +use crate::LayoutId; +use crate::LayoutReaderContext; +use crate::LayoutReaderRef; +use crate::LayoutRef; +use crate::VTable; +use crate::children::LayoutChildren; 
+use crate::segments::SegmentId; +use crate::segments::SegmentSource; +use crate::vtable; + +/// Child index of the `elements` layout. +pub const ELEMENTS_CHILD_INDEX: usize = 0; +/// Child index of the `offsets` layout. +pub const OFFSETS_CHILD_INDEX: usize = 1; +/// Child index of the `validity` layout (only present when the list dtype is nullable). +pub const VALIDITY_CHILD_INDEX: usize = 2; + +/// Number of children when the list dtype is non-nullable. +pub const NUM_CHILDREN_NON_NULLABLE: usize = 2; + +vtable!(List); + +impl VTable for List { + type Layout = ListLayout; + type Encoding = ListLayoutEncoding; + type Metadata = ProstMetadata; + + fn id(_encoding: &Self::Encoding) -> LayoutId { + LayoutId::new("vortex.list") + } + + fn encoding(_layout: &Self::Layout) -> LayoutEncodingRef { + LayoutEncodingRef::new_ref(ListLayoutEncoding.as_ref()) + } + + fn row_count(layout: &Self::Layout) -> u64 { + layout.row_count() + } + + fn dtype(layout: &Self::Layout) -> &DType { + &layout.dtype + } + + fn metadata(layout: &Self::Layout) -> Self::Metadata { + ProstMetadata(ListLayoutMetadata::new(layout.offsets_ptype())) + } + + fn segment_ids(_layout: &Self::Layout) -> Vec { + vec![] + } + + fn nchildren(layout: &Self::Layout) -> usize { + if layout.dtype.is_nullable() { + NUM_CHILDREN_NON_NULLABLE + 1 + } else { + NUM_CHILDREN_NON_NULLABLE + } + } + + fn child(layout: &Self::Layout, idx: usize) -> VortexResult { + match (idx, layout.validity.as_ref()) { + (ELEMENTS_CHILD_INDEX, _) => Ok(Arc::clone(&layout.elements)), + (OFFSETS_CHILD_INDEX, _) => Ok(Arc::clone(&layout.offsets)), + (VALIDITY_CHILD_INDEX, Some(validity)) => Ok(Arc::clone(validity)), + _ => vortex_bail!("Invalid child index {idx} for ListLayout"), + } + } + + fn child_type(layout: &Self::Layout, idx: usize) -> LayoutChildType { + match (idx, layout.validity.is_some()) { + (ELEMENTS_CHILD_INDEX, _) => LayoutChildType::Auxiliary("elements".into()), + (OFFSETS_CHILD_INDEX, _) => 
LayoutChildType::Auxiliary("offsets".into()), + (VALIDITY_CHILD_INDEX, true) => LayoutChildType::Auxiliary("validity".into()), + _ => vortex_panic!("Invalid child index {idx} for ListLayout"), + } + } + + fn new_reader( + layout: &Self::Layout, + name: Arc, + segment_source: Arc, + session: &VortexSession, + ctx: &LayoutReaderContext, + ) -> VortexResult { + Ok(Arc::new(ListReader::try_new( + layout.clone(), + name, + segment_source, + session.clone(), + ctx, + )?)) + } + + fn build( + _encoding: &Self::Encoding, + dtype: &DType, + _row_count: u64, + metadata: &::Output, + _segment_ids: Vec, + children: &dyn LayoutChildren, + _ctx: &LayoutBuildContext<'_>, + ) -> VortexResult { + validate_children(dtype, children.nchildren())?; + + let elements_dtype = dtype + .as_list_element_opt() + .ok_or_else(|| vortex_err!("ListLayout requires a List dtype, got {dtype}"))?; + let elements = children.child(ELEMENTS_CHILD_INDEX, elements_dtype.as_ref())?; + + let offsets_dtype = DType::Primitive(metadata.offsets_ptype(), Nullability::NonNullable); + let offsets = children.child(OFFSETS_CHILD_INDEX, &offsets_dtype)?; + + let validity = dtype + .is_nullable() + .then(|| children.child(VALIDITY_CHILD_INDEX, &DType::Bool(Nullability::NonNullable))) + .transpose()?; + + Ok(ListLayout { + dtype: dtype.clone(), + elements, + offsets, + validity, + }) + } + + fn with_children(layout: &mut Self::Layout, children: Vec) -> VortexResult<()> { + validate_children(layout.dtype(), children.len())?; + + let mut iter = children.into_iter(); + layout.elements = iter + .next() + .ok_or_else(|| vortex_err!("missing elements child"))?; + layout.offsets = iter + .next() + .ok_or_else(|| vortex_err!("missing offsets child"))?; + layout.validity = layout + .dtype + .is_nullable() + .then(|| { + iter.next() + .ok_or_else(|| vortex_err!("missing validity child")) + }) + .transpose()?; + Ok(()) + } +} + +/// Validates expected number of children based on `dtype` +fn validate_children(dtype: &DType, 
n_children: usize) -> VortexResult<()> { + let expected = if dtype.is_nullable() { + NUM_CHILDREN_NON_NULLABLE + 1 + } else { + NUM_CHILDREN_NON_NULLABLE + }; + + vortex_ensure_eq!(n_children, expected); + Ok(()) +} + +#[derive(Debug)] +pub struct ListLayoutEncoding; + +/// Stores a list-typed array by shredding `elements`, `offsets`, and optional `validity` children. +#[derive(Clone, Debug)] +pub struct ListLayout { + dtype: DType, + elements: LayoutRef, + offsets: LayoutRef, + validity: Option, +} + +impl ListLayout { + /// Construct a new `ListLayout` from its components. + /// + /// # Invariants + /// + /// - `dtype` must be a [`DType::List`]. + /// - `validity` must be `Some` iff `dtype.is_nullable()`. + /// - `offsets.dtype()` must be a non-nullable integer. + /// - `offsets.row_count()` is the Arrow-canonical `n+1` for `n` lists (or `0` for empty). + /// - When present, `validity.row_count() == offsets.row_count().saturating_sub(1)`. + pub fn new( + dtype: DType, + elements: LayoutRef, + offsets: LayoutRef, + validity: Option, + ) -> Self { + Self { + dtype, + elements, + offsets, + validity, + } + } + + /// Number of lists in this layout. + #[inline] + pub fn row_count(&self) -> u64 { + self.offsets.row_count().saturating_sub(1) + } + + #[inline] + pub fn elements(&self) -> &LayoutRef { + &self.elements + } + + #[inline] + pub fn offsets(&self) -> &LayoutRef { + &self.offsets + } + + #[inline] + pub fn validity(&self) -> Option<&LayoutRef> { + self.validity.as_ref() + } + + /// The integer type used for the `offsets` child layout. + #[inline] + pub fn offsets_ptype(&self) -> PType { + self.offsets.dtype().as_ptype() + } + + /// The dtype of the inner elements column. 
+ pub fn elements_dtype(&self) -> &DType { + self.dtype + .as_list_element_opt() + .vortex_expect("ListLayout dtype must be a List") + } +} + +#[derive(prost::Message)] +pub struct ListLayoutMetadata { + #[prost(enumeration = "PType", tag = "1")] + offsets_ptype: i32, +} + +impl ListLayoutMetadata { + pub fn new(offsets_ptype: PType) -> Self { + let mut metadata = Self::default(); + metadata.set_offsets_ptype(offsets_ptype); + metadata + } +} diff --git a/vortex-layout/src/layouts/list/reader.rs b/vortex-layout/src/layouts/list/reader.rs new file mode 100644 index 00000000000..6c7b25c268d --- /dev/null +++ b/vortex-layout/src/layouts/list/reader.rs @@ -0,0 +1,1159 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::ops::Range; +use std::sync::Arc; + +use futures::FutureExt; +use futures::future::BoxFuture; +use futures::try_join; +use vortex_array::Array; +use vortex_array::ArrayRef; +use vortex_array::IntoArray; +use vortex_array::MaskFuture; +use vortex_array::VortexSessionExecute; +use vortex_array::arrays::ConstantArray; +use vortex_array::arrays::ListArray; +use vortex_array::arrays::Primitive; +use vortex_array::arrays::PrimitiveArray; +use vortex_array::builtins::ArrayBuiltins; +use vortex_array::dtype::DType; +use vortex_array::dtype::FieldMask; +use vortex_array::dtype::IntegerPType; +use vortex_array::dtype::Nullability; +use vortex_array::expr::Expression; +use vortex_array::expr::is_root; +use vortex_array::expr::not; +use vortex_array::expr::root; +use vortex_array::scalar_fn::fns::is_not_null::IsNotNull; +use vortex_array::scalar_fn::fns::is_null::IsNull; +use vortex_array::scalar_fn::fns::operators::Operator; +use vortex_array::validity::Validity; +use vortex_buffer::Buffer; +use vortex_error::VortexError; +use vortex_error::VortexExpect; +use vortex_error::VortexResult; +use vortex_mask::Mask; +use vortex_session::VortexSession; + +use crate::ArrayFuture; +use crate::LayoutReader; +use 
crate::LayoutReaderContext; +use crate::LayoutReaderRef; +use crate::RowSplits; +use crate::SplitRange; +use crate::layouts::list::ListLayout; +use crate::segments::SegmentSource; + +type OptionalArrayFuture = BoxFuture<'static, VortexResult>>; + +/// The threshold of mask density below which we push the input mask into projection evaluation, +/// and above which we evaluate the expression over all rows and intersect afterward. +const EXPR_EVAL_THRESHOLD: f64 = 0.2; + +/// Reader for [`ListLayout`]. +#[derive(Clone)] +pub struct ListReader { + layout: ListLayout, + name: Arc, + session: VortexSession, + elements: LayoutReaderRef, + offsets: LayoutReaderRef, + validity: Option, +} + +impl ListReader { + pub(super) fn try_new( + layout: ListLayout, + name: Arc, + segment_source: Arc, + session: VortexSession, + ctx: &LayoutReaderContext, + ) -> VortexResult { + let elements = layout.elements().new_reader( + format!("{name}.elements").into(), + Arc::clone(&segment_source), + &session, + ctx, + )?; + let offsets = layout.offsets().new_reader( + format!("{name}.offsets").into(), + Arc::clone(&segment_source), + &session, + ctx, + )?; + let validity = layout + .validity() + .map(|v| { + v.new_reader( + format!("{name}.validity").into(), + Arc::clone(&segment_source), + &session, + ctx, + ) + }) + .transpose()?; + + Ok(Self { + layout, + name, + session, + elements, + offsets, + validity, + }) + } + + /// Projection for [`ExprClass::Validity`] expressions (`is_null` / `is_not_null` of the list): + /// reads only the validity child — synthesizing all-valid for a non-nullable list — and never + /// touches the offsets or elements. + fn project_validity( + &self, + row_range: &Range, + expr: &Expression, + mask: MaskFuture, + ) -> VortexResult { + let validity_reader = self.validity.clone(); + let nullability = self.layout.dtype().nullability(); + let row_range = row_range.clone(); + // Evaluate the rewritten expression against the validity bool array (true == valid row). 
+ let rewritten = rewrite_validity_expr(expr)?; + + Ok(async move { + let mask = mask.await?; + let row_count = usize::try_from(row_range.end - row_range.start)?; + let out_len = if mask.all_true() { + row_count + } else { + mask.true_count() + }; + + let validity_array = match validity_reader.as_ref() { + Some(v) => Some( + v.projection_evaluation(&row_range, &root(), MaskFuture::ready(mask))? + .await?, + ), + None => None, + }; + + let validity = create_validity(validity_array, nullability).to_array(out_len); + validity.apply(&rewritten) + } + .boxed()) + } + + /// Projection for [`ExprClass::Elements`] expressions (everything else): materializes the list + /// (offsets + elements + validity) and applies the expression. + fn project_elements( + &self, + row_range: &Range, + expr: &Expression, + mask: MaskFuture, + ) -> VortexResult { + // Fire the offsets read before cloning so it overlaps the mask await below. + let projection = ElementsProjection { + reader: self.clone(), + expr: expr.clone(), + row_range: row_range.clone(), + offsets: self.fetch_offsets(row_range)?, + }; + + Ok(async move { + // Await the caller mask to decide the read shape. Offsets is already in flight and + // overlaps this wait; for statically-resolved masks the await is free. + let mask = mask.await?; + let is_whole_chunk = projection.row_range.start == 0 + && projection.row_range.end == projection.reader.layout.row_count(); + + if mask.all_true() && is_whole_chunk { + projection.project_whole_chunk().await + } else if mask.all_true() { + projection.project_full_range().await + } else { + projection.project_sparse(mask).await + } + } + .boxed()) + } + + /// Fire the offsets read for `row_range`. The offsets child has an extra entry, so reading + /// `row_range` maps to offsets in `[row_range.start..row_range.end + 1)`. 
+ fn fetch_offsets(&self, row_range: &Range) -> VortexResult { + let offsets_range = row_range.start..(row_range.end + 1); + let offsets_count = usize::try_from(offsets_range.end - offsets_range.start)?; + self.offsets.projection_evaluation( + &offsets_range, + &root(), + MaskFuture::new_true(offsets_count), + ) + } +} + +/// Read `offsets[0]` and `offsets[-1]` and return the elements-buffer range they describe. +fn calculate_elements_range( + offsets: &ArrayRef, + session: &VortexSession, +) -> VortexResult> { + if offsets.is_empty() { + return Ok(0..0); + } + let mut exec_ctx = session.create_execution_ctx(); + let start = offsets + .execute_scalar(0, &mut exec_ctx)? + .as_primitive() + .as_::() + .vortex_expect("offset value fits in u64"); + let end = offsets + .execute_scalar(offsets.len() - 1, &mut exec_ctx)? + .as_primitive() + .as_::() + .vortex_expect("offset value fits in u64"); + Ok(start..end) +} + +/// Subtract `first` from every offset so the resulting offsets index into a sliced +/// `elements[first..]` buffer starting at zero. The constant array is cast to the offsets' dtype. +fn rebase_offsets(offsets: ArrayRef, first: u64) -> VortexResult { + let constant = ConstantArray::new(first, offsets.len()) + .into_array() + .cast(offsets.dtype().clone())?; + offsets.binary(constant, Operator::Sub) +} + +fn create_validity(validity_array: Option, nullability: Nullability) -> Validity { + match validity_array { + Some(arr) => Validity::Array(arr), + None => match nullability { + Nullability::Nullable => Validity::AllValid, + Nullability::NonNullable => Validity::NonNullable, + }, + } +} + +/// The deepest list child an expression needs, cheapest first. +/// +/// Drives "fetch as little as possible": a projection/filter that only inspects the list's +/// null-ness needs the validity child; everything else needs the element values. The ordering +/// `Validity < Elements` lets us take the max over the operands of a compound expression. 
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +enum ExprClass { + /// Only the list's validity is needed (`is_null` / `is_not_null` of the list itself). + Validity, + /// The element values are needed (everything else). + Elements, +} + +/// Classify `expr` by the deepest list child it touches, where `root()` is the list. +/// +/// Only the exact shapes `is_null(root())` / `is_not_null(root())` (validity) are recognized. Every +/// other access to the list, including a bare `root()`, falls through to [`ExprClass::Elements`], +/// which is always correct. +fn classify(expr: &Expression) -> ExprClass { + // `is_null(root())` / `is_not_null(root())` need only the list's own validity. Note this is + // the list's null-ness, not the validity of some derived value, so the child must be `root()`. + if (expr.is::() || expr.is::()) + && expr.children().len() == 1 + && is_root(expr.child(0)) + { + return ExprClass::Validity; + } + + // A bare reference to the list needs its elements. + if is_root(expr) { + return ExprClass::Elements; + } + + // Otherwise the requirement is the max over the operands. Operands that never touch the list + // (e.g. literals) contribute nothing, so an expression that never references `root()` is + // treated as the cheapest class. + expr.children() + .iter() + .map(classify) + .max() + .unwrap_or(ExprClass::Validity) +} + +/// Rewrite a validity-class expression so it can be evaluated against the list's validity bool +/// array (`true` == valid row): `is_not_null(root())` becomes `root()` and `is_null(root())` +/// becomes `not(root())`. All other nodes are rebuilt with rewritten children. 
+fn rewrite_validity_expr(expr: &Expression) -> VortexResult { + if expr.is::() && expr.children().len() == 1 && is_root(expr.child(0)) { + return Ok(root()); + } + if expr.is::() && expr.children().len() == 1 && is_root(expr.child(0)) { + return Ok(not(root())); + } + let children = expr + .children() + .iter() + .map(rewrite_validity_expr) + .collect::>>()?; + expr.clone().with_children(children) +} + +/// Plan for fetching only the elements needed to materialize the kept list rows under a sparse +/// row mask, plus the offsets array we'll hand to `ListArray::try_new` for those kept rows. +/// +/// When the row mask is sparse, the alternative (read full row_range, build full list, then +/// `array.filter(mask)`) wastes IO on elements that get thrown away. This plan tells the reader: +/// +/// - which contiguous span of the elements buffer to fetch (`elements_range`), +/// - which positions inside that span belong to a kept row (`element_mask`), +/// - the offsets for the kept-row output, rebased to start at zero (`new_offsets`). +struct ScatterGather { + /// Tightest absolute elements range covering all kept rows. Empty range when no rows kept. + elements_range: Range, + /// `element_mask.len() == elements_range.end - elements_range.start`. A bit is set iff its + /// position in the elements buffer belongs to a kept list row. + element_mask: Mask, + /// Cumulative kept-list lengths starting at zero. `new_offsets.len() == kept_count + 1`. + new_offsets: ArrayRef, + /// Number of true bits in the input row mask. Read by unit tests only. + #[cfg_attr(not(test), allow(dead_code))] + kept_count: usize, +} + +/// Walk the row mask and the (canonicalized) offsets to plan the elements fetch + output offsets +/// for the sparse-mask path of `projection_evaluation`. Single linear pass; no IO. +/// +/// `offsets` is the offsets array we fetched for the full `row_range` (length n+1). `mask` is +/// the row-space mask (length n). 
Returns a plan suitable for handing the elements child a +/// bounded range + element-level mask, then constructing a kept-only `ListArray`. +// `usize::try_from` / `u64::try_from` are required by the macro arms whose `O` may be `u64` / +// `i64` (potentially fallible on 32-bit targets) but also expand to arms where `O` is `u8`, +// `u16`, etc. (where the conversion is trivially infallible). Suppress the resulting +// `unnecessary_fallible_conversions` lint from the latter arms — the uniform fallible form +// keeps the inner body identical across all expansions. +#[allow(clippy::unnecessary_fallible_conversions)] +fn compute_scatter_gather( + offsets: &ArrayRef, + mask: &Mask, + session: &VortexSession, +) -> VortexResult { + let kept_count = mask.true_count(); + let mut exec_ctx = session.create_execution_ctx(); + let prim_offsets = offsets.clone().execute::(&mut exec_ctx)?; + let ptype = prim_offsets.ptype(); + + if kept_count == 0 { + // Empty result: no elements to fetch, new_offsets is a single zero. 
+ let new_offsets = vortex_array::match_each_integer_ptype!(ptype, |O| { + Array::::new::( + Buffer::::from(vec![O::default()]), + Validity::NonNullable, + ) + .into_array() + }); + return Ok(ScatterGather { + elements_range: 0..0, + element_mask: Mask::new_false(0), + new_offsets, + kept_count: 0, + }); + } + + vortex_array::match_each_integer_ptype!(ptype, |O| { + compute_scatter_gather_typed::(prim_offsets.as_slice::(), mask, kept_count) + }) +} + +fn compute_scatter_gather_typed( + offsets: &[O], + mask: &Mask, + kept_count: usize, +) -> VortexResult +where + O: IntegerPType, + usize: TryFrom, + VortexError: From<>::Error>, +{ + let mut new_off: Vec = Vec::with_capacity(kept_count + 1); + let mut element_slices: Vec<(usize, usize)> = Vec::with_capacity(kept_count); + new_off.push(O::default()); + let mut cumulative: O = O::default(); + let mut range_start: Option = None; + let mut range_end = 0usize; + + { + let mut keep_row = |i: usize| -> VortexResult<()> { + let start_offset = offsets[i]; + let end_offset = offsets[i + 1]; + cumulative += end_offset - start_offset; + new_off.push(cumulative); + + let start = usize::try_from(start_offset)?; + let end = usize::try_from(end_offset)?; + if start < end { + let start_base = *range_start.get_or_insert(start); + element_slices.push((start - start_base, end - start_base)); + range_end = end; + } + Ok(()) + }; + + // `mask.indices()` returns the set bit positions for `Values` masks; `AllTrue` is rare + // here (caller checks density) but we handle it via fallback iteration. 
+ match mask.indices() { + vortex_mask::AllOr::All => { + for i in 0..mask.len() { + keep_row(i)?; + } + } + vortex_mask::AllOr::None => {} + vortex_mask::AllOr::Some(idxs) => { + for &i in idxs { + keep_row(i)?; + } + } + } + } + + let range_start = range_start.unwrap_or(0); + let element_mask = Mask::from_slices(range_end - range_start, element_slices); + let new_offsets = + Array::::new::(Buffer::::from(new_off), Validity::NonNullable) + .into_array(); + + Ok(ScatterGather { + elements_range: u64::try_from(range_start)?..u64::try_from(range_end)?, + element_mask, + new_offsets, + kept_count, + }) +} + +impl LayoutReader for ListReader { + fn name(&self) -> &Arc { + &self.name + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn dtype(&self) -> &DType { + self.layout.dtype() + } + + fn row_count(&self) -> u64 { + self.layout.row_count() + } + + fn register_splits( + &self, + field_mask: &[FieldMask], + split_range: &SplitRange, + splits: &mut RowSplits, + ) -> VortexResult<()> { + self.offsets + .register_splits(field_mask, split_range, splits)?; + if let Some(validity) = &self.validity { + validity.register_splits(field_mask, split_range, splits)?; + } + Ok(()) + } + + fn pruning_evaluation( + &self, + _row_range: &Range, + _expr: &Expression, + mask: Mask, + ) -> VortexResult { + // All stats-based pruning should already be done upstream. + Ok(MaskFuture::ready(mask)) + } + + fn filter_evaluation( + &self, + row_range: &Range, + expr: &Expression, + mask: MaskFuture, + ) -> VortexResult { + let len = mask.len(); + let reader = self.clone(); + let row_range = row_range.clone(); + let expr = expr.clone(); + let session = self.session.clone(); + + Ok(MaskFuture::new(len, async move { + let mask = mask.await?; + + if mask.all_false() { + return Ok(mask); + } + + if mask.density() < EXPR_EVAL_THRESHOLD { + let predicate = reader + .projection_evaluation(&row_range, &expr, MaskFuture::ready(mask.clone()))? 
+ .await?; + let predicate_mask = predicate_array_to_mask(predicate, &session)?; + Ok(mask.intersect_by_rank(&predicate_mask)) + } else { + let predicate = reader + .projection_evaluation(&row_range, &expr, MaskFuture::new_true(len))? + .await?; + let predicate_mask = predicate_array_to_mask(predicate, &session)?; + Ok(mask & &predicate_mask) + } + })) + } + + fn projection_evaluation( + &self, + row_range: &Range, + expr: &Expression, + mask: MaskFuture, + ) -> VortexResult { + // Read as little as possible based on which list children the expression needs. + match classify(expr) { + ExprClass::Validity => self.project_validity(row_range, expr, mask), + ExprClass::Elements => self.project_elements(row_range, expr, mask), + } + } +} + +/// Fetch the validity child for `row_range` under `mask`, yielding `None` for a non-nullable list +/// (which has no validity child). +fn fetch_validity( + validity: Option<&LayoutReaderRef>, + row_range: &Range, + mask: MaskFuture, +) -> VortexResult { + let fut = validity + .map(|v| v.projection_evaluation(row_range, &root(), mask)) + .transpose()?; + Ok(async move { + match fut { + Some(f) => f.await.map(Some), + None => Ok(None), + } + } + .boxed()) +} + +struct ListParts { + elements: ArrayRef, + offsets: ArrayRef, + validity: Option, +} + +/// Build the list array from its parts and apply the projection expression. +fn build_list( + parts: ListParts, + nullability: Nullability, + expr: &Expression, +) -> VortexResult { + let validity = create_validity(parts.validity, nullability); + ListArray::try_new(parts.elements, parts.offsets, validity)? 
+ .into_array() + .apply(expr) +} + +fn predicate_array_to_mask(array: ArrayRef, session: &VortexSession) -> VortexResult { + let mut ctx = session.create_execution_ctx(); + array.null_as_false().execute(&mut ctx) +} + +struct ElementsProjection { + reader: ListReader, + expr: Expression, + row_range: Range, + offsets: ArrayFuture, +} + +impl ElementsProjection { + /// Path A1: whole-chunk read with an all-true mask. The elements bound is the whole elements + /// buffer (`0..elements_row_count`) and `offsets[0] == 0` within a chunk, so we don't need to + /// read offsets to know the bound and don't need to rebase. Fires elements + validity in + /// parallel with the already-in-flight offsets — a single `try_join!` over all three children. + async fn project_whole_chunk(self) -> VortexResult { + let Self { + reader, + expr, + row_range, + offsets, + } = self; + let validity_row_count = usize::try_from(row_range.end - row_range.start)?; + let elements_row_count = reader.elements.row_count(); + let elements_fut = reader.elements.projection_evaluation( + &(0..elements_row_count), + &root(), + MaskFuture::new_true(usize::try_from(elements_row_count)?), + )?; + let validity_fut = fetch_validity( + reader.validity.as_ref(), + &row_range, + MaskFuture::new_true(validity_row_count), + )?; + let (offsets, elements, validity) = try_join!(offsets, elements_fut, validity_fut)?; + build_list( + ListParts { + elements, + offsets, + validity, + }, + reader.layout.dtype().nullability(), + &expr, + ) + } + + /// Path A2: partial range with an all-true mask. The elements bound is + /// `offsets[a]..offsets[b]`, so we await offsets before firing the elements read and rebase the + /// offsets to start at zero. 
+ async fn project_full_range(self) -> VortexResult { + let Self { + reader, + expr, + row_range, + offsets, + } = self; + let offsets = offsets.await?; + let elements_range = calculate_elements_range(&offsets, &reader.session)?; + let rebased_offsets = rebase_offsets(offsets, elements_range.start)?; + let elements_len = elements_range.end - elements_range.start; + let validity_row_count = usize::try_from(row_range.end - row_range.start)?; + + let elements_fut = reader.elements.projection_evaluation( + &elements_range, + &root(), + MaskFuture::new_true(usize::try_from(elements_len)?), + )?; + let validity_fut = fetch_validity( + reader.validity.as_ref(), + &row_range, + MaskFuture::new_true(validity_row_count), + )?; + let (elements, validity) = try_join!(elements_fut, validity_fut)?; + build_list( + ListParts { + elements, + offsets: rebased_offsets, + validity, + }, + reader.layout.dtype().nullability(), + &expr, + ) + } + + /// Path B: sparse mask. Bound the elements fetch to the tightest range covering the kept rows + /// and pass an element-level mask so the elements child only materializes kept-row positions; + /// validity is fetched for the kept rows by pushing the caller mask down directly. 
+    /// Projects the rows selected by `mask` into a list array.
+    ///
+    /// The validity read is set up first with the unmodified row mask. The offsets are then
+    /// awaited and passed to `compute_scatter_gather`, whose result supplies the element range,
+    /// the element mask, and the offsets of the output. The element and validity futures are
+    /// awaited together, and `build_list` assembles the result from the parts, the layout's
+    /// nullability, and `expr`.
+    async fn project_sparse(self, mask: Mask) -> VortexResult {
+        let Self {
+            reader,
+            expr,
+            row_range,
+            offsets,
+        } = self;
+        let validity_fut = fetch_validity(
+            reader.validity.as_ref(),
+            &row_range,
+            MaskFuture::ready(mask.clone()),
+        )?;
+        let offsets = offsets.await?;
+        let sg = compute_scatter_gather(&offsets, &mask, &reader.session)?;
+        // The element read is restricted to `sg.elements_range` and filtered by the element mask.
+        let elements_fut = reader.elements.projection_evaluation(
+            &sg.elements_range,
+            &root(),
+            MaskFuture::ready(sg.element_mask),
+        )?;
+        let (elements, validity) = try_join!(elements_fut, validity_fut)?;
+        build_list(
+            ListParts {
+                elements,
+                offsets: sg.new_offsets,
+                validity,
+            },
+            reader.layout.dtype().nullability(),
+            &expr,
+        )
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::ops::Range;
+
+    use rstest::rstest;
+    use vortex_array::ArrayContext;
+    use vortex_array::arrays::BoolArray;
+    use vortex_array::arrays::ListArray;
+    use vortex_array::arrays::PrimitiveArray;
+    use vortex_array::assert_arrays_eq;
+    use vortex_array::dtype::Nullability::NonNullable;
+    use vortex_array::expr::eq;
+    use vortex_array::expr::is_not_null;
+    use vortex_array::expr::is_null;
+    use vortex_array::expr::lit;
+    use vortex_array::expr::not;
+    use vortex_buffer::buffer;
+
+    use super::*;
+    use crate::LayoutRef;
+    use crate::LayoutStrategy;
+    use crate::layouts::list::writer::ListLayoutStrategy;
+    use crate::segments::SegmentSource;
+    use crate::segments::TestSegments;
+    use crate::sequence::SequenceId;
+    use crate::sequence::SequentialArrayStreamExt;
+    use crate::test::SESSION;
+
+    /// `classify` keys off the deepest list child an expression touches; `Elements` is the
+    /// always-correct default for anything not specifically recognized.
+    #[rstest]
+    // `is_null` / `is_not_null` of the list itself need only validity.
+    #[case::is_null(is_null(root()), ExprClass::Validity)]
+    #[case::is_not_null(is_not_null(root()), ExprClass::Validity)]
+    // Compound over validity-only operands stays validity.
+    #[case::not_is_null(not(is_null(root())), ExprClass::Validity)]
+    // A list-independent (constant) expression falls to the cheapest class.
+    #[case::constant(lit(5), ExprClass::Validity)]
+    // A bare list reference needs the elements.
+    #[case::bare_root(root(), ExprClass::Elements)]
+    // Any other fn over the list needs the elements.
+    #[case::not_root(not(root()), ExprClass::Elements)]
+    // `is_null` only short-circuits to validity when its argument is the list itself.
+    #[case::is_null_of_derived(is_null(not(root())), ExprClass::Elements)]
+    // Max over operands: validity + elements => elements.
+    #[case::validity_and_elements(eq(is_null(root()), root()), ExprClass::Elements)]
+    fn classify_expr_class(#[case] expr: Expression, #[case] expected: ExprClass) {
+        assert_eq!(classify(&expr), expected);
+    }
+
+    /// Validity-class projections (`is_null` / `is_not_null` of the list) round-trip through the
+    /// validity-only read path, for both nullable and non-nullable lists.
+    #[rstest]
+    // `create_basic_list_array(true)` has validity `[true, false, true]`.
+    #[case::nullable(true, vec![true, false, true])]
+    #[case::non_nullable(false, vec![true, true, true])]
+    #[tokio::test]
+    async fn projection_validity_class(
+        #[case] nullable: bool,
+        #[case] valid: Vec,
+    ) -> VortexResult<()> {
+        let list = create_basic_list_array(nullable);
+        let ctx = LayoutReaderContext::new();
+        let (segments, layout) = write_layout(&flat_list_strategy(), list).await?;
+        let reader = layout.new_reader("".into(), segments, &SESSION, &ctx)?;
+
+        // `is_not_null` must reproduce the validity bits as they are.
+        let not_null = reader
+            .projection_evaluation(&(0..3), &is_not_null(root()), MaskFuture::new_true(3))?
+            .await?;
+        let mut exec_ctx = SESSION.create_execution_ctx();
+        assert_arrays_eq!(not_null, BoolArray::from_iter(valid.clone()), &mut exec_ctx);
+
+        // `is_null` must be the element-wise negation of the same bits.
+        let is_null_res = reader
+            .projection_evaluation(&(0..3), &is_null(root()), MaskFuture::new_true(3))?
+            .await?;
+        assert_arrays_eq!(
+            is_null_res,
+            BoolArray::from_iter(valid.iter().map(|v| !v).collect::>()),
+            &mut exec_ctx
+        );
+
+        Ok(())
+    }
+
+    /// Filtering on `is_null` / `is_not_null` of the list, under an all-true input mask, yields
+    /// the validity bits (or their negation) as the result mask.
+    #[rstest]
+    #[case::is_not_null_nullable(true, is_not_null(root()), Mask::from_iter([true, false, true]))]
+    #[case::is_not_null_non_nullable(false, is_not_null(root()), Mask::new_true(3))]
+    #[case::is_null_nullable(true, is_null(root()), Mask::from_iter([false, true, false]))]
+    #[case::is_null_non_nullable(false, is_null(root()), Mask::new_false(3))]
+    #[tokio::test]
+    async fn filter_evaluation_validity_class(
+        #[case] nullable: bool,
+        #[case] expr: Expression,
+        #[case] expected: Mask,
+    ) -> VortexResult<()> {
+        let list = create_basic_list_array(nullable);
+        let ctx = LayoutReaderContext::new();
+        let (segments, layout) = write_layout(&flat_list_strategy(), list).await?;
+        let reader = layout.new_reader("".into(), segments, &SESSION, &ctx)?;
+
+        let result = reader
+            .filter_evaluation(&(0..3), &expr, MaskFuture::new_true(3))?
+            .await?;
+
+        assert_eq!(result, expected);
+        Ok(())
+    }
+
+    /// A row excluded by the input mask stays excluded even though it is valid: row 2 is
+    /// non-null but masked out, and row 1 is selected but null, so only row 0 survives.
+    #[tokio::test]
+    async fn filter_evaluation_intersects_with_input_mask() -> VortexResult<()> {
+        let list = create_basic_list_array(true);
+        let ctx = LayoutReaderContext::new();
+        let (segments, layout) = write_layout(&flat_list_strategy(), list).await?;
+        let reader = layout.new_reader("".into(), segments, &SESSION, &ctx)?;
+
+        let input_mask = Mask::from_iter([true, true, false]);
+        let result = reader
+            .filter_evaluation(&(0..3), &is_not_null(root()), MaskFuture::ready(input_mask))?
+            .await?;
+
+        assert_eq!(result, Mask::from_iter([true, false, false]));
+        Ok(())
+    }
+
+    /// Only row 4 is selected by the input mask; the result for that single row must land back
+    /// at position 4 of the six-row range.
+    #[tokio::test]
+    async fn filter_evaluation_sparse_mask_maps_by_rank() -> VortexResult<()> {
+        let list = create_six_list_array();
+        let ctx = LayoutReaderContext::new();
+        let (segments, layout) = write_layout(&flat_list_strategy(), list).await?;
+        let reader = layout.new_reader("".into(), segments, &SESSION, &ctx)?;
+
+        let input_mask = Mask::from_iter([false, false, false, false, true, false]);
+        let result = reader
+            .filter_evaluation(&(0..6), &is_not_null(root()), MaskFuture::ready(input_mask))?
+            .await?;
+
+        assert_eq!(
+            result,
+            Mask::from_iter([false, false, false, false, true, false])
+        );
+        Ok(())
+    }
+
+    /// The default `ListLayoutStrategy`, used as the writer in every test below.
+    fn flat_list_strategy() -> ListLayoutStrategy {
+        ListLayoutStrategy::default()
+    }
+
+    /// Writes `array` as a stream through `strategy` into a fresh `TestSegments`, returning the
+    /// segments (for handing to a reader) together with the resulting layout.
+    async fn write_layout(
+        strategy: &S,
+        array: ArrayRef,
+    ) -> VortexResult<(Arc, LayoutRef)> {
+        let segments = Arc::new(TestSegments::default());
+        // Keep a second handle: `segments` itself is moved into `write_stream` below.
+        let segments_ref: Arc = Arc::::clone(&segments);
+        let (ptr, eof) = SequenceId::root().split();
+        let stream = array.to_array_stream().sequenced(ptr);
+        let layout = strategy
+            .write_stream(ArrayContext::empty(), segments, stream, eof, &SESSION)
+            .await?;
+        Ok((segments_ref, layout))
+    }
+
+    /// Executes `array` and copies its values out into a `Vec`; panics (via `unwrap`) if
+    /// execution fails.
+    fn materialize_u32_array(array: ArrayRef) -> Vec {
+        let mut ctx = SESSION.create_execution_ctx();
+        array
+            .execute::(&mut ctx)
+            .unwrap()
+            .as_slice::()
+            .to_vec()
+    }
+
+    /// The expected range in every case runs from the first offset to the last offset.
+    #[rstest]
+    #[case::full(buffer![0u32, 2, 5, 5].into_array(), 0..5)]
+    #[case::partial_slice(buffer![2u32, 5, 5, 8].into_array(), 2..8)]
+    #[case::single_offset_is_empty(buffer![7u32].into_array(), 7..7)]
+    #[case::u64_offsets(buffer![10u64, 12, 15, 15].into_array(), 10..15)]
+    fn test_calculate_elements_range(
+        #[case] offsets: ArrayRef,
+        #[case] expected: Range,
+    ) -> VortexResult<()> {
+        assert_eq!(calculate_elements_range(&offsets, &SESSION)?, expected);
+        Ok(())
+    }
+
+    /// An offsets array with no entries at all yields the empty range `0..0`.
+    #[test]
+    fn calculate_elements_range_empty_offsets() -> VortexResult<()> {
+        let offsets = PrimitiveArray::empty::(NonNullable).into_array();
+        assert_eq!(calculate_elements_range(&offsets, &SESSION)?, 0..0);
+        Ok(())
+    }
+
+    /// Rebasing subtracts `first` from every offset, so the result starts at zero.
+    #[rstest]
+    #[case::first_zero_is_identity(buffer![0u32, 2, 5, 5].into_array(), 0, vec![0, 2, 5, 5])]
+    #[case::subtracts_first(buffer![3u32, 5, 8].into_array(), 3, vec![0, 2, 5])]
+    fn test_rebase_offsets(
+        #[case] offsets: ArrayRef,
+        #[case] first: u64,
+        #[case] expected: Vec,
+    ) -> VortexResult<()> {
+        let rebased = rebase_offsets(offsets, first)?;
+        assert_eq!(materialize_u32_array(rebased), expected);
+        Ok(())
+    }
+
+    // ---- compute_scatter_gather --------------------------------------------------------------
+
+    /// Run `compute_scatter_gather` and unwrap the three derived fields plus the kept count.
+    /// Returns the raw `new_offsets` ArrayRef so callers with non-u32 offsets can materialize
+    /// the ptype themselves.
+    fn run_scatter_gather(
+        offsets: ArrayRef,
+        mask: Mask,
+    ) -> VortexResult<(Range, Vec, ArrayRef, usize)> {
+        let sg = compute_scatter_gather(&offsets, &mask, &SESSION)?;
+        // Expand the element mask into plain bools so tests can compare against `vec![..]`.
+        let element_mask_bits: Vec = (0..sg.element_mask.len())
+            .map(|i| sg.element_mask.value(i))
+            .collect();
+        Ok((
+            sg.elements_range,
+            element_mask_bits,
+            sg.new_offsets,
+            sg.kept_count,
+        ))
+    }
+
+    /// Source layout for these tests: 5 lists with offsets `[0, 2, 5, 5, 8, 10]`, i.e.
+    /// lengths `[2, 3, 0, 3, 2]`. Element positions for list i are `offsets[i]..offsets[i+1]`.
+    fn five_list_offsets() -> ArrayRef {
+        buffer![0u32, 2, 5, 5, 8, 10].into_array()
+    }
+
+    #[test]
+    fn scatter_gather_single_middle_row() -> VortexResult<()> {
+        // Keep only list 1 (positions 2..5).
+        let mask = Mask::from_iter([false, true, false, false, false]);
+        let (range, elem_mask, new_off, kept) = run_scatter_gather(five_list_offsets(), mask)?;
+        assert_eq!(range, 2..5);
+        assert_eq!(elem_mask, vec![true; 3]); // entire bounded range is the kept span
+        assert_eq!(materialize_u32_array(new_off), vec![0, 3]);
+        assert_eq!(kept, 1);
+        Ok(())
+    }
+
+    #[test]
+    fn scatter_gather_two_adjacent_rows() -> VortexResult<()> {
+        // Keep lists 1 and 2 (positions 2..5 and 5..5 — second is empty).
+        let mask = Mask::from_iter([false, true, true, false, false]);
+        let (range, elem_mask, new_off, kept) = run_scatter_gather(five_list_offsets(), mask)?;
+        assert_eq!(range, 2..5);
+        assert_eq!(elem_mask, vec![true; 3]);
+        assert_eq!(materialize_u32_array(new_off), vec![0, 3, 3]); // second kept row has length 0
+        assert_eq!(kept, 2);
+        Ok(())
+    }
+
+    #[test]
+    fn scatter_gather_two_far_apart_rows() -> VortexResult<()> {
+        // Keep lists 0 and 3 (positions 0..2 and 5..8). Element mask must skip position 2..5.
+        let mask = Mask::from_iter([true, false, false, true, false]);
+        let (range, elem_mask, new_off, kept) = run_scatter_gather(five_list_offsets(), mask)?;
+        assert_eq!(range, 0..8);
+        // positions 0..2 and 5..8 set, 2..5 unset.
+        assert_eq!(
+            elem_mask,
+            vec![true, true, false, false, false, true, true, true]
+        );
+        assert_eq!(materialize_u32_array(new_off), vec![0, 2, 5]); // lengths 2 and 3
+        assert_eq!(kept, 2);
+        Ok(())
+    }
+
+    #[test]
+    fn scatter_gather_at_boundaries() -> VortexResult<()> {
+        // Keep first and last list (positions 0..2 and 8..10).
+        let mask = Mask::from_iter([true, false, false, false, true]);
+        let (range, elem_mask, new_off, kept) = run_scatter_gather(five_list_offsets(), mask)?;
+        assert_eq!(range, 0..10);
+        // Only the two kept spans are set; everything in between is skipped.
+        let mut expected = vec![false; 10];
+        expected[0] = true;
+        expected[1] = true;
+        expected[8] = true;
+        expected[9] = true;
+        assert_eq!(elem_mask, expected);
+        assert_eq!(materialize_u32_array(new_off), vec![0, 2, 4]);
+        assert_eq!(kept, 2);
+        Ok(())
+    }
+
+    #[test]
+    fn scatter_gather_empty_mask_returns_empty_plan() -> VortexResult<()> {
+        let mask = Mask::new_false(5);
+        let (range, elem_mask, new_off, kept) = run_scatter_gather(five_list_offsets(), mask)?;
+        assert_eq!(range, 0..0);
+        assert!(elem_mask.is_empty());
+        // single zero, ready to be a 0-row ListArray's offsets (offsets.len() - 1 == 0 rows)
+        assert_eq!(materialize_u32_array(new_off), vec![0]);
+        assert_eq!(kept, 0);
+        Ok(())
+    }
+
+    #[test]
+    fn scatter_gather_kept_row_is_empty_list() -> VortexResult<()> {
+        // Keep only list 2, which has length 0 (offsets[2] == offsets[3] == 5).
+        let mask = Mask::from_iter([false, false, true, false, false]);
+        let (range, elem_mask, new_off, kept) = run_scatter_gather(five_list_offsets(), mask)?;
+        assert_eq!(range, 0..0);
+        assert!(elem_mask.is_empty());
+        assert_eq!(materialize_u32_array(new_off), vec![0, 0]);
+        assert_eq!(kept, 1);
+        Ok(())
+    }
+
+    #[test]
+    fn scatter_gather_ignores_empty_kept_boundary_rows() -> VortexResult<()> {
+        // The first and last kept rows are empty. The read range should be anchored to the one
+        // non-empty kept row, not widened across skipped rows.
+        let offsets = buffer![0u32, 0, 100, 102, 200, 200].into_array();
+        let mask = Mask::from_iter([true, false, true, false, true]);
+        let (range, elem_mask, new_off, kept) = run_scatter_gather(offsets, mask)?;
+        assert_eq!(range, 100..102);
+        assert_eq!(elem_mask, vec![true, true]);
+        assert_eq!(materialize_u32_array(new_off), vec![0, 0, 2, 2]);
+        assert_eq!(kept, 3);
+        Ok(())
+    }
+
+    #[test]
+    fn scatter_gather_u64_offsets() -> VortexResult<()> {
+        // Verify the ptype-dispatch path works for u64 offsets, not just u32.
+        let offsets = buffer![0u64, 3, 7, 7, 12].into_array();
+        let mask = Mask::from_iter([false, true, false, true]);
+        let (range, elem_mask, new_off, kept) = run_scatter_gather(offsets, mask)?;
+        assert_eq!(range, 3..12);
+        // positions 3..7 (4 bits) and 7..12 (5 bits) — middle "gap" at 7..7 is zero-width.
+        assert_eq!(elem_mask, vec![true; 9]);
+        // Walk the new_offsets slice as u64.
+        let mut ctx = SESSION.create_execution_ctx();
+        let new_off_prim = new_off.execute::(&mut ctx)?;
+        assert_eq!(new_off_prim.as_slice::(), &[0u64, 4, 9]);
+        assert_eq!(kept, 2);
+        Ok(())
+    }
+
+    /// Three lists `[1, 2]`, `[3, 4]`, `[5]` (offsets `[0, 2, 4, 5]`). When `nullable`, the
+    /// validity is `[true, false, true]`, i.e. row 1 is null.
+    fn create_basic_list_array(nullable: bool) -> ArrayRef {
+        let validity = if nullable {
+            Validity::Array(BoolArray::from_iter([true, false, true]).into_array())
+        } else {
+            Validity::NonNullable
+        };
+
+        ListArray::try_new(
+            buffer![1i32, 2, 3, 4, 5].into_array(),
+            buffer![0u32, 2, 4, 5].into_array(),
+            validity,
+        )
+        .expect("array is valid")
+        .into_array()
+    }
+
+    /// Six single-element lists with validity `[true, false, true, false, true, true]`.
+    fn create_six_list_array() -> ArrayRef {
+        let validity = Validity::Array(
+            BoolArray::from_iter([true, false, true, false, true, true]).into_array(),
+        );
+
+        ListArray::try_new(
+            buffer![1i32, 2, 3, 4, 5, 6].into_array(),
+            buffer![0u32, 1, 2, 3, 4, 5, 6].into_array(),
+            validity,
+        )
+        .expect("array is valid")
+        .into_array()
+    }
+
+    /// Fetching offsets for the two rows `1..3` returns three values: one per row plus the
+    /// closing endpoint.
+    #[tokio::test]
+    async fn fetch_offsets_includes_extra_endpoint() -> VortexResult<()> {
+        let list = create_basic_list_array(false);
+
+        let (segments, layout) = write_layout(&flat_list_strategy(), list).await?;
+        let ctx = LayoutReaderContext::new();
+        let reader = layout.new_reader("".into(), segments, &SESSION, &ctx)?;
+        // Downcast to reach `fetch_offsets`, which is called on the concrete reader type.
+        let reader = reader
+            .as_any()
+            .downcast_ref::()
+            .expect("ListReader");
+
+        let offsets = reader.fetch_offsets(&(1..3))?.await?;
+        assert_eq!(materialize_u32_array(offsets), vec![2, 4, 5]);
+
+        Ok(())
+    }
+
+    /// Projecting `root()` over a row range with an all-true mask must equal slicing the source
+    /// list to that range.
+    #[rstest]
+    #[case::full_range(0..3, false)]
+    #[case::partial_start(0..2, false)]
+    #[case::partial_end(1..3, false)]
+    #[case::middle_single(1..2, false)]
+    #[case::empty_range(1..1, false)]
+    #[case::full_range_null(0..3, true)]
+    #[tokio::test]
+    async fn projection_evaluation_round_trips(
+        #[case] row_range: Range,
+        #[case] nullable: bool,
+    ) -> VortexResult<()> {
+        let list = create_basic_list_array(nullable);
+        let ctx = LayoutReaderContext::new();
+
+        let len = usize::try_from(row_range.end - row_range.start)?;
+        let (segments, layout) = write_layout(&flat_list_strategy(), list.clone()).await?;
+        let reader = layout.new_reader("".into(), segments, &SESSION, &ctx)?;
+
+        let result = reader
+            .projection_evaluation(&row_range, &root(), MaskFuture::new_true(len))?
+            .await?;
+
+        let expected =
+            list.slice(usize::try_from(row_range.start)?..usize::try_from(row_range.end)?)?;
+        let mut exec_ctx = SESSION.create_execution_ctx();
+        assert_arrays_eq!(result, expected, &mut exec_ctx);
+        Ok(())
+    }
+
+    /// Projecting `root()` under a partial mask must equal filtering the source list by it.
+    #[tokio::test]
+    async fn projection_evaluation_applies_mask() -> VortexResult<()> {
+        let list = create_basic_list_array(false);
+        let ctx = LayoutReaderContext::new();
+        let (segments, layout) = write_layout(&flat_list_strategy(), list.clone()).await?;
+        let reader = layout.new_reader("".into(), segments, &SESSION, &ctx)?;
+
+        let mask = Mask::from_iter([true, false, true]);
+        let result = reader
+            .projection_evaluation(&(0..3), &root(), MaskFuture::ready(mask.clone()))?
+            .await?;
+
+        let expected = list.filter(mask)?;
+        let mut exec_ctx = SESSION.create_execution_ctx();
+        assert_arrays_eq!(result, expected, &mut exec_ctx);
+        Ok(())
+    }
+
+    /// Build a list with 5 rows and lengths [2, 3, 0, 3, 2]. Mirrors `five_list_offsets()`.
+    /// When `nullable`, row 2 (the empty list) is null.
+    fn create_wider_list_array(nullable: bool) -> ArrayRef {
+        let validity = if nullable {
+            Validity::Array(BoolArray::from_iter([true, true, false, true, true]).into_array())
+        } else {
+            Validity::NonNullable
+        };
+        ListArray::try_new(
+            buffer![10i32, 11, 20, 21, 22, 30, 31, 32, 40, 41].into_array(),
+            buffer![0u32, 2, 5, 5, 8, 10].into_array(),
+            validity,
+        )
+        .expect("array is valid")
+        .into_array()
+    }
+
+    /// Projecting `root()` under each mask below must equal filtering the source list by it.
+    #[rstest]
+    // Single bit set far from start — exercises sparse path with tight elements range.
+    #[case::single_middle(Mask::from_iter([false, false, false, true, false]), false)]
+    // Two far-apart rows — element_mask has a gap between kept spans.
+    #[case::two_far_apart(Mask::from_iter([true, false, false, true, false]), false)]
+    // Boundary rows — first and last list.
+    #[case::boundaries(Mask::from_iter([true, false, false, false, true]), false)]
+    // Kept row is the empty list (zero-width span).
+    #[case::kept_empty_row(Mask::from_iter([false, false, true, false, false]), false)]
+    // Sparse mask over a nullable list — the kept rows include the null row, so the validity
+    // child is read alongside the elements.
+    #[case::sparse_nullable(Mask::from_iter([true, false, true, false, true]), true)]
+    // No rows kept — degenerate empty output.
+    #[case::all_false(Mask::new_false(5), false)]
+    #[tokio::test]
+    async fn projection_evaluation_sparse_mask_round_trips(
+        #[case] mask: Mask,
+        #[case] nullable: bool,
+    ) -> VortexResult<()> {
+        let list = create_wider_list_array(nullable);
+        let ctx = LayoutReaderContext::new();
+        let (segments, layout) = write_layout(&flat_list_strategy(), list.clone()).await?;
+        let reader = layout.new_reader("".into(), segments, &SESSION, &ctx)?;
+
+        let result = reader
+            .projection_evaluation(&(0..5), &root(), MaskFuture::ready(mask.clone()))?
+            .await?;
+
+        let expected = list.filter(mask)?;
+        let mut exec_ctx = SESSION.create_execution_ctx();
+        assert_arrays_eq!(result, expected, &mut exec_ctx);
+        Ok(())
+    }
+}
diff --git a/vortex-layout/src/layouts/list/writer.rs b/vortex-layout/src/layouts/list/writer.rs
new file mode 100644
index 00000000000..8d8998333bd
--- /dev/null
+++ b/vortex-layout/src/layouts/list/writer.rs
@@ -0,0 +1,501 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright the Vortex contributors
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use futures::StreamExt;
+use futures::stream;
+use vortex_array::ArrayContext;
+use vortex_array::ArrayRef;
+use vortex_array::ExecutionCtx;
+use vortex_array::IntoArray;
+use vortex_array::VortexSessionExecute;
+use vortex_array::arrays::List;
+use vortex_array::arrays::ListView;
+use vortex_array::arrays::list::ListDataParts;
+use vortex_array::arrays::listview::list_from_list_view;
+use vortex_array::dtype::DType;
+use vortex_array::matcher::Matcher;
+use vortex_error::VortexResult;
+use vortex_error::vortex_bail;
+use vortex_io::session::RuntimeSessionExt;
+use vortex_session::VortexSession;
+
+use crate::IntoLayout;
+use crate::LayoutRef;
+use crate::LayoutStrategy;
+use crate::layouts::flat::writer::FlatLayoutStrategy;
+use crate::layouts::list::ListLayout;
+use crate::segments::SegmentSinkRef;
+use crate::sequence::SendableSequentialStream;
+use crate::sequence::SequenceId;
+use crate::sequence::SequencePointer;
+use crate::sequence::SequentialStream;
+use crate::sequence::SequentialStreamAdapter;
+use crate::sequence::SequentialStreamExt;
+
+/// Strategy for writing list-typed arrays, with a fallback for non-list dtypes.
+///
+/// Single-chunk only. For list-typed input the strategy:
+/// 1. Executes the input chunk until it is either a [`List`] or a [`ListView`] array.
+/// 2. Uses a [`List`] as is; a [`ListView`] is first rebuilt by [`list_from_list_view`] into
+///    zero-copy-to-list form (sorted, gapless, non-overlapping offsets), producing a
+///    [`ListArray`].
+/// 3. Writes the `elements`, `offsets`, and (when nullable) `validity` columns into
+///    separately configurable downstream strategies, producing a single [`ListLayout`].
+///
+/// For input whose dtype is not [`DType::List`], the stream is forwarded unchanged to the
+/// configured `fallback` strategy. This lets `ListLayoutStrategy` slot in as a leaf strategy in
+/// a heterogeneous column writer where some columns are lists and others are not.
+///
+/// # Chunking
+///
+/// `ListLayoutStrategy` bails on empty or multi-chunk input, matching the convention used by
+/// [`FlatLayoutStrategy`].
+///
+/// [`ListArray`]: vortex_array::arrays::ListArray
+#[derive(Clone)]
+pub struct ListLayoutStrategy {
+    elements: Arc<dyn LayoutStrategy>,
+    offsets: Arc<dyn LayoutStrategy>,
+    validity: Arc<dyn LayoutStrategy>,
+    fallback: Arc<dyn LayoutStrategy>,
+}
+
+impl Default for ListLayoutStrategy {
+    /// Routes every child (elements, offsets, validity) and the non-list fallback through
+    /// [`FlatLayoutStrategy`]. Override individual children with the `with_*` builder methods.
+    fn default() -> Self {
+        let flat: Arc<dyn LayoutStrategy> = Arc::new(FlatLayoutStrategy::default());
+        Self {
+            elements: Arc::clone(&flat),
+            offsets: Arc::clone(&flat),
+            validity: Arc::clone(&flat),
+            fallback: flat,
+        }
+    }
+}
+
+impl ListLayoutStrategy {
+    /// Strategy for the `elements` child.
+    pub fn with_elements(mut self, elements: Arc<dyn LayoutStrategy>) -> Self {
+        self.elements = elements;
+        self
+    }
+
+    /// Strategy for the `offsets` child.
+    pub fn with_offsets(mut self, offsets: Arc<dyn LayoutStrategy>) -> Self {
+        self.offsets = offsets;
+        self
+    }
+
+    /// Strategy for the `validity` child (written only when the list is nullable).
+    pub fn with_validity(mut self, validity: Arc<dyn LayoutStrategy>) -> Self {
+        self.validity = validity;
+        self
+    }
+
+    /// Strategy for non-list input, which is forwarded through this strategy unchanged.
+    pub fn with_fallback(mut self, fallback: Arc<dyn LayoutStrategy>) -> Self {
+        self.fallback = fallback;
+        self
+    }
+}
+
+#[async_trait]
+impl LayoutStrategy for ListLayoutStrategy {
+    /// Writes a single list chunk as a [`ListLayout`], or forwards non-list input to the
+    /// `fallback` strategy.
+    ///
+    /// # Errors
+    ///
+    /// Fails when a list-typed stream is empty or yields more than one chunk, when the stream
+    /// itself yields an error, or when canonicalization or any child write fails.
+    async fn write_stream(
+        &self,
+        ctx: ArrayContext,
+        segment_sink: SegmentSinkRef,
+        mut stream: SendableSequentialStream,
+        mut eof: SequencePointer,
+        session: &VortexSession,
+    ) -> VortexResult<LayoutRef> {
+        let dtype = stream.dtype().clone();
+        if !dtype.is_list() {
+            // Non-list input: route to the configured fallback strategy unchanged.
+            return self
+                .fallback
+                .write_stream(ctx, segment_sink, stream, eof, session)
+                .await;
+        }
+
+        // Writer wants exactly one chunk
+        let Some(chunk) = stream.next().await else {
+            vortex_bail!("ListLayoutStrategy needs a single chunk");
+        };
+        let (sequence_id, array) = chunk?;
+
+        let mut exec_ctx = session.create_execution_ctx();
+        let ListDataParts {
+            elements,
+            offsets,
+            validity,
+            ..
+        } = canonicalize_to_list_parts(array, &mut exec_ctx)?;
+
+        // There is one extra element in `offsets`
+        let row_count = offsets.len().saturating_sub(1);
+        // The validity child is only materialized (and later written) for nullable lists.
+        let validity_array = dtype
+            .is_nullable()
+            .then(|| {
+                validity
+                    .execute_mask(row_count, &mut exec_ctx)
+                    .map(|m| m.into_array())
+            })
+            .transpose()?;
+
+        // Spawn each child write onto the runtime so they run concurrently
+        let handle = session.handle();
+        let (elements_task, offsets_task, validity_task) = {
+            let mut sp = sequence_id.descend();
+            let mut spawn_layout_writer = |strategy: Arc<dyn LayoutStrategy>, array: ArrayRef| {
+                // Each child gets its own one-chunk stream and its own slice of the EOF pointer.
+                let stream = single_chunk_stream(array.dtype().clone(), sp.advance(), array);
+                let child_eof = eof.split_off();
+                let ctx = ctx.clone();
+                let segment_sink = Arc::clone(&segment_sink);
+                let session = session.clone();
+                handle.spawn_nested(move |h| async move {
+                    let session = session.with_handle(h);
+                    strategy
+                        .write_stream(ctx, segment_sink, stream, child_eof, &session)
+                        .await
+                })
+            };
+            (
+                spawn_layout_writer(Arc::clone(&self.elements), elements),
+                spawn_layout_writer(Arc::clone(&self.offsets), offsets),
+                validity_array.map(|arr| spawn_layout_writer(Arc::clone(&self.validity), arr)),
+            )
+        };
+
+        // Should not have more than one chunk. An error yielded by the stream is propagated as
+        // is instead of being misreported as a surplus chunk.
+        if let Some(extra) = stream.next().await {
+            extra?;
+            vortex_bail!("ListLayoutStrategy received more than a single chunk");
+        }
+
+        let (elements_layout, offsets_layout, validity_layout) =
+            futures::try_join!(elements_task, offsets_task, async move {
+                match validity_task {
+                    Some(t) => t.await.map(Some),
+                    None => Ok(None),
+                }
+            },)?;
+
+        Ok(ListLayout::new(dtype, elements_layout, offsets_layout, validity_layout).into_layout())
+    }
+
+    /// Reports the larger of the summed buffered bytes of the three list children and the
+    /// buffered bytes of the fallback strategy.
+    fn buffered_bytes(&self) -> u64 {
+        let list_bytes = self.elements.buffered_bytes()
+            + self.offsets.buffered_bytes()
+            + self.validity.buffered_bytes();
+        list_bytes.max(self.fallback.buffered_bytes())
+    }
+}
+
+/// Canonicalize a list-dtype array into [`ListDataParts`]. Short-circuits when the input is
+/// already a `List` or `ListView` array — otherwise drives the execution loop until one of
+/// those forms appears. `ListView` is rebuilt into zero-copy-to-list form via
+/// [`list_from_list_view`] before its parts are extracted.
+fn canonicalize_to_list_parts(
+    array: ArrayRef,
+    exec_ctx: &mut ExecutionCtx,
+) -> VortexResult<ListDataParts> {
+    let canonical = array.execute_until::<AnyList>(exec_ctx)?;
+    if let Some(list) = canonical.as_opt::<List>() {
+        Ok(list.into_owned().into_data_parts())
+    } else if let Some(view) = canonical.as_opt::<ListView>() {
+        Ok(list_from_list_view(view.into_owned(), exec_ctx)?.into_data_parts())
+    } else {
+        unreachable!("AnyList matcher guarantees List or ListView")
+    }
+}
+
+/// Wrap a single array as a one-shot [`SendableSequentialStream`] for handoff to a child writer.
+fn single_chunk_stream(
+    dtype: DType,
+    sequence_id: SequenceId,
+    array: ArrayRef,
+) -> SendableSequentialStream {
+    SequentialStreamAdapter::new(
+        dtype,
+        stream::once(async move { Ok((sequence_id, array)) }).boxed(),
+    )
+    .sendable()
+}
+
+/// Matcher for `Array<List>` or `Array<ListView>`. Used to short-circuit the execution loop
+/// when the input is already in (or directly produces) a list form, avoiding a redundant
+/// `ListView` round-trip when the writer already has the parts it needs.
+struct AnyList;
+
+impl Matcher for AnyList {
+    // The match carries no payload; callers only need to know that it matched.
+    type Match<'a> = ();
+
+    /// Matches when either of the two `as_opt` downcasts on `array` succeeds.
+    fn try_match(array: &ArrayRef) -> Option> {
+        (array.as_opt::().is_some() || array.as_opt::().is_some()).then_some(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use vortex_array::arrays::BoolArray;
+    use vortex_array::arrays::ChunkedArray;
+    use vortex_array::arrays::ListArray;
+    use vortex_array::arrays::StructArray;
+    use vortex_array::dtype::Nullability;
+    use vortex_array::dtype::PType;
+    use vortex_array::validity::Validity;
+    use vortex_buffer::buffer;
+
+    use super::*;
+    use crate::layouts::chunked::writer::ChunkedLayoutStrategy;
+    use crate::layouts::flat::writer::FlatLayoutStrategy;
+    use crate::layouts::table::TableStrategy;
+    use crate::segments::TestSegments;
+    use crate::sequence::SequentialArrayStreamExt;
+    use crate::test::SESSION;
+
+    /// The default `ListLayoutStrategy`: every child and the fallback go through the flat
+    /// strategy.
+    fn flat_list_strategy() -> ListLayoutStrategy {
+        ListLayoutStrategy::default()
+    }
+
+    /// Writes `array` as a stream through `strategy` into a fresh `TestSegments` and returns
+    /// the resulting layout.
+    async fn write(strategy: &S, array: ArrayRef) -> VortexResult {
+        let segments = Arc::new(TestSegments::default());
+        let (ptr, eof) = SequenceId::root().split();
+        let stream = array.to_array_stream().sequenced(ptr);
+        strategy
+            .write_stream(ArrayContext::empty(), segments, stream, eof, &SESSION)
+            .await
+    }
+
+    /// `list(i32)` with non-nullable elements; `nullable` controls the list's own nullability.
+    fn i32_list_dtype(nullable: bool) -> DType {
+        DType::List(
+            Arc::new(DType::Primitive(PType::I32, Nullability::NonNullable)),
+            if nullable {
+                Nullability::Nullable
+            } else {
+                Nullability::NonNullable
+            },
+        )
+    }
+
+    /// Three lists `[1, 2]`, `[3, 4, 5]`, `[]` (offsets `[0, 2, 5, 5]`) with the given validity.
+    fn create_basic_list(validity: Validity) -> ArrayRef {
+        ListArray::try_new(
+            buffer![1i32, 2, 3, 4, 5].into_array(),
+            buffer![0u32, 2, 5, 5].into_array(),
+            validity,
+        )
+        .unwrap()
+        .into_array()
+    }
+
+    /// A non-nullable list produces two children: `elements` and `offsets`, no `validity`.
+    #[tokio::test]
+    async fn basic_non_nullable_input() -> VortexResult<()> {
+        let list = create_basic_list(Validity::NonNullable);
+
+        let layout = write(&flat_list_strategy(), list).await?;
+        assert_eq!(layout.row_count(), 3);
+
+        insta::assert_snapshot!(layout.display_tree(), @"
+        vortex.list, dtype: list(i32), children: 2
+        ├── elements: vortex.flat, dtype: i32, segment: 0
+        └── offsets: vortex.flat, dtype: u32, segment: 1
+        ");
+        Ok(())
+    }
+
+    /// A nullable list additionally gets a `validity` child of dtype `bool`.
+    #[tokio::test]
+    async fn basic_nullable_input() -> VortexResult<()> {
+        let list = create_basic_list(Validity::Array(
+            BoolArray::from_iter([true, false, true]).into_array(),
+        ));
+
+        let layout = write(&flat_list_strategy(), list).await?;
+        assert_eq!(layout.row_count(), 3);
+
+        insta::assert_snapshot!(layout.display_tree(), @"
+        vortex.list, dtype: list(i32)?, children: 3
+        ├── elements: vortex.flat, dtype: i32, segment: 0
+        ├── offsets: vortex.flat, dtype: u32, segment: 1
+        └── validity: vortex.flat, dtype: bool, segment: 2
+        ");
+        Ok(())
+    }
+
+    /// Non-list input dispatches to the fallback strategy unchanged.
+    #[tokio::test]
+    async fn non_list_input_routes_to_fallback() -> VortexResult<()> {
+        let primitive = buffer![1i32, 2, 3].into_array();
+        let layout = write(&flat_list_strategy(), primitive).await?;
+        insta::assert_snapshot!(layout.display_tree(), @"vortex.flat, dtype: i32, segment: 0");
+        Ok(())
+    }
+
+    /// A list-typed stream that yields no chunk at all is rejected.
+    #[tokio::test]
+    async fn empty_stream_errors() {
+        let segments = Arc::new(TestSegments::default());
+        let (_, eof) = SequenceId::root().split();
+        let empty = stream::empty::>().boxed();
+        let stream = SequentialStreamAdapter::new(i32_list_dtype(false), empty).sendable();
+
+        let res = flat_list_strategy()
+            .write_stream(ArrayContext::empty(), segments, stream, eof, &SESSION)
+            .await;
+        assert!(res.is_err())
+    }
+
+    /// Writing a two-chunk `ChunkedArray` directly through `ListLayoutStrategy` is an error.
+    #[tokio::test]
+    async fn chunked_list_input_without_chunked_strategy_fails() -> VortexResult<()> {
+        let chunk0 = ListArray::try_new(
+            buffer![1i32, 2].into_array(),
+            buffer![0u32, 2].into_array(),
+            Validity::NonNullable,
+        )
+        .unwrap()
+        .into_array();
+        let chunk1 = ListArray::try_new(
+            buffer![3i32, 4, 5].into_array(),
+            buffer![0u32, 3].into_array(),
+            Validity::NonNullable,
+        )
+        .unwrap()
+        .into_array();
+        let chunked =
+            ChunkedArray::try_new(vec![chunk0, chunk1], i32_list_dtype(false))?.into_array();
+
+        let res = write(&flat_list_strategy(), chunked).await;
+        assert!(res.is_err());
+        Ok(())
+    }
+
+    /// With a `TableStrategy` configured for the elements, a list of structs gets a struct
+    /// layout as its `elements` child.
+    #[tokio::test]
+    async fn list_of_struct_tree() -> VortexResult<()> {
+        let struct_array = StructArray::from_fields(
+            [
+                ("a", buffer![1i32, 2, 3, 4, 5].into_array()),
+                ("b", buffer![10i32, 20, 30, 40, 50].into_array()),
+            ]
+            .as_slice(),
+        )?
+        .into_array();
+        let list = ListArray::try_new(
+            struct_array,
+            buffer![0u32, 2, 5, 5].into_array(),
+            Validity::NonNullable,
+        )?
+        .into_array();
+
+        let flat: Arc = Arc::new(FlatLayoutStrategy::default());
+        let table_strategy: Arc =
+            Arc::new(TableStrategy::new(Arc::clone(&flat), Arc::clone(&flat)));
+        let writer = ListLayoutStrategy::default().with_elements(table_strategy);
+
+        let layout = write(&writer, list).await?;
+        insta::assert_snapshot!(layout.display_tree(), @"
+        vortex.list, dtype: list({a=i32, b=i32}), children: 2
+        ├── elements: vortex.struct, dtype: {a=i32, b=i32}, children: 2
+        │   ├── a: vortex.flat, dtype: i32, segment: 1
+        │   └── b: vortex.flat, dtype: i32, segment: 2
+        └── offsets: vortex.flat, dtype: u32, segment: 0
+        ");
+        Ok(())
+    }
+
+    /// With a `ListLayoutStrategy` configured for the elements, a list of lists nests one list
+    /// layout inside another.
+    #[tokio::test]
+    async fn list_of_list_tree() -> VortexResult<()> {
+        let inner_list = ListArray::try_new(
+            buffer![1i32, 2, 3, 4, 5, 6].into_array(),
+            buffer![0u32, 2, 5, 5, 6].into_array(),
+            Validity::NonNullable,
+        )?
+        .into_array();
+        let list = ListArray::try_new(
+            inner_list,
+            buffer![0u32, 2, 4].into_array(),
+            Validity::NonNullable,
+        )?
+        .into_array();
+
+        let writer =
+            ListLayoutStrategy::default().with_elements(Arc::new(ListLayoutStrategy::default()));
+        let layout = write(&writer, list).await?;
+        insta::assert_snapshot!(layout.display_tree(), @"
+        vortex.list, dtype: list(list(i32)), children: 2
+        ├── elements: vortex.list, dtype: list(i32), children: 2
+        │   ├── elements: vortex.flat, dtype: i32, segment: 1
+        │   └── offsets: vortex.flat, dtype: u32, segment: 2
+        └── offsets: vortex.flat, dtype: u32, segment: 0
+        ");
+        Ok(())
+    }
+
+    /// Same as `list_of_list_tree`, one level deeper: three nested list layouts.
+    #[tokio::test]
+    async fn list_of_list_of_list_tree() -> VortexResult<()> {
+        let innermost = ListArray::try_new(
+            buffer![1i32, 2, 3, 4].into_array(),
+            buffer![0u32, 2, 4].into_array(),
+            Validity::NonNullable,
+        )?
+        .into_array();
+        let middle = ListArray::try_new(
+            innermost,
+            buffer![0u32, 2].into_array(),
+            Validity::NonNullable,
+        )?
+        .into_array();
+        let outer =
+            ListArray::try_new(middle, buffer![0u32, 1].into_array(), Validity::NonNullable)?
+                .into_array();
+
+        let writer = ListLayoutStrategy::default().with_elements(Arc::new(
+            ListLayoutStrategy::default().with_elements(Arc::new(ListLayoutStrategy::default())),
+        ));
+        let layout = write(&writer, outer).await?;
+        insta::assert_snapshot!(layout.display_tree(), @"
+        vortex.list, dtype: list(list(list(i32))), children: 2
+        ├── elements: vortex.list, dtype: list(list(i32)), children: 2
+        │   ├── elements: vortex.list, dtype: list(i32), children: 2
+        │   │   ├── elements: vortex.flat, dtype: i32, segment: 2
+        │   │   └── offsets: vortex.flat, dtype: u32, segment: 3
+        │   └── offsets: vortex.flat, dtype: u32, segment: 1
+        └── offsets: vortex.flat, dtype: u32, segment: 0
+        ");
+        Ok(())
+    }
+
+    /// Wrapped in a `ChunkedLayoutStrategy`, the same two-chunk input succeeds and yields one
+    /// list layout per chunk.
+    #[tokio::test]
+    async fn chunked_list_input_with_chunked_strategy_succeeds() -> VortexResult<()> {
+        let chunk0 = ListArray::try_new(
+            buffer![1i32, 2, 3].into_array(),
+            buffer![0u32, 2, 3].into_array(),
+            Validity::NonNullable,
+        )
+        .unwrap()
+        .into_array();
+        let chunk1 = ListArray::try_new(
+            buffer![4i32, 5, 6, 7].into_array(),
+            buffer![0u32, 1, 4].into_array(),
+            Validity::NonNullable,
+        )
+        .unwrap()
+        .into_array();
+
+        let chunked =
+            ChunkedArray::try_new(vec![chunk0, chunk1], i32_list_dtype(false))?.into_array();
+
+        let layout = write(&ChunkedLayoutStrategy::new(flat_list_strategy()), chunked).await?;
+
+        insta::assert_snapshot!(layout.display_tree(), @"
+        vortex.chunked, dtype: list(i32), children: 2
+        ├── [0]: vortex.list, dtype: list(i32), children: 2
+        │   ├── elements: vortex.flat, dtype: i32, segment: 0
+        │   └── offsets: vortex.flat, dtype: u32, segment: 1
+        └── [1]: vortex.list, dtype: list(i32), children: 2
+            ├── elements: vortex.flat, dtype: i32, segment: 2
+            └── offsets: vortex.flat, dtype: u32, segment: 3
+        ");
+        Ok(())
+    }
+}
diff --git a/vortex-layout/src/layouts/mod.rs b/vortex-layout/src/layouts/mod.rs
index 18df5b8f347..47fa31aa3d9 100644
--- a/vortex-layout/src/layouts/mod.rs
+++ b/vortex-layout/src/layouts/mod.rs
@@ -16,6 +16,7 @@ pub mod dict;
 pub mod file_stats;
 pub mod flat;
 pub(crate) mod foreign;
+pub mod list;
 pub(crate) mod partitioned;
 pub mod repartition;
 pub mod row_idx;
diff --git a/vortex-layout/src/session.rs b/vortex-layout/src/session.rs
index 42c8906d107..a6831abcdcd 100644
--- a/vortex-layout/src/session.rs
+++ b/vortex-layout/src/session.rs
@@ -11,6 +11,7 @@ use crate::LayoutEncodingRef;
 use crate::layouts::chunked::ChunkedLayoutEncoding;
 use crate::layouts::dict::DictLayoutEncoding;
 use crate::layouts::flat::FlatLayoutEncoding;
+use crate::layouts::list::ListLayoutEncoding;
 use crate::layouts::struct_::StructLayoutEncoding;
 use crate::layouts::zoned::LegacyStatsLayoutEncoding;
 use crate::layouts::zoned::ZonedLayoutEncoding;
@@ -56,6 +57,7 @@ impl Default for LayoutSession {
             LegacyStatsLayoutEncoding.as_ref(),
         );
         layouts.register(DictLayoutEncoding.id(), DictLayoutEncoding.as_ref());
+        layouts.register(ListLayoutEncoding.id(), ListLayoutEncoding.as_ref());
 
         Self { registry: layouts }
     }