diff --git a/benchmarks/lance-bench/src/random_access.rs b/benchmarks/lance-bench/src/random_access.rs index 53138552742..465a0ee492d 100644 --- a/benchmarks/lance-bench/src/random_access.rs +++ b/benchmarks/lance-bench/src/random_access.rs @@ -82,6 +82,7 @@ pub async fn nested_structs_lance() -> anyhow::Result { pub struct LanceRandomAccessor { name: String, dataset: Dataset, + projection: ProjectionRequest, } impl LanceRandomAccessor { @@ -92,9 +93,11 @@ impl LanceRandomAccessor { .ok_or_else(|| anyhow!("Invalid dataset path"))?, ) .await?; + let projection = ProjectionRequest::from_schema(dataset.schema().clone()); Ok(Self { name: name.into(), dataset, + projection, }) } } @@ -110,8 +113,7 @@ impl RandomAccessor for LanceRandomAccessor { } async fn take(&self, indices: &[u64]) -> anyhow::Result { - let projection = ProjectionRequest::from_schema(self.dataset.schema().clone()); - let result = self.dataset.take(indices, projection).await?; + let result = self.dataset.take(indices, self.projection.clone()).await?; Ok(result.num_rows()) } } diff --git a/vortex-layout/src/scan/repeated_scan.rs b/vortex-layout/src/scan/repeated_scan.rs index a0f2101556c..70b62bdb943 100644 --- a/vortex-layout/src/scan/repeated_scan.rs +++ b/vortex-layout/src/scan/repeated_scan.rs @@ -17,6 +17,7 @@ use vortex_array::iter::ArrayIterator; use vortex_array::iter::ArrayIteratorAdapter; use vortex_array::stream::ArrayStream; use vortex_array::stream::ArrayStreamAdapter; +use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_io::runtime::BlockingRuntime; use vortex_io::session::RuntimeSessionExt; @@ -122,15 +123,17 @@ impl RepeatedScan { &self, row_range: Option>, ) -> VortexResult>>>> { - let ctx = Arc::new(TaskContext { - selection: self.selection.clone(), - filter: self.filter.clone().map(|f| Arc::new(FilterExpr::new(f))), - reader: Arc::clone(&self.layout_reader), - projection: self.projection.clone(), - mapper: Arc::clone(&self.map_fn), - }); - + let selection_range: Option> = match &self.selection { + Selection::IncludeByIndex(buf) if !buf.is_empty() => { + Some(buf[0]..buf[buf.len() - 1] + 1) + } + Selection::IncludeRoaring(roaring) if !roaring.is_empty() => { + Some(roaring.min().vortex_expect("empty")..roaring.max().vortex_expect("empty") + 1) + } + _ => None, + }; let row_range = intersect_ranges(self.row_range.as_ref(), row_range); + let row_range = intersect_ranges(row_range.as_ref(), selection_range); let ranges = match &self.splits { Splits::Natural(btree_set) => { @@ -167,17 +170,23 @@ impl RepeatedScan { let mut limit = self.limit; let mut tasks = Vec::new(); + let ctx = Arc::new(TaskContext { + filter: self.filter.clone().map(|f| Arc::new(FilterExpr::new(f))), + reader: Arc::clone(&self.layout_reader), + projection: self.projection.clone(), + mapper: Arc::clone(&self.map_fn), + }); for range in ranges { - if range.start >= range.end { + let row_mask = self.selection.row_mask(&range); + if row_mask.mask().all_false() { continue; } + tasks.push(split_exec(Arc::clone(&ctx), row_mask, limit.as_mut())?); if limit.is_some_and(|l| l == 0) { break; } - - tasks.push(split_exec(Arc::clone(&ctx), range, limit.as_mut())?); } Ok(tasks) diff --git a/vortex-layout/src/scan/tasks.rs b/vortex-layout/src/scan/tasks.rs index dc0b489b1f2..a86546e15ef 100644 --- a/vortex-layout/src/scan/tasks.rs +++ b/vortex-layout/src/scan/tasks.rs @@ -4,19 +4,17 @@ //! Split scanning task implementation. use std::ops::BitAnd; -use std::ops::Range; use std::sync::Arc; use bit_vec::BitVec; use futures::FutureExt; use futures::future::BoxFuture; -use futures::future::ok; use vortex_array::ArrayRef; use vortex_array::MaskFuture; use vortex_array::expr::Expression; use vortex_error::VortexResult; use vortex_mask::Mask; -use vortex_scan::selection::Selection; +use vortex_scan::row_mask::RowMask; use crate::LayoutReader; use crate::scan::filter::FilterExpr; @@ -24,6 +22,8 @@ use crate::scan::filter::FilterExpr; pub type TaskFuture = BoxFuture<'static, VortexResult>; /// Logic for executing a single split reading task. +/// N.B. read_mask should be evaluated against all_false() before calling this +/// method to avoid creating an empty TaskFuture. /// /// # Task execution flow /// @@ -37,16 +37,11 @@ pub type TaskFuture = BoxFuture<'static, VortexResult>; /// finally mapping the Vortex columnar record batches into some result type `A`. pub fn split_exec( ctx: Arc>, - split: Range, + read_mask: RowMask, limit: Option<&mut u64>, ) -> VortexResult>> { - // Apply the selection to calculate a read mask - let read_mask = ctx.selection.row_mask(&split); let row_range = read_mask.row_range(); let row_mask = read_mask.mask().clone(); - if row_mask.all_false() { - return Ok(ok(None).boxed()); - } let filter_mask = match ctx.filter.as_ref() { // No filter == immediate mask @@ -156,9 +151,9 @@ pub fn split_exec( } /// Information needed to execute a single split task. +/// +/// Row selection is evaluated before creating a split task so it's not included pub struct TaskContext { - /// A row selection to apply. - pub selection: Selection, /// The shared filter expression. pub filter: Option>, /// The layout reader. diff --git a/vortex-scan/src/selection.rs b/vortex-scan/src/selection.rs index 5f0bbd79939..79abafc3d4d 100644 --- a/vortex-scan/src/selection.rs +++ b/vortex-scan/src/selection.rs @@ -43,6 +43,10 @@ impl Selection { /// Extract the [`RowMask`] for the given range from this selection. pub fn row_mask(&self, range: &Range) -> RowMask { + if range.start >= range.end { + return RowMask::new(0, Mask::AllFalse(0)); + } + // Saturating subtraction to prevent underflow, though range should be valid let range_diff = range.end.saturating_sub(range.start); let range_len = usize::try_from(range_diff).unwrap_or_else(|_| {