Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions vortex-array/src/expr/exprs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//! Factory functions for creating [`Expression`]s from scalar function vtables.

use std::sync::Arc;
use std::sync::LazyLock;

use vortex_error::VortexExpect;
use vortex_error::vortex_panic;
Expand Down Expand Up @@ -52,20 +53,24 @@ use crate::scalar_fn::fns::variant_get::VariantGetOptions;
use crate::scalar_fn::fns::variant_get::VariantPath;
use crate::scalar_fn::fns::zip::Zip;

// ---- Root ----
static ROOT: LazyLock<Expression> = LazyLock::new(|| {
Root.try_new_expr(EmptyOptions, vec![])
.vortex_expect("Creating root() shouldn't fail")
});

/// Creates an expression that references the root scope.
///
/// Returns the entire input array as passed to the expression evaluator.
/// This is commonly used as the starting point for field access and other operations.
pub fn root() -> Expression {
Root.try_new_expr(EmptyOptions, vec![])
.vortex_expect("Failed to create Root expression")
ROOT.clone()
}

/// Return whether the expression is a root expression.
pub fn is_root(expr: &Expression) -> bool {
expr.is::<Root>()
// root doesn't have any children, and scalar_fns have distinct ids
// so we should almost always hit this eq check
(expr.scalar_fn().id() == ROOT.scalar_fn().id()) || expr.is::<Root>()
}

// ---- Literal ----
Expand Down
6 changes: 5 additions & 1 deletion vortex-bench/src/random_access/take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ impl VortexRandomAccessor {
name: impl Into<String>,
format: Format,
) -> anyhow::Result<Self> {
let file = SESSION.open_options().open_path(path.as_ref()).await?;
let file = SESSION
.open_options()
.with_layout_reader_cache()
.open_path(path.as_ref())
.await?;
Ok(Self {
name: name.into(),
format,
Expand Down
79 changes: 58 additions & 21 deletions vortex-file/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

use std::ops::Range;
use std::sync::Arc;
use std::sync::OnceLock;

use itertools::Itertools;
use vortex_array::ArrayRef;
Expand Down Expand Up @@ -51,6 +52,29 @@ pub struct VortexFile {
segment_source: Arc<dyn SegmentSource>,
/// The Vortex session used to open this file.
session: VortexSession,
/// None id LayoutReader caching is turned off
layout_reader_cache: Option<OnceLock<Arc<dyn LayoutReader>>>,
}

fn layout_reader(
segment_source: Arc<dyn SegmentSource>,
footer: &Footer,
session: &VortexSession,
) -> VortexResult<Arc<dyn LayoutReader>> {
let root_reader = footer
.layout()
// TODO(ngates): we may want to allow the user pass in a name here?
.new_reader("".into(), segment_source, session, &Default::default())?;

Ok(if let Some(stats) = footer.statistics().cloned() {
Arc::new(FileStatsLayoutReader::new(
root_reader,
stats,
session.clone(),
))
} else {
root_reader
})
}

impl VortexFile {
Expand All @@ -64,6 +88,17 @@ impl VortexFile {
footer,
segment_source,
session,
layout_reader_cache: None,
}
}

/// Enable layout reader caching
pub fn with_caching(self) -> Self {
Self {
footer: self.footer,
segment_source: self.segment_source,
session: self.session,
layout_reader_cache: Some(OnceLock::new()),
}
}

Expand Down Expand Up @@ -106,28 +141,30 @@ impl VortexFile {
///
/// Wraps the root layout in a [`FileStatsLayoutReader`] if file stats are available.
pub fn layout_reader(&self) -> VortexResult<Arc<dyn LayoutReader>> {
let segment_source = self.segment_source();

let root_reader = self
.footer
.layout()
// TODO(ngates): we may want to allow the user pass in a name here?
.new_reader(
"".into(),
segment_source,
match &self.layout_reader_cache {
None => layout_reader(
Arc::clone(&self.segment_source),
&self.footer,
&self.session,
&Default::default(),
)?;

Ok(if let Some(stats) = self.file_stats().cloned() {
Arc::new(FileStatsLayoutReader::new(
root_reader,
stats,
self.session.clone(),
))
} else {
root_reader
})
),
Some(reader) => {
// get_or_try_init is unstable
if let Some(val) = reader.get() {
Ok(Arc::clone(val))
} else {
let inner = layout_reader(
Arc::clone(&self.segment_source),
&self.footer,
&self.session,
)?;
Ok(if let Err(val) = reader.set(Arc::clone(&inner)) {
val
} else {
inner
})
}
}
}
}

/// Create a [`DataSource`](vortex_scan::DataSource) from this file for scanning.
Expand Down
29 changes: 22 additions & 7 deletions vortex-file/src/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ pub struct VortexOpenOptions {
metrics_registry: Option<Arc<dyn MetricsRegistry>>,
/// Default labels applied to all the file's metrics
labels: Vec<Label>,
/// Whether to cache file's LayoutReader between scans
cache_layout_reader: bool,
}

pub trait OpenOptionsSessionExt:
Expand All @@ -77,6 +79,7 @@ pub trait OpenOptionsSessionExt:
initial_read_segments: Default::default(),
metrics_registry: None,
labels: Vec::default(),
cache_layout_reader: false,
}
}
}
Expand All @@ -92,6 +95,12 @@ impl VortexOpenOptions {
self
}

/// Cache file's LayoutReader between scans
pub fn with_layout_reader_cache(mut self) -> Self {
self.cache_layout_reader = true;
self
}

/// Configure a custom [`SegmentCache`].
pub fn with_segment_cache(mut self, segment_cache: Arc<dyn SegmentCache>) -> Self {
self.segment_cache = Some(segment_cache);
Expand Down Expand Up @@ -182,6 +191,7 @@ impl VortexOpenOptions {
tracing::warn!("metrics registry is ignored for in-memory `open_buffer`");
}

let cache_layout_reader = self.cache_layout_reader;
let mut opts = self.with_initial_read_size(0);

let footer = match opts.footer.take() {
Expand All @@ -193,8 +203,12 @@ impl VortexOpenOptions {
buffer,
Arc::clone(footer.segment_map()),
));

Ok(VortexFile::new(footer, segment_source, opts.session))
let file = VortexFile::new(footer, segment_source, opts.session);
Ok(if cache_layout_reader {
file.with_caching()
} else {
file
})
}

/// An API for opening a [`VortexFile`] using any [`VortexReadAt`] implementation.
Expand Down Expand Up @@ -240,11 +254,12 @@ impl VortexOpenOptions {
segment_source,
));

Ok(VortexFile::new(
footer,
segment_source,
self.session.clone(),
))
let file = VortexFile::new(footer, segment_source, self.session.clone());
Ok(if self.cache_layout_reader {
file.with_caching()
} else {
file
})
}

async fn read_footer(&self, read: &dyn VortexReadAt) -> VortexResult<Footer> {
Expand Down
Loading