From 0bf2dea3ccac35e3cfed341a8797d6211b9afa58 Mon Sep 17 00:00:00 2001 From: godnight10061 Date: Wed, 7 Jan 2026 23:24:39 +0800 Subject: [PATCH 1/6] Reduce default INITIAL_READ_SIZE to MAX_POSTSCRIPT_SIZE+EOF and add regression test Signed-off-by: godnight10061 --- vortex-file/src/open.rs | 93 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 92 insertions(+), 1 deletion(-) diff --git a/vortex-file/src/open.rs b/vortex-file/src/open.rs index 1ee6d08feb2..27bb2ab57f2 100644 --- a/vortex-file/src/open.rs +++ b/vortex-file/src/open.rs @@ -36,7 +36,7 @@ use crate::footer::Footer; use crate::segments::FileSegmentSource; use crate::segments::InitialReadSegmentCache; -const INITIAL_READ_SIZE: usize = 1 << 20; // 1 MB +const INITIAL_READ_SIZE: usize = MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE; /// Open options for a Vortex file reader. pub struct VortexOpenOptions { @@ -281,3 +281,94 @@ impl VortexOpenOptions { .await } } + +#[cfg(test)] +mod tests { + use std::sync::atomic::{AtomicUsize, Ordering}; + + use futures::future::BoxFuture; + use vortex_array::IntoArray; + use vortex_array::expr::session::ExprSession; + use vortex_array::session::ArraySession; + use vortex_buffer::{Buffer, ByteBufferMut}; + use vortex_io::session::RuntimeSession; + use vortex_layout::session::LayoutSession; + + use super::*; + use crate::WriteOptionsSessionExt; + + // Define CountingReadAt struct + struct CountingReadAt { + inner: R, + bytes_read: Arc, + } + + impl VortexReadAt for CountingReadAt { + fn read_at( + &self, + offset: u64, + length: usize, + alignment: Alignment, + ) -> BoxFuture<'static, VortexResult> { + self.bytes_read.fetch_add(length, Ordering::Relaxed); + self.inner.read_at(offset, length, alignment) + } + + fn size(&self) -> BoxFuture<'static, VortexResult> { + self.inner.size() + } + } + + #[tokio::test] + async fn test_initial_read_size() { + // Create a large file (> 1MB) + let mut buf = ByteBufferMut::empty(); + let mut session = VortexSession::empty() + .with::() + .with::() + .with::() + .with::() + .with::(); + + crate::register_default_encodings(&mut session); + + // 1.5M integers -> ~6MB. We use a pattern to avoid Sequence encoding. + let array = Buffer::from( + (0..1_500_000) + .map(|i| if i % 2 == 0 { i as i32 } else { -(i as i32) }) + .collect::>(), + ) + .into_array(); + + session + .write_options() + .write(&mut buf, array.to_array_stream()) + .await + .unwrap(); + + let buffer = ByteBuffer::from(buf); + assert!( + buffer.len() > 1024 * 1024, + "Buffer length is only {} bytes", + buffer.len() + ); + + let bytes_read = Arc::new(AtomicUsize::new(0)); + let reader = CountingReadAt { + inner: buffer, + bytes_read: bytes_read.clone(), + }; + + // Open the file + let _file = session.open_options().open_read_at(reader).await.unwrap(); + + // Assert that we read approximately the postscript size, not 1MB + let read = bytes_read.load(Ordering::Relaxed); + assert!(read < 1024 * 1024, "Read {} bytes, expected < 1MB", read); + assert_eq!( + read, + MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE, + "Read exactly the postscript size" + ); + } +} From 0376b5c327ca8d3e0735fef735c165618dd6ade3 Mon Sep 17 00:00:00 2001 From: godnight10061 Date: Wed, 7 Jan 2026 23:51:58 +0800 Subject: [PATCH 2/6] Make ScanBuilder::into_stream lazy Signed-off-by: godnight10061 --- vortex-scan/src/scan_builder.rs | 161 +++++++++++++++++++++++++++++++- 1 file changed, 160 insertions(+), 1 deletion(-) diff --git a/vortex-scan/src/scan_builder.rs b/vortex-scan/src/scan_builder.rs index 8489d4230f5..c248c4d5b14 100644 --- a/vortex-scan/src/scan_builder.rs +++ b/vortex-scan/src/scan_builder.rs @@ -2,10 +2,15 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use std::ops::Range; +use std::pin::Pin; use std::sync::Arc; +use std::task::Context; +use std::task::Poll; use futures::Stream; +use futures::StreamExt; use futures::future::BoxFuture; +use futures::stream::BoxStream; use itertools::Itertools; use vortex_array::ArrayRef; use vortex_array::expr::Expression; @@ -287,7 +292,7 @@ impl ScanBuilder { pub fn into_stream( self, ) -> VortexResult> + Send + 'static + use> { - self.prepare()?.execute_stream(None) + Ok(LazyScanStream::new(self)) } /// Returns an [`Iterator`] using the session's runtime. @@ -300,6 +305,49 @@ impl ScanBuilder { } } +enum LazyScanState { + Builder(Option>), + Stream(BoxStream<'static, VortexResult>), + Error(Option), +} + +struct LazyScanStream { + state: LazyScanState, +} + +impl LazyScanStream { + fn new(builder: ScanBuilder) -> Self { + Self { + state: LazyScanState::Builder(Some(builder)), + } + } +} + +impl Unpin for LazyScanStream {} + +impl Stream for LazyScanStream { + type Item = VortexResult; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + match &mut self.state { + LazyScanState::Builder(builder) => { + let builder = builder.take().expect("polled after completion"); + match builder + .prepare() + .and_then(|scan| scan.execute_stream(None).map(|s| s.boxed())) + { + Ok(stream) => self.state = LazyScanState::Stream(stream), + Err(err) => self.state = LazyScanState::Error(Some(err)), + } + } + LazyScanState::Stream(stream) => return stream.as_mut().poll_next(cx), + LazyScanState::Error(err) => return Poll::Ready(err.take().map(Err)), + } + } + } +} + /// Compute masks of field paths referenced by the projection and filter in the scan. /// /// Projection and filter must be pre-simplified. @@ -338,3 +386,114 @@ pub(crate) fn filter_and_projection_masks( fn to_field_mask(field: FieldName) -> FieldMask { FieldMask::Prefix(FieldPath::from(Field::Name(field))) } + +#[cfg(test)] +mod test { + use std::collections::BTreeSet; + use std::ops::Range; + use std::sync::Arc; + use std::sync::atomic::AtomicUsize; + use std::sync::atomic::Ordering; + + use vortex_array::MaskFuture; + use vortex_array::expr::Expression; + use vortex_dtype::DType; + use vortex_dtype::FieldMask; + use vortex_dtype::Nullability; + use vortex_dtype::PType; + use vortex_error::VortexResult; + use vortex_io::runtime::BlockingRuntime; + use vortex_io::runtime::single::SingleThreadRuntime; + use vortex_io::session::RuntimeSessionExt; + use vortex_layout::ArrayFuture; + use vortex_layout::LayoutReader; + use vortex_mask::Mask; + + use super::ScanBuilder; + + #[derive(Debug)] + struct CountingLayoutReader { + name: Arc, + dtype: DType, + row_count: u64, + register_splits_calls: Arc, + } + + impl CountingLayoutReader { + fn new(register_splits_calls: Arc) -> Self { + Self { + name: Arc::from("counting"), + dtype: DType::Primitive(PType::I32, Nullability::NonNullable), + row_count: 1, + register_splits_calls, + } + } + } + + impl LayoutReader for CountingLayoutReader { + fn name(&self) -> &Arc { + &self.name + } + + fn dtype(&self) -> &DType { + &self.dtype + } + + fn row_count(&self) -> u64 { + self.row_count + } + + fn register_splits( + &self, + _field_mask: &[FieldMask], + row_range: &Range, + splits: &mut BTreeSet, + ) -> VortexResult<()> { + self.register_splits_calls.fetch_add(1, Ordering::Relaxed); + splits.insert(row_range.end); + Ok(()) + } + + fn pruning_evaluation( + &self, + _row_range: &Range, + _expr: &Expression, + _mask: Mask, + ) -> VortexResult { + unimplemented!("not needed for this test"); + } + + fn filter_evaluation( + &self, + _row_range: &Range, + _expr: &Expression, + _mask: MaskFuture, + ) -> VortexResult { + unimplemented!("not needed for this test"); + } + + fn projection_evaluation( + &self, + _row_range: &Range, + _expr: &Expression, + _mask: MaskFuture, + ) -> VortexResult { + Ok(Box::pin(async move { + unreachable!("scan should not be polled in this test") + })) + } + } + + #[test] + fn into_stream_is_lazy() { + let calls = Arc::new(AtomicUsize::new(0)); + let reader = Arc::new(CountingLayoutReader::new(calls.clone())); + + let runtime = SingleThreadRuntime::default(); + let session = crate::test::SESSION.clone().with_handle(runtime.handle()); + + let _stream = ScanBuilder::new(session, reader).into_stream().unwrap(); + + assert_eq!(calls.load(Ordering::Relaxed), 0); + } +} From db9958d3250f55817207a3a0690a4316412727d5 Mon Sep 17 00:00:00 2001 From: godnight10061 Date: Thu, 8 Jan 2026 00:33:06 +0800 Subject: [PATCH 3/6] Loosen footer read regression test Signed-off-by: godnight10061 --- vortex-file/src/open.rs | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/vortex-file/src/open.rs b/vortex-file/src/open.rs index 27bb2ab57f2..e1ceaee23ac 100644 --- a/vortex-file/src/open.rs +++ b/vortex-file/src/open.rs @@ -300,7 +300,8 @@ mod tests { // Define CountingReadAt struct struct CountingReadAt { inner: R, - bytes_read: Arc, + total_read: Arc, + first_read_len: Arc, } impl VortexReadAt for CountingReadAt { @@ -310,7 +311,13 @@ mod tests { length: usize, alignment: Alignment, ) -> BoxFuture<'static, VortexResult> { - self.bytes_read.fetch_add(length, Ordering::Relaxed); + self.total_read.fetch_add(length, Ordering::Relaxed); + let _ = self.first_read_len.compare_exchange( + 0, + length, + Ordering::Relaxed, + Ordering::Relaxed, + ); self.inner.read_at(offset, length, alignment) } @@ -353,22 +360,25 @@ mod tests { buffer.len() ); - let bytes_read = Arc::new(AtomicUsize::new(0)); + let total_read = Arc::new(AtomicUsize::new(0)); + let first_read_len = Arc::new(AtomicUsize::new(0)); let reader = CountingReadAt { inner: buffer, - bytes_read: bytes_read.clone(), + total_read: total_read.clone(), + first_read_len: first_read_len.clone(), }; // Open the file let _file = session.open_options().open_read_at(reader).await.unwrap(); // Assert that we read approximately the postscript size, not 1MB - let read = bytes_read.load(Ordering::Relaxed); - assert!(read < 1024 * 1024, "Read {} bytes, expected < 1MB", read); + let first = first_read_len.load(Ordering::Relaxed); assert_eq!( - read, + first, MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE, "Read exactly the postscript size" ); + let read = total_read.load(Ordering::Relaxed); + assert!(read < 1024 * 1024, "Read {} bytes, expected < 1MB", read); } } From 1b09efc06a17757cc383fcafcce294e86cc4056c Mon Sep 17 00:00:00 2001 From: godnight10061 Date: Thu, 8 Jan 2026 02:26:14 +0800 Subject: [PATCH 4/6] Fix lint failures in lazy scan and footer read test Signed-off-by: godnight10061 --- vortex-file/src/open.rs | 10 ++++++---- vortex-io/src/file/std_file.rs | 21 ++++++++++++++++++--- vortex-scan/src/scan_builder.rs | 7 ++++--- 3 files changed, 28 insertions(+), 10 deletions(-) diff --git a/vortex-file/src/open.rs b/vortex-file/src/open.rs index e1ceaee23ac..eb7261d92db 100644 --- a/vortex-file/src/open.rs +++ b/vortex-file/src/open.rs @@ -284,13 +284,15 @@ impl VortexOpenOptions { #[cfg(test)] mod tests { - use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::atomic::AtomicUsize; + use std::sync::atomic::Ordering; use futures::future::BoxFuture; use vortex_array::IntoArray; use vortex_array::expr::session::ExprSession; use vortex_array::session::ArraySession; - use vortex_buffer::{Buffer, ByteBufferMut}; + use vortex_buffer::Buffer; + use vortex_buffer::ByteBufferMut; use vortex_io::session::RuntimeSession; use vortex_layout::session::LayoutSession; @@ -341,8 +343,8 @@ mod tests { // 1.5M integers -> ~6MB. We use a pattern to avoid Sequence encoding. let array = Buffer::from( - (0..1_500_000) - .map(|i| if i % 2 == 0 { i as i32 } else { -(i as i32) }) + (0i32..1_500_000) + .map(|i| if i % 2 == 0 { i } else { -i }) .collect::>(), ) .into_array(); diff --git a/vortex-io/src/file/std_file.rs b/vortex-io/src/file/std_file.rs index a6b43834254..f1527f0ea71 100644 --- a/vortex-io/src/file/std_file.rs +++ b/vortex-io/src/file/std_file.rs @@ -2,9 +2,9 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use std::fs::File; -#[cfg(not(unix))] +#[cfg(all(not(unix), not(windows)))] use std::io::Read; -#[cfg(not(unix))] +#[cfg(all(not(unix), not(windows)))] use std::io::Seek; #[cfg(unix)] use std::os::unix::fs::FileExt; @@ -37,7 +37,22 @@ pub(crate) fn read_exact_at(file: &File, buffer: &mut [u8], offset: u64) -> std: { file.read_exact_at(buffer, offset) } - #[cfg(not(unix))] + #[cfg(windows)] + { + let mut bytes_read = 0; + while bytes_read < buffer.len() { + let read = file.seek_read(&mut buffer[bytes_read..], offset + bytes_read as u64)?; + if read == 0 { + return Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "failed to fill whole buffer", + )); + } + bytes_read += read; + } + Ok(()) + } + #[cfg(all(not(unix), not(windows)))] { use std::io::SeekFrom; let mut file_ref = file; diff --git a/vortex-scan/src/scan_builder.rs b/vortex-scan/src/scan_builder.rs index c248c4d5b14..fd91c0608a7 100644 --- a/vortex-scan/src/scan_builder.rs +++ b/vortex-scan/src/scan_builder.rs @@ -27,6 +27,7 @@ use vortex_dtype::Field; use vortex_dtype::FieldMask; use vortex_dtype::FieldName; use vortex_dtype::FieldPath; +use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_io::runtime::BlockingRuntime; @@ -306,7 +307,7 @@ impl ScanBuilder { } enum LazyScanState { - Builder(Option>), + Builder(Option>>), Stream(BoxStream<'static, VortexResult>), Error(Option), } @@ -318,7 +319,7 @@ struct LazyScanStream { impl LazyScanStream { fn new(builder: ScanBuilder) -> Self { Self { - state: LazyScanState::Builder(Some(builder)), + state: LazyScanState::Builder(Some(Box::new(builder))), } } } @@ -332,7 +333,7 @@ impl Stream for LazyScanStream { loop { match &mut self.state { LazyScanState::Builder(builder) => { - let builder = builder.take().expect("polled after completion"); + let builder = builder.take().vortex_expect("polled after completion"); match builder .prepare() .and_then(|scan| scan.execute_stream(None).map(|s| s.boxed())) From b662b2086ca53e0d484e788914fa37cd3490fa7b Mon Sep 17 00:00:00 2001 From: godnight10061 Date: Fri, 9 Jan 2026 00:00:45 +0800 Subject: [PATCH 5/6] datafusion: make footer initial read size configurable Signed-off-by: godnight10061 --- vortex-datafusion/src/persistent/cache.rs | 14 +++++++++++++- vortex-datafusion/src/persistent/format.rs | 21 ++++++++++++++++++++- vortex-datafusion/src/persistent/opener.rs | 10 +++++----- 3 files changed, 38 insertions(+), 7 deletions(-) diff --git a/vortex-datafusion/src/persistent/cache.rs b/vortex-datafusion/src/persistent/cache.rs index 8f8220e470f..a64e729b837 100644 --- a/vortex-datafusion/src/persistent/cache.rs +++ b/vortex-datafusion/src/persistent/cache.rs @@ -33,6 +33,7 @@ pub(crate) struct VortexFileCache { file_cache: Cache, segment_cache: Cache, session: VortexSession, + footer_initial_read_size_bytes: usize, } /// Cache key for a [`VortexFile`]. @@ -59,7 +60,12 @@ struct SegmentKey { } impl VortexFileCache { - pub fn new(size_mb: usize, segment_size_mb: usize, session: VortexSession) -> Self { + pub fn new( + size_mb: usize, + segment_size_mb: usize, + footer_initial_read_size_bytes: usize, + session: VortexSession, + ) -> Self { let file_cache = Cache::builder() .max_capacity(size_mb as u64 * (1 << 20)) .eviction_listener(|k: Arc, _v: VortexFile, cause| { @@ -82,9 +88,14 @@ impl VortexFileCache { file_cache, segment_cache, session, + footer_initial_read_size_bytes, } } + pub(crate) fn footer_initial_read_size_bytes(&self) -> usize { + self.footer_initial_read_size_bytes + } + pub async fn try_get( &self, object: &ObjectMeta, @@ -101,6 +112,7 @@ impl VortexFileCache { .metrics() .child_with_tags([("filename", object.location.to_string())]), ) + .with_initial_read_size(self.footer_initial_read_size_bytes) .with_file_size(object.size) .with_segment_cache(Arc::new(VortexFileSegmentCache { file_key, diff --git a/vortex-datafusion/src/persistent/format.rs b/vortex-datafusion/src/persistent/format.rs index f8a64c0320c..f1b11e0a45b 100644 --- a/vortex-datafusion/src/persistent/format.rs +++ b/vortex-datafusion/src/persistent/format.rs @@ -51,6 +51,8 @@ use vortex::error::VortexResult; use vortex::error::vortex_err; use vortex::expr::stats; use vortex::expr::stats::Stat; +use vortex::file::EOF_SIZE; +use vortex::file::MAX_POSTSCRIPT_SIZE; use vortex::file::VORTEX_FILE_EXTENSION; use vortex::scalar::Scalar; use vortex::session::VortexSession; @@ -61,6 +63,8 @@ use super::source::VortexSource; use crate::PrecisionExt as _; use crate::convert::TryToDataFusion; +const DEFAULT_FOOTER_INITIAL_READ_SIZE_BYTES: usize = MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE; + /// Vortex implementation of a DataFusion [`FileFormat`]. pub struct VortexFormat { session: VortexSession, @@ -87,6 +91,11 @@ config_namespace! { pub footer_cache_size_mb: usize, default = 64 /// The size of the in-memory segment cache. pub segment_cache_size_mb: usize, default = 0 + /// The number of bytes to read when parsing a file footer. + /// + /// Values smaller than `MAX_POSTSCRIPT_SIZE + EOF_SIZE` will be clamped to that minimum + /// during footer parsing. + pub footer_initial_read_size_bytes: usize, default = DEFAULT_FOOTER_INITIAL_READ_SIZE_BYTES } } @@ -186,6 +195,7 @@ impl VortexFormat { file_cache: VortexFileCache::new( opts.footer_cache_size_mb, opts.segment_cache_size_mb, + opts.footer_initial_read_size_bytes, session, ), opts, @@ -468,7 +478,7 @@ mod tests { (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \ STORED AS vortex \ LOCATION '{}' \ - OPTIONS( segment_cache_size_mb '5' );", + OPTIONS( segment_cache_size_mb '5', footer_initial_read_size_bytes '12345' );", dir.path().to_str().unwrap() )) .await @@ -477,4 +487,13 @@ mod tests { .await .unwrap(); } + + #[test] + fn format_plumbs_footer_initial_read_size() { + let mut opts = VortexOptions::default(); + opts.set("footer_initial_read_size_bytes", "12345").unwrap(); + + let format = VortexFormat::new_with_options(VortexSession::default(), opts); + assert_eq!(format.file_cache.footer_initial_read_size_bytes(), 12345); + } } diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index b5474f7bc1e..71235d943ea 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -489,7 +489,7 @@ mod tests { // no adapter expr_adapter_factory, schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), - file_cache: VortexFileCache::new(1, 1, SESSION.clone()), + file_cache: VortexFileCache::new(1, 1, 0, SESSION.clone()), table_schema, batch_size: 100, limit: None, @@ -635,7 +635,7 @@ mod tests { file_pruning_predicate: None, expr_adapter_factory: expr_adapter_factory.clone(), schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), - file_cache: VortexFileCache::new(1, 1, SESSION.clone()), + file_cache: VortexFileCache::new(1, 1, 0, SESSION.clone()), table_schema: table_schema.clone(), batch_size: 100, limit: None, @@ -719,7 +719,7 @@ mod tests { file_pruning_predicate: None, expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _), schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), - file_cache: VortexFileCache::new(1, 1, SESSION.clone()), + file_cache: VortexFileCache::new(1, 1, 0, SESSION.clone()), table_schema: TableSchema::from_file_schema(table_schema.clone()), batch_size: 100, limit: None, @@ -870,7 +870,7 @@ mod tests { file_pruning_predicate: None, expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _), schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), - file_cache: VortexFileCache::new(1, 1, SESSION.clone()), + file_cache: VortexFileCache::new(1, 1, 0, SESSION.clone()), table_schema: table_schema.clone(), batch_size: 100, limit: None, @@ -927,7 +927,7 @@ mod tests { file_pruning_predicate: None, expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _), schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), - file_cache: VortexFileCache::new(1, 1, SESSION.clone()), + file_cache: VortexFileCache::new(1, 1, 0, SESSION.clone()), table_schema: TableSchema::from_file_schema(schema), batch_size: 100, limit: None, From 4856cf10b51ae173a2b0af3d99037073e262945e Mon Sep 17 00:00:00 2001 From: godnight10061 Date: Fri, 9 Jan 2026 00:03:52 +0800 Subject: [PATCH 6/6] datafusion: fix clippy dead_code in tests Signed-off-by: godnight10061 --- vortex-datafusion/src/persistent/cache.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/vortex-datafusion/src/persistent/cache.rs b/vortex-datafusion/src/persistent/cache.rs index a64e729b837..c910f728da7 100644 --- a/vortex-datafusion/src/persistent/cache.rs +++ b/vortex-datafusion/src/persistent/cache.rs @@ -92,6 +92,7 @@ impl VortexFileCache { } } + #[cfg(test)] pub(crate) fn footer_initial_read_size_bytes(&self) -> usize { self.footer_initial_read_size_bytes }