From 6ea40f53fe7f24cf210ad641fbd9e632e11b3d12 Mon Sep 17 00:00:00 2001 From: Luke Kim <80174+lukekim@users.noreply.github.com> Date: Fri, 12 Jun 2026 13:46:36 -0700 Subject: [PATCH 1/4] perf(scan): sub-split chunk-boundary spans wider than IDEAL_SPLIT_SIZE SplitBy::Layout now sub-divides any span between adjacent chunk boundaries wider than IDEAL_SPLIT_SIZE (100k rows) into evenly sized row-range splits, so files with few large chunks decode across multiple cores. Subdivision only inserts points strictly between existing adjacent boundaries: the half-open ranges consumers derive remain a contiguous, non-overlapping, exact partition of the same rows. The arithmetic saturates at u64::MAX. Tests: unit/property/overflow coverage for the subdivision helper, an end-to-end test that a 250k-row single flat chunk scans correctly across sub-divided splits with bounded split sizes, an rstest-parameterized end-to-end test for fixed-size SplitBy::RowCount scans (previously only covered at the boundary-math level), and an end-to-end test that ~120-byte string columns written with the default strategy keep their natural ~8k-row chunk splits untouched by the cap. Signed-off-by: Luke Kim <80174+lukekim@users.noreply.github.com> --- Cargo.lock | 1 + vortex-file/Cargo.toml | 1 + vortex-file/src/tests.rs | 134 ++++++++++++++++++++++++++ vortex-layout/src/scan/split_by.rs | 145 ++++++++++++++++++++++++++++- 4 files changed, 279 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 00f5345aa76..caaa714901d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9775,6 +9775,7 @@ dependencies = [ "oneshot", "parking_lot", "pin-project-lite", + "rstest", "tokio", "tracing", "vortex-alp", diff --git a/vortex-file/Cargo.toml b/vortex-file/Cargo.toml index c4bf980d683..752ccbc753d 100644 --- a/vortex-file/Cargo.toml +++ b/vortex-file/Cargo.toml @@ -59,6 +59,7 @@ vortex-zigzag = { workspace = true } vortex-zstd = { workspace = true, optional = true } [dev-dependencies] +rstest = { workspace = true } tokio = { workspace = true, features = ["full"] } vortex-array = { workspace = true, features = ["_test-harness"] } vortex-io = { workspace = true, features = ["tokio"] } diff --git a/vortex-file/src/tests.rs b/vortex-file/src/tests.rs index e320cf2e9d9..9b34b25fdea 100644 --- a/vortex-file/src/tests.rs +++ b/vortex-file/src/tests.rs @@ -10,6 +10,7 @@ use bytes::Bytes; use futures::StreamExt; use futures::TryStreamExt; use futures::pin_mut; +use rstest::rstest; use vortex_array::ArrayRef; use vortex_array::IntoArray; use vortex_array::VortexSessionExecute; @@ -65,7 +66,9 @@ use vortex_buffer::buffer; use vortex_error::VortexResult; use vortex_io::session::RuntimeSession; use vortex_layout::Layout; +use vortex_layout::layouts::flat::writer::FlatLayoutStrategy; use vortex_layout::scan::scan_builder::ScanBuilder; +use vortex_layout::scan::split_by::SplitBy; use vortex_layout::session::LayoutSession; use vortex_session::VortexSession; @@ -1813,6 +1816,137 @@ fn assert_offsets_ordered(before: &[u64], after: &[u64], context: &str) { } } +#[tokio::test] +#[cfg_attr(miri, ignore)] +async fn test_large_flat_chunk_scan_subdivides_splits() -> VortexResult<()> { + // A single flat (unchunked) 250k-row layout spans the 100k sub-split threshold, so the scan + // must decode it as multiple row-range splits. + const N_ROWS: u64 = 250_000; + let values = + Buffer::from_iter((0..N_ROWS as i32).map(|i| if i % 2 == 0 { i } else { -i })).into_array(); + + let mut buf = ByteBufferMut::empty(); + SESSION + .write_options() + .with_strategy(Arc::new(FlatLayoutStrategy::default())) + .write(&mut buf, values.to_array_stream()) + .await?; + + let file = SESSION.open_options().open_buffer(buf)?; + + // Sub-division caps each split at 100k rows while tiling the file exactly. + let splits = file.splits()?; + assert!(splits.len() > 1, "expected sub-divided splits: {splits:?}"); + assert!(splits.iter().all(|r| r.end - r.start <= 100_000)); + assert_eq!(splits.first().map(|r| r.start), Some(0)); + assert_eq!(splits.last().map(|r| r.end), Some(N_ROWS)); + assert!(splits.windows(2).all(|w| w[0].end == w[1].start)); + + // A full scan across the sub-splits returns the original rows. + let result = file.scan()?.into_array_stream()?.read_all().await?; + assert_arrays_eq!(result, values); + + // A filtered scan crossing sub-split boundaries selects exactly the matching rows. + let result = file + .scan()? + .with_filter(gt(root(), lit(0i32))) + .into_array_stream()? + .read_all() + .await?; + let expected = + Buffer::from_iter((0..N_ROWS as i32).filter(|i| i % 2 == 0 && *i > 0)).into_array(); + assert_arrays_eq!(result, expected); + + Ok(()) +} + +#[rstest] +#[case::unaligned(33_333)] +#[case::exceeds_file(300_000)] +#[tokio::test] +#[cfg_attr(miri, ignore)] +async fn test_flat_chunk_scan_with_row_count_splits( + #[case] rows_per_split: usize, +) -> VortexResult<()> { + // Fixed-size splits ignore chunk boundaries entirely, so scans must produce identical + // results whether the split size straddles the chunk arbitrarily or exceeds the file's + // row count (a single split). + const N_ROWS: u64 = 250_000; + let values = + Buffer::from_iter((0..N_ROWS as i32).map(|i| if i % 2 == 0 { i } else { -i })).into_array(); + + let mut buf = ByteBufferMut::empty(); + SESSION + .write_options() + .with_strategy(Arc::new(FlatLayoutStrategy::default())) + .write(&mut buf, values.to_array_stream()) + .await?; + + let file = SESSION.open_options().open_buffer(buf)?; + + let result = file + .scan()? + .with_split_by(SplitBy::RowCount(rows_per_split)) + .into_array_stream()? + .read_all() + .await?; + assert_arrays_eq!(result, values); + + let result = file + .scan()? + .with_split_by(SplitBy::RowCount(rows_per_split)) + .with_filter(gt(root(), lit(0i32))) + .into_array_stream()? + .read_all() + .await?; + let expected = + Buffer::from_iter((0..N_ROWS as i32).filter(|i| i % 2 == 0 && *i > 0)).into_array(); + assert_arrays_eq!(result, expected); + + Ok(()) +} + +#[tokio::test] +#[cfg_attr(miri, ignore)] +async fn test_string_chunks_stay_fine_grained_under_split_cap() -> VortexResult<()> { + // Default writing targets ~1MiB uncompressed blocks, so ~120-byte strings chunk near the + // 8192-row block multiple. These natural boundaries sit far below the 100k-row sub-split + // cap, and SplitBy::Layout must pass them through untouched. + const N_ROWS: usize = 40_000; + let strings = VarBinArray::from_iter( + (0..N_ROWS).map(|i| Some(format!("{i:0>120}"))), + DType::Utf8(Nullability::Nullable), + ) + .into_array(); + let st = StructArray::from_fields(&[("s", strings)])?.into_array(); + + let mut buf = ByteBufferMut::empty(); + SESSION + .write_options() + .write(&mut buf, st.to_array_stream()) + .await?; + + let file = SESSION.open_options().open_buffer(buf)?; + + let splits = file.splits()?; + assert!( + splits.len() > 1, + "expected multiple natural chunks: {splits:?}" + ); + assert!( + splits.iter().all(|r| r.end - r.start <= 16_384), + "string chunks should stay fine-grained, not anywhere near the 100k cap: {splits:?}" + ); + assert_eq!(splits.first().map(|r| r.start), Some(0)); + assert_eq!(splits.last().map(|r| r.end), Some(N_ROWS as u64)); + assert!(splits.windows(2).all(|w| w[0].end == w[1].start)); + + let result = file.scan()?.into_array_stream()?.read_all().await?; + assert_arrays_eq!(result, st); + + Ok(()) +} + #[tokio::test] #[cfg_attr(miri, ignore)] async fn test_segment_ordering_dict_codes_before_values() -> VortexResult<()> { diff --git a/vortex-layout/src/scan/split_by.rs b/vortex-layout/src/scan/split_by.rs index cdaf260cdb8..2af312f7537 100644 --- a/vortex-layout/src/scan/split_by.rs +++ b/vortex-layout/src/scan/split_by.rs @@ -5,11 +5,19 @@ use std::iter::once; use std::ops::Range; use vortex_array::dtype::FieldMask; +use vortex_error::VortexExpect; use vortex_error::VortexResult; use crate::LayoutReader; use crate::RowSplits; use crate::SplitRange; +use crate::scan::IDEAL_SPLIT_SIZE; + +/// Chunk-boundary spans wider than this are sub-divided into multiple row-range splits so that a +/// file with few, large chunks can be decoded across multiple cores rather than one. +/// +/// Reuses [`IDEAL_SPLIT_SIZE`] as the target span per split. +const MAX_SPLIT_ROWS: u64 = IDEAL_SPLIT_SIZE; /// Defines how the Vortex file is split into batches for reading. /// @@ -17,7 +25,9 @@ use crate::SplitRange; #[derive(Default, Copy, Clone, Debug)] pub enum SplitBy { #[default] - /// Splits any time there is a chunk boundary in the file. + /// Splits any time there is a chunk boundary in the file. Spans between adjacent boundaries + /// wider than `MAX_SPLIT_ROWS` are further sub-divided so that a file with few, large chunks + /// can still be decoded across multiple cores. Layout, /// Splits every n rows. RowCount(usize), @@ -44,7 +54,7 @@ impl SplitBy { &SplitRange::root(row_range.clone())?, &mut row_splits, )?; - row_splits.into_sorted_deduped() + subdivide_large_spans(row_splits.into_sorted_deduped(), MAX_SPLIT_ROWS) } SplitBy::RowCount(n) => row_range .clone() @@ -55,6 +65,60 @@ impl SplitBy { } } +/// Sub-divide any gap between adjacent split boundaries that is wider than `max_span` into evenly +/// sized row-range sub-splits. +/// +/// `boundaries` is the sorted, deduplicated list of split points produced by the layout (chunk +/// boundaries). Downstream consumers turn this list into half-open ranges by pairing adjacent +/// entries (`tuple_windows().map(|(s, e)| s..e)`), so the row coverage is fully determined by the +/// boundary set. This function only *inserts* points that lie strictly between two existing +/// adjacent boundaries; it never moves or removes a boundary. Splitting `[lo, hi)` at an interior +/// point `m` (with `lo < m < hi`) yields exactly `[lo, m) + [m, hi)`, so the union of ranges is +/// unchanged: the rows are still partitioned contiguously, with no gaps and no overlaps, covering +/// every row exactly once. The output remains sorted and deduplicated. +fn subdivide_large_spans(boundaries: Vec, max_span: u64) -> Vec { + debug_assert!(boundaries.is_sorted(), "boundaries must be sorted"); + debug_assert!(max_span > 0, "max_span must be non-zero"); + + // Fast path: nothing to split (also covers the empty / single-boundary cases). + if boundaries.len() < 2 || boundaries.windows(2).all(|w| w[1] - w[0] <= max_span) { + return boundaries; + } + + let mut out = Vec::with_capacity(boundaries.len() * 2); + for window in boundaries.windows(2) { + let lo = window[0]; + let hi = window[1]; + // Always emit the lower boundary; the final `hi` is appended once after the loop. + out.push(lo); + + let span = hi - lo; + if span > max_span { + // Number of sub-ranges so that each is <= max_span. `span > max_span` and + // `max_span >= 1` guarantee `sub_count >= 2`. + let sub_count = span.div_ceil(max_span); + // Even sub-range size (rounded up); the last sub-range absorbs any remainder and is + // bounded by `hi`. Inserted points `lo + k*sub_size` are strictly in `(lo, hi)`. + let sub_size = span.div_ceil(sub_count); + let mut point = lo + sub_size; + while point < hi { + out.push(point); + // Saturating: a sum past u64::MAX can never be < `hi`, so the loop exits. + point = point.saturating_add(sub_size); + } + } + } + // Append the final boundary (the `hi` of the last window). + out.push(*boundaries.last().vortex_expect("len >= 2 checked above")); + + debug_assert!(out.is_sorted(), "subdivided boundaries must stay sorted"); + debug_assert!( + out.windows(2).all(|w| w[0] < w[1]), + "subdivided boundaries must stay strictly increasing (deduped)" + ); + out +} + #[cfg(test)] mod test { use std::any::Any; @@ -212,4 +276,81 @@ mod test { .unwrap(); assert_eq!(splits, vec![0u64, 5, 10]); } + + #[test] + fn subdivide_below_threshold_is_noop() { + // Gaps all <= max_span: boundaries returned unchanged. + assert_eq!(subdivide_large_spans(vec![0, 5, 10], 100), vec![0, 5, 10]); + assert_eq!(subdivide_large_spans(vec![0, 100], 100), vec![0, 100]); + assert_eq!( + subdivide_large_spans(Vec::::new(), 100), + Vec::::new() + ); + assert_eq!(subdivide_large_spans(vec![7], 100), vec![7]); + } + + #[test] + fn subdivide_near_u64_max_does_not_overflow() { + // The increment past the last interior point would overflow without saturating math. + let hi = u64::MAX; + let out = subdivide_large_spans(vec![hi - 3, hi], 2); + assert_eq!(out, vec![hi - 3, hi - 1, hi]); + } + + #[test] + fn subdivide_splits_large_single_chunk() { + // One large chunk [0, 1000) with max_span 100 -> 10 contiguous sub-splits. + let out = subdivide_large_spans(vec![0, 1000], 100); + assert_eq!( + out, + vec![0, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000] + ); + } + + #[test] + fn subdivide_only_large_gaps() { + // Mixed: [0,50) stays whole, [50, 350) splits into 100-row pieces, [350, 360) stays whole. + let out = subdivide_large_spans(vec![0, 50, 350, 360], 100); + assert_eq!(out, vec![0, 50, 150, 250, 350, 360]); + } + + /// Property: for any sorted, deduped boundary set, subdivision (a) keeps the first and last + /// boundary, (b) stays strictly increasing, and (c) preserves exact row coverage — the union + /// of the half-open ranges the consumer derives is identical before and after. + #[test] + fn subdivide_preserves_exact_coverage() { + let cases: Vec> = vec![ + vec![0, 1000], + vec![0, 7, 250_001], + vec![0, 5, 10, 15, 20, 25, 30], + vec![3, 1_000_003], + vec![0, 99_999, 100_000, 300_000], + ]; + for boundaries in cases { + let out = subdivide_large_spans(boundaries.clone(), MAX_SPLIT_ROWS); + // (a) endpoints preserved + assert_eq!(out.first(), boundaries.first()); + assert_eq!(out.last(), boundaries.last()); + // (b) strictly increasing (sorted + deduped) + assert!( + out.windows(2).all(|w| w[0] < w[1]), + "not strictly increasing: {out:?}" + ); + // (c) exact coverage: ranges from `out` tile the same span with no gap/overlap, and + // every original boundary is still present (so original ranges are sub-divided, never + // merged or shifted). + let total: u64 = out.windows(2).map(|w| w[1] - w[0]).sum(); + let expected_total = boundaries.last().unwrap() - boundaries.first().unwrap(); + assert_eq!( + total, expected_total, + "coverage span changed for {boundaries:?}" + ); + for b in &boundaries { + assert!( + out.contains(b), + "original boundary {b} dropped from {out:?}" + ); + } + } + } } From 9e9838caed0f81bbb41a00c0b4053c3297eaad0d Mon Sep 17 00:00:00 2001 From: Luke Kim <80174+lukekim@users.noreply.github.com> Date: Fri, 12 Jun 2026 13:56:44 -0700 Subject: [PATCH 2/4] test(file): decouple split tests from writer defaults per review Introduce a test-local MAX_SPLIT_ROWS mirroring the private IDEAL_SPLIT_SIZE instead of repeating 100_000, and bound the string-chunk test relative to the cap (< MAX_SPLIT_ROWS / 4) rather than pinning the current ~8k repartition default. Signed-off-by: Luke Kim <80174+lukekim@users.noreply.github.com> --- vortex-file/src/tests.rs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/vortex-file/src/tests.rs b/vortex-file/src/tests.rs index 9b34b25fdea..e5b372cddca 100644 --- a/vortex-file/src/tests.rs +++ b/vortex-file/src/tests.rs @@ -1816,6 +1816,10 @@ fn assert_offsets_ordered(before: &[u64], after: &[u64], context: &str) { } } +/// Mirrors the (private) `IDEAL_SPLIT_SIZE` that `SplitBy::Layout` uses to sub-divide wide +/// chunk-boundary spans: layout splits are never wider than this many rows. +const MAX_SPLIT_ROWS: u64 = 100_000; + #[tokio::test] #[cfg_attr(miri, ignore)] async fn test_large_flat_chunk_scan_subdivides_splits() -> VortexResult<()> { @@ -1834,10 +1838,10 @@ async fn test_large_flat_chunk_scan_subdivides_splits() -> VortexResult<()> { let file = SESSION.open_options().open_buffer(buf)?; - // Sub-division caps each split at 100k rows while tiling the file exactly. + // Sub-division caps each split at MAX_SPLIT_ROWS while tiling the file exactly. let splits = file.splits()?; assert!(splits.len() > 1, "expected sub-divided splits: {splits:?}"); - assert!(splits.iter().all(|r| r.end - r.start <= 100_000)); + assert!(splits.iter().all(|r| r.end - r.start <= MAX_SPLIT_ROWS)); assert_eq!(splits.first().map(|r| r.start), Some(0)); assert_eq!(splits.last().map(|r| r.end), Some(N_ROWS)); assert!(splits.windows(2).all(|w| w[0].end == w[1].start)); @@ -1909,9 +1913,9 @@ async fn test_flat_chunk_scan_with_row_count_splits( #[tokio::test] #[cfg_attr(miri, ignore)] async fn test_string_chunks_stay_fine_grained_under_split_cap() -> VortexResult<()> { - // Default writing targets ~1MiB uncompressed blocks, so ~120-byte strings chunk near the - // 8192-row block multiple. These natural boundaries sit far below the 100k-row sub-split - // cap, and SplitBy::Layout must pass them through untouched. + // Default writing targets ~1MiB uncompressed blocks, so ~120-byte strings chunk at a few + // thousand rows (~8k with today's defaults). These natural boundaries sit far below the + // sub-split cap, and SplitBy::Layout must pass them through untouched. const N_ROWS: usize = 40_000; let strings = VarBinArray::from_iter( (0..N_ROWS).map(|i| Some(format!("{i:0>120}"))), @@ -1934,8 +1938,8 @@ async fn test_string_chunks_stay_fine_grained_under_split_cap() -> VortexResult< "expected multiple natural chunks: {splits:?}" ); assert!( - splits.iter().all(|r| r.end - r.start <= 16_384), - "string chunks should stay fine-grained, not anywhere near the 100k cap: {splits:?}" + splits.iter().all(|r| r.end - r.start < MAX_SPLIT_ROWS / 4), + "string chunks should stay fine-grained, nowhere near the split cap: {splits:?}" ); assert_eq!(splits.first().map(|r| r.start), Some(0)); assert_eq!(splits.last().map(|r| r.end), Some(N_ROWS as u64)); From f6534fc65c6fd837a1206d321ddb7056ae551666 Mon Sep 17 00:00:00 2001 From: Luke Kim <80174+lukekim@users.noreply.github.com> Date: Wed, 24 Jun 2026 08:37:14 -0700 Subject: [PATCH 3/4] test: add execution context to assert_arrays_eq in large chunk scan tests --- vortex-file/src/tests.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/vortex-file/src/tests.rs b/vortex-file/src/tests.rs index 004a695bbd4..32ec3c75f8d 100644 --- a/vortex-file/src/tests.rs +++ b/vortex-file/src/tests.rs @@ -1824,6 +1824,7 @@ const MAX_SPLIT_ROWS: u64 = 100_000; async fn test_large_flat_chunk_scan_subdivides_splits() -> VortexResult<()> { // A single flat (unchunked) 250k-row layout spans the 100k sub-split threshold, so the scan // must decode it as multiple row-range splits. + let mut ctx = SESSION.create_execution_ctx(); const N_ROWS: u64 = 250_000; let values = Buffer::from_iter((0..N_ROWS as i32).map(|i| if i % 2 == 0 { i } else { -i })).into_array(); @@ -1847,7 +1848,7 @@ async fn test_large_flat_chunk_scan_subdivides_splits() -> VortexResult<()> { // A full scan across the sub-splits returns the original rows. let result = file.scan()?.into_array_stream()?.read_all().await?; - assert_arrays_eq!(result, values); + assert_arrays_eq!(result, values, &mut ctx); // A filtered scan crossing sub-split boundaries selects exactly the matching rows. let result = file @@ -1858,7 +1859,7 @@ async fn test_large_flat_chunk_scan_subdivides_splits() -> VortexResult<()> { .await?; let expected = Buffer::from_iter((0..N_ROWS as i32).filter(|i| i % 2 == 0 && *i > 0)).into_array(); - assert_arrays_eq!(result, expected); + assert_arrays_eq!(result, expected, &mut ctx); Ok(()) } @@ -1874,6 +1875,7 @@ async fn test_flat_chunk_scan_with_row_count_splits( // Fixed-size splits ignore chunk boundaries entirely, so scans must produce identical // results whether the split size straddles the chunk arbitrarily or exceeds the file's // row count (a single split). + let mut ctx = SESSION.create_execution_ctx(); const N_ROWS: u64 = 250_000; let values = Buffer::from_iter((0..N_ROWS as i32).map(|i| if i % 2 == 0 { i } else { -i })).into_array(); @@ -1893,7 +1895,7 @@ async fn test_flat_chunk_scan_with_row_count_splits( .into_array_stream()? .read_all() .await?; - assert_arrays_eq!(result, values); + assert_arrays_eq!(result, values, &mut ctx); let result = file .scan()? @@ -1904,7 +1906,7 @@ async fn test_flat_chunk_scan_with_row_count_splits( .await?; let expected = Buffer::from_iter((0..N_ROWS as i32).filter(|i| i % 2 == 0 && *i > 0)).into_array(); - assert_arrays_eq!(result, expected); + assert_arrays_eq!(result, expected, &mut ctx); Ok(()) } @@ -1915,6 +1917,7 @@ async fn test_string_chunks_stay_fine_grained_under_split_cap() -> VortexResult< // Default writing targets ~1MiB uncompressed blocks, so ~120-byte strings chunk at a few // thousand rows (~8k with today's defaults). These natural boundaries sit far below the // sub-split cap, and SplitBy::Layout must pass them through untouched. + let mut ctx = SESSION.create_execution_ctx(); const N_ROWS: usize = 40_000; let strings = VarBinArray::from_iter( (0..N_ROWS).map(|i| Some(format!("{i:0>120}"))), @@ -1945,7 +1948,7 @@ async fn test_string_chunks_stay_fine_grained_under_split_cap() -> VortexResult< assert!(splits.windows(2).all(|w| w[0].end == w[1].start)); let result = file.scan()?.into_array_stream()?.read_all().await?; - assert_arrays_eq!(result, st); + assert_arrays_eq!(result, st, &mut ctx); Ok(()) } From 2ef67a87bca2d2fd4ad81a6ebab3fcadb3c84bc4 Mon Sep 17 00:00:00 2001 From: Luke Kim <80174+lukekim@users.noreply.github.com> Date: Wed, 24 Jun 2026 08:49:24 -0700 Subject: [PATCH 4/4] DCO Remediation Commit for Luke Kim <80174+lukekim@users.noreply.github.com> I, Luke Kim <80174+lukekim@users.noreply.github.com>, hereby add my Signed-off-by to this commit: f6534fc65c6fd837a1206d321ddb7056ae551666 Signed-off-by: Luke Kim <80174+lukekim@users.noreply.github.com>