diff --git a/Cargo.lock b/Cargo.lock index 2f0965f5a8e..8eb37888660 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9889,6 +9889,7 @@ dependencies = [ "glob", "itertools 0.14.0", "kanal", + "memmap2", "object_store 0.13.2", "oneshot", "parking_lot", diff --git a/vortex-file/src/open.rs b/vortex-file/src/open.rs index c1793b7b517..c73bf0d7893 100644 --- a/vortex-file/src/open.rs +++ b/vortex-file/src/open.rs @@ -170,10 +170,8 @@ impl VortexOpenOptions { /// Open a Vortex file from a filesystem path. #[cfg(not(target_arch = "wasm32"))] pub async fn open_path(self, path: impl AsRef) -> VortexResult { - use vortex_io::std_file::FileReadAt; - let handle = self.session.handle(); - let allocator = self.session.allocator(); - let source = Arc::new(FileReadAt::open_with_allocator(path, handle, allocator)?); + use vortex_io::std_file::MmapReadAt; + let source = Arc::new(MmapReadAt::open(path)?); self.open(source).await } diff --git a/vortex-io/Cargo.toml b/vortex-io/Cargo.toml index b3f6448484a..7b7d287d477 100644 --- a/vortex-io/Cargo.toml +++ b/vortex-io/Cargo.toml @@ -35,7 +35,7 @@ tokio = { workspace = true, features = [ ] } # this is the maximum subset of fetaures that is safe for wasm32 targets tracing = { workspace = true } vortex-array = { workspace = true } -vortex-buffer = { workspace = true } +vortex-buffer = { workspace = true, features = ["memmap2"] } vortex-error = { workspace = true } vortex-metrics = { workspace = true } vortex-session = { workspace = true } @@ -47,6 +47,7 @@ custom-labels = { workspace = true } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] # Smol is our default impl, so we don't want it to be optional, but it cannot be part of wasm smol = { workspace = true } +memmap2 = { workspace = true } # target_os = "unknown" matches wasm32-unknown-unknown (browser), excluding WASI targets # where wasm-bindgen's JS interop is not available. diff --git a/vortex-io/src/std_file/mmap.rs b/vortex-io/src/std_file/mmap.rs new file mode 100644 index 00000000000..6d5e65e28d7 --- /dev/null +++ b/vortex-io/src/std_file/mmap.rs @@ -0,0 +1,88 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::fs::File; +use std::path::Path; +use std::sync::Arc; + +use futures::FutureExt; +use futures::future::BoxFuture; +use memmap2::Mmap; +use vortex_array::buffer::BufferHandle; +use vortex_buffer::Alignment; +use vortex_buffer::ByteBuffer; +use vortex_error::VortexExpect; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; + +use super::read_at::DEFAULT_CONCURRENCY; +use crate::CoalesceConfig; +use crate::VortexReadAt; + +pub struct MmapReadAt { + uri: Arc, + buffer: ByteBuffer, +} + +impl MmapReadAt { + /// Memory-map a file for reading. + pub fn open(path: impl AsRef) -> VortexResult { + let path = path.as_ref(); + let uri = path.to_string_lossy().to_string().into(); + let file = File::open(path)?; + // SAFETY: the file is opened read-only and is assumed not to be modified or truncated for + // the lifetime of this mapping (the standard contract for read-only mmap of data files). + let mmap = unsafe { Mmap::map(&file)? }; + #[cfg(unix)] + mmap.advise(memmap2::Advice::Random)?; + Ok(Self { + uri, + buffer: ByteBuffer::from(mmap), + }) + } +} + +impl VortexReadAt for MmapReadAt { + fn uri(&self) -> Option<&Arc> { + Some(&self.uri) + } + + fn coalesce_config(&self) -> Option { + None + } + + fn concurrency(&self) -> usize { + DEFAULT_CONCURRENCY + } + + fn size(&self) -> BoxFuture<'static, VortexResult> { + let len = self.buffer.len() as u64; + async move { Ok(len) }.boxed() + } + + fn read_at( + &self, + offset: u64, + length: usize, + alignment: Alignment, + ) -> BoxFuture<'static, VortexResult> { + let buffer = self.buffer.clone(); + async move { + let start = usize::try_from(offset).vortex_expect("offset too big for usize"); + let end = + usize::try_from(offset + length as u64).vortex_expect("end too big for usize"); + if end > buffer.len() { + vortex_bail!( + "Requested range {}..{} out of bounds for file of length {}", + start, + end, + buffer.len() + ); + } + Ok(BufferHandle::new_host( + buffer.slice_unaligned(start..end).aligned(alignment), + )) + } + .boxed() + } +} diff --git a/vortex-io/src/std_file/mod.rs b/vortex-io/src/std_file/mod.rs index c2e4b12bf40..33d9520253c 100644 --- a/vortex-io/src/std_file/mod.rs +++ b/vortex-io/src/std_file/mod.rs @@ -2,5 +2,9 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors mod read_at; +#[cfg(not(target_arch = "wasm32"))] +mod mmap; pub use read_at::*; +#[cfg(not(target_arch = "wasm32"))] +pub use mmap::*;