Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions vortex-file/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ vortex-zigzag = { workspace = true }
vortex-zstd = { workspace = true, optional = true }

[dev-dependencies]
rstest = { workspace = true }
tokio = { workspace = true, features = ["full"] }
vortex-array = { workspace = true, features = ["_test-harness"] }
vortex-io = { workspace = true, features = ["tokio"] }
Expand Down
141 changes: 141 additions & 0 deletions vortex-file/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use bytes::Bytes;
use futures::StreamExt;
use futures::TryStreamExt;
use futures::pin_mut;
use rstest::rstest;
use vortex_array::ArrayRef;
use vortex_array::IntoArray;
use vortex_array::VortexSessionExecute;
Expand Down Expand Up @@ -64,9 +65,11 @@ use vortex_buffer::buffer;
use vortex_error::VortexResult;
use vortex_io::session::RuntimeSession;
use vortex_layout::Layout;
use vortex_layout::layouts::flat::writer::FlatLayoutStrategy;
use vortex_layout::layouts::zoned::LegacyStats;
use vortex_layout::layouts::zoned::Zoned;
use vortex_layout::scan::scan_builder::ScanBuilder;
use vortex_layout::scan::split_by::SplitBy;
use vortex_layout::session::LayoutSession;
use vortex_session::VortexSession;

Expand Down Expand Up @@ -1812,6 +1815,144 @@ fn assert_offsets_ordered(before: &[u64], after: &[u64], context: &str) {
}
}

/// Mirrors the (private) `IDEAL_SPLIT_SIZE` that `SplitBy::Layout` uses to sub-divide wide
/// chunk-boundary spans: layout splits are never wider than this many rows.
///
/// The value is a hand-written literal rather than an import of the scan-side constant, so it
/// has to be kept in sync manually if that constant ever changes.
const MAX_SPLIT_ROWS: u64 = 100_000;

#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn test_large_flat_chunk_scan_subdivides_splits() -> VortexResult<()> {
    // A single flat (unchunked) 250k-row layout spans the 100k sub-split threshold, so the scan
    // must decode it as multiple row-range splits.
    let mut ctx = SESSION.create_execution_ctx();
    const N_ROWS: u64 = 250_000;
    // Even rows keep their index as the value and odd rows are negated, so the `> 0` filter
    // further down keeps only the non-zero even rows.
    let values =
        Buffer::from_iter((0..N_ROWS as i32).map(|i| if i % 2 == 0 { i } else { -i })).into_array();

    let mut buf = ByteBufferMut::empty();
    SESSION
        .write_options()
        .with_strategy(Arc::new(FlatLayoutStrategy::default()))
        .write(&mut buf, values.to_array_stream())
        .await?;

    let file = SESSION.open_options().open_buffer(buf)?;

    // Sub-division caps each split at MAX_SPLIT_ROWS while tiling the file exactly. Every
    // assertion prints the split list so a failure shows which range broke the invariant.
    let splits = file.splits()?;
    assert!(splits.len() > 1, "expected sub-divided splits: {splits:?}");
    assert!(
        splits.iter().all(|r| r.end - r.start <= MAX_SPLIT_ROWS),
        "a split is wider than the sub-split cap: {splits:?}"
    );
    assert_eq!(
        splits.first().map(|r| r.start),
        Some(0),
        "splits must start at row 0: {splits:?}"
    );
    assert_eq!(
        splits.last().map(|r| r.end),
        Some(N_ROWS),
        "splits must end at the last row: {splits:?}"
    );
    assert!(
        splits.windows(2).all(|w| w[0].end == w[1].start),
        "splits must be contiguous, without gaps or overlaps: {splits:?}"
    );

    // A full scan across the sub-splits returns the original rows.
    let result = file.scan()?.into_array_stream()?.read_all().await?;
    assert_arrays_eq!(result, values, &mut ctx);

    // A filtered scan crossing sub-split boundaries selects exactly the matching rows.
    let result = file
        .scan()?
        .with_filter(gt(root(), lit(0i32)))
        .into_array_stream()?
        .read_all()
        .await?;
    let expected =
        Buffer::from_iter((0..N_ROWS as i32).filter(|i| i % 2 == 0 && *i > 0)).into_array();
    assert_arrays_eq!(result, expected, &mut ctx);

    Ok(())
}

#[rstest]
#[case::unaligned(33_333)]
#[case::exceeds_file(300_000)]
#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn test_flat_chunk_scan_with_row_count_splits(
    #[case] rows_per_split: usize,
) -> VortexResult<()> {
    // `SplitBy::RowCount` cuts the scan every `rows_per_split` rows without regard to chunk
    // boundaries. A size that straddles the single flat chunk at arbitrary offsets and a size
    // larger than the whole file (one split) must both return the same rows.
    const N_ROWS: u64 = 250_000;
    let split_by = SplitBy::RowCount(rows_per_split);
    let mut ctx = SESSION.create_execution_ctx();

    // Odd rows are negated so that the `> 0` filter keeps only the non-zero even rows.
    let source = Buffer::from_iter((0..N_ROWS as i32).map(|v| if v % 2 != 0 { -v } else { v }))
        .into_array();

    // Write everything as one flat layout, then reopen it from the in-memory buffer.
    let mut bytes = ByteBufferMut::empty();
    SESSION
        .write_options()
        .with_strategy(Arc::new(FlatLayoutStrategy::default()))
        .write(&mut bytes, source.to_array_stream())
        .await?;
    let file = SESSION.open_options().open_buffer(bytes)?;

    // Unfiltered: the fixed-size splits reassemble into the original array.
    let unfiltered = file
        .scan()?
        .with_split_by(split_by)
        .into_array_stream()?
        .read_all()
        .await?;
    assert_arrays_eq!(unfiltered, source, &mut ctx);

    // Filtered: only the positive (even, non-zero) rows survive.
    let filtered = file
        .scan()?
        .with_split_by(split_by)
        .with_filter(gt(root(), lit(0i32)))
        .into_array_stream()?
        .read_all()
        .await?;
    let positives =
        Buffer::from_iter((0..N_ROWS as i32).filter(|v| *v > 0 && v % 2 == 0)).into_array();
    assert_arrays_eq!(filtered, positives, &mut ctx);

    Ok(())
}

#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn test_string_chunks_stay_fine_grained_under_split_cap() -> VortexResult<()> {
    // Default writing targets ~1MiB uncompressed blocks, so ~120-byte strings chunk at a few
    // thousand rows (~8k with today's defaults). These natural boundaries sit far below the
    // sub-split cap, and SplitBy::Layout must pass them through untouched.
    let mut ctx = SESSION.create_execution_ctx();
    const N_ROWS: usize = 40_000;
    // Each value is the row index left-padded with zeros to 120 characters.
    let strings = VarBinArray::from_iter(
        (0..N_ROWS).map(|i| Some(format!("{i:0>120}"))),
        DType::Utf8(Nullability::Nullable),
    )
    .into_array();
    let st = StructArray::from_fields(&[("s", strings)])?.into_array();

    let mut buf = ByteBufferMut::empty();
    SESSION
        .write_options()
        .write(&mut buf, st.to_array_stream())
        .await?;

    let file = SESSION.open_options().open_buffer(buf)?;

    // Every assertion prints the split list so a failure shows the offending ranges.
    let splits = file.splits()?;
    assert!(
        splits.len() > 1,
        "expected multiple natural chunks: {splits:?}"
    );
    assert!(
        splits.iter().all(|r| r.end - r.start < MAX_SPLIT_ROWS / 4),
        "string chunks should stay fine-grained, nowhere near the split cap: {splits:?}"
    );
    assert_eq!(
        splits.first().map(|r| r.start),
        Some(0),
        "splits must start at row 0: {splits:?}"
    );
    assert_eq!(
        splits.last().map(|r| r.end),
        Some(N_ROWS as u64),
        "splits must end at the last row: {splits:?}"
    );
    assert!(
        splits.windows(2).all(|w| w[0].end == w[1].start),
        "splits must be contiguous, without gaps or overlaps: {splits:?}"
    );

    // The scan over the natural chunk splits reproduces the written struct array.
    let result = file.scan()?.into_array_stream()?.read_all().await?;
    assert_arrays_eq!(result, st, &mut ctx);

    Ok(())
}

#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn test_segment_ordering_dict_codes_before_values() -> VortexResult<()> {
Expand Down
145 changes: 143 additions & 2 deletions vortex-layout/src/scan/split_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,29 @@ use std::iter::once;
use std::ops::Range;

use vortex_array::dtype::FieldMask;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;

use crate::LayoutReader;
use crate::RowSplits;
use crate::SplitRange;
use crate::scan::IDEAL_SPLIT_SIZE;

/// Upper bound, in rows, on a single `SplitBy::Layout` split.
///
/// Chunk-boundary spans wider than this are sub-divided into multiple row-range splits so that a
/// file with few, large chunks can be decoded across multiple cores rather than one.
///
/// Reuses [`IDEAL_SPLIT_SIZE`] as the target span per split.
const MAX_SPLIT_ROWS: u64 = IDEAL_SPLIT_SIZE;

/// Defines how the Vortex file is split into batches for reading.
///
/// Note that each split must fit into the platform's maximum usize.
#[derive(Default, Copy, Clone, Debug)]
pub enum SplitBy {
#[default]
/// Splits any time there is a chunk boundary in the file.
/// Splits any time there is a chunk boundary in the file. Spans between adjacent boundaries
/// wider than `MAX_SPLIT_ROWS` are further sub-divided so that a file with few, large chunks
/// can still be decoded across multiple cores.
Layout,
/// Splits every n rows.
RowCount(usize),
Expand All @@ -44,7 +54,7 @@ impl SplitBy {
&SplitRange::root(row_range.clone())?,
&mut row_splits,
)?;
row_splits.into_sorted_deduped()
subdivide_large_spans(row_splits.into_sorted_deduped(), MAX_SPLIT_ROWS)
}
SplitBy::RowCount(n) => row_range
.clone()
Expand All @@ -55,6 +65,60 @@ impl SplitBy {
}
}

/// Refine a sorted, deduplicated boundary list so that no two neighbouring boundaries are more
/// than `max_span` rows apart.
///
/// `boundaries` holds the split points reported by the layout (chunk boundaries). Consumers pair
/// adjacent entries into half-open ranges (`tuple_windows().map(|(s, e)| s..e)`), so the set of
/// boundaries alone decides which rows each split covers.
///
/// The refinement is purely additive: existing boundaries are never moved or dropped, and new
/// points are only placed strictly inside a gap that is too wide. Cutting `[lo, hi)` at an
/// interior point `m` gives `[lo, m)` and `[m, hi)`, whose union is the original range, so the
/// rows remain partitioned contiguously with no gaps and no overlaps. The returned list is again
/// sorted and free of duplicates.
///
/// Each oversized gap is cut into the fewest pieces that fit within `max_span`, using an even
/// (rounded-up) piece width; the final piece takes whatever remains up to the upper boundary.
/// Lists with fewer than two boundaries, or without any oversized gap, are returned as-is.
fn subdivide_large_spans(boundaries: Vec<u64>, max_span: u64) -> Vec<u64> {
    debug_assert!(boundaries.is_sorted(), "boundaries must be sorted");
    debug_assert!(max_span > 0, "max_span must be non-zero");

    // Early exit when no gap needs cutting. With zero or one boundary there are no windows, so
    // this check is vacuously false and the input is handed straight back.
    let has_wide_gap = boundaries
        .windows(2)
        .any(|pair| pair[1] - pair[0] > max_span);
    if !has_wide_gap {
        return boundaries;
    }

    let mut refined = Vec::with_capacity(boundaries.len() * 2);
    for pair in boundaries.windows(2) {
        let (start, end) = (pair[0], pair[1]);
        // Every window contributes its lower boundary; the upper boundary of the last window
        // is added once, after the loop.
        refined.push(start);

        let width = end - start;
        if width <= max_span {
            continue;
        }

        // Fewest pieces that keep each one within `max_span` (at least two, because
        // `width > max_span >= 1`), and the rounded-up even width for that many pieces.
        let pieces = width.div_ceil(max_span);
        let step = width.div_ceil(pieces);

        // Interior cut points `start + k * step` that fall strictly below `end`. If the next
        // candidate would overflow `u64` the sequence simply ends: such a value could never be
        // below `end` anyway.
        let cuts = std::iter::successors(Some(start + step), |cut| cut.checked_add(step))
            .take_while(|cut| *cut < end);
        refined.extend(cuts);
    }
    // Close the list with the upper boundary of the final window.
    refined.push(*boundaries.last().vortex_expect("len >= 2 checked above"));

    debug_assert!(
        refined.is_sorted(),
        "subdivided boundaries must stay sorted"
    );
    debug_assert!(
        refined.windows(2).all(|w| w[0] < w[1]),
        "subdivided boundaries must stay strictly increasing (deduped)"
    );
    refined
}

#[cfg(test)]
mod test {
use std::any::Any;
Expand Down Expand Up @@ -212,4 +276,81 @@ mod test {
.unwrap();
assert_eq!(splits, vec![0u64, 5, 10]);
}

#[test]
fn subdivide_below_threshold_is_noop() {
    // Inputs whose gaps never exceed `max_span`, including the degenerate empty and
    // single-boundary lists, must come back exactly as they went in.
    let untouched: Vec<Vec<u64>> = vec![vec![0, 5, 10], vec![0, 100], Vec::new(), vec![7]];
    for boundaries in untouched {
        let expected = boundaries.clone();
        assert_eq!(subdivide_large_spans(boundaries, 100), expected);
    }
}

#[test]
fn subdivide_near_u64_max_does_not_overflow() {
    // With the upper boundary at `u64::MAX`, stepping past the single interior point would
    // overflow unless the increment is overflow-safe.
    let top = u64::MAX;
    let expected = vec![top - 3, top - 1, top];
    assert_eq!(subdivide_large_spans(vec![top - 3, top], 2), expected);
}

#[test]
fn subdivide_splits_large_single_chunk() {
    // A lone 1000-row chunk with a 100-row cap is cut into ten contiguous pieces, i.e. a
    // boundary at every multiple of 100 from 0 through 1000 inclusive.
    let expected: Vec<u64> = (0..=1000).step_by(100).collect();
    assert_eq!(subdivide_large_spans(vec![0, 1000], 100), expected);
}

#[test]
fn subdivide_only_large_gaps() {
    // Of the three gaps only the middle one, [50, 350), exceeds the 100-row cap; it gains the
    // interior points 150 and 250 while [0, 50) and [350, 360) are left alone.
    let boundaries = vec![0, 50, 350, 360];
    let refined = subdivide_large_spans(boundaries, 100);
    assert_eq!(refined, [0, 50, 150, 250, 350, 360]);
}

/// Property: for any sorted, deduped boundary set, subdivision (a) keeps the first and last
/// boundary, (b) stays strictly increasing, (c) preserves exact row coverage — the union of the
/// half-open ranges the consumer derives is identical before and after — and (d) leaves no gap
/// between adjacent boundaries wider than the cap it was given.
#[test]
fn subdivide_preserves_exact_coverage() {
    let cases: Vec<Vec<u64>> = vec![
        vec![0, 1000],
        vec![0, 7, 250_001],
        vec![0, 5, 10, 15, 20, 25, 30],
        vec![3, 1_000_003],
        vec![0, 99_999, 100_000, 300_000],
    ];
    for boundaries in cases {
        let out = subdivide_large_spans(boundaries.clone(), MAX_SPLIT_ROWS);
        // (a) endpoints preserved
        assert_eq!(out.first(), boundaries.first());
        assert_eq!(out.last(), boundaries.last());
        // (b) strictly increasing (sorted + deduped)
        assert!(
            out.windows(2).all(|w| w[0] < w[1]),
            "not strictly increasing: {out:?}"
        );
        // (c) exact coverage: ranges from `out` tile the same span with no gap/overlap, and
        // every original boundary is still present (so original ranges are sub-divided, never
        // merged or shifted).
        let total: u64 = out.windows(2).map(|w| w[1] - w[0]).sum();
        let expected_total = boundaries.last().unwrap() - boundaries.first().unwrap();
        assert_eq!(
            total, expected_total,
            "coverage span changed for {boundaries:?}"
        );
        for b in &boundaries {
            assert!(
                out.contains(b),
                "original boundary {b} dropped from {out:?}"
            );
        }
        // (d) the cap is honoured: this is the whole point of the subdivision, so a result
        // that keeps coverage intact but still contains an oversized gap must fail here.
        assert!(
            out.windows(2).all(|w| w[1] - w[0] <= MAX_SPLIT_ROWS),
            "gap wider than the cap remains in {out:?} (from {boundaries:?})"
        );
    }
}
}
Loading