From 1d7b1bdac9254f27ca9308e4caebbb2aec836197 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Thu, 11 Jun 2026 13:52:22 +0100 Subject: [PATCH] Minor touchups for vortex-datafusion Signed-off-by: Adam Gutglick --- .../src/persistent/access_plan.rs | 12 ++- vortex-datafusion/src/persistent/format.rs | 2 +- vortex-datafusion/src/persistent/opener.rs | 88 +++++++++++++++---- vortex-datafusion/src/persistent/reader.rs | 2 +- vortex-datafusion/src/persistent/sink.rs | 16 ++-- 5 files changed, 89 insertions(+), 31 deletions(-) diff --git a/vortex-datafusion/src/persistent/access_plan.rs b/vortex-datafusion/src/persistent/access_plan.rs index 99583b7c0e4..ad7bf941a2c 100644 --- a/vortex-datafusion/src/persistent/access_plan.rs +++ b/vortex-datafusion/src/persistent/access_plan.rs @@ -43,18 +43,16 @@ pub struct VortexAccessPlan { } impl VortexAccessPlan { + /// Returns the selection, if one was set. + pub fn selection(&self) -> Option<&Selection> { + self.selection.as_ref() + } + /// Sets the row [`Selection`] to apply when the file is opened. pub fn with_selection(mut self, selection: Selection) -> Self { self.selection = Some(selection); self } -} - -impl VortexAccessPlan { - /// Returns the selection, if one was set. - pub fn selection(&self) -> Option<&Selection> { - self.selection.as_ref() - } /// Applies this access plan to a [`ScanBuilder`]. /// diff --git a/vortex-datafusion/src/persistent/format.rs b/vortex-datafusion/src/persistent/format.rs index 964d6ad19e1..0381e550893 100644 --- a/vortex-datafusion/src/persistent/format.rs +++ b/vortex-datafusion/src/persistent/format.rs @@ -285,7 +285,7 @@ impl FileFormatFactory for VortexFormatFactory { if let Some(key) = key.strip_prefix("format.") { opts.set(key, value)?; } else { - tracing::trace!("Ignoring options '{key}'"); + tracing::trace!("Ignoring option '{key}'"); } } diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index af3df1f72b8..d50b003f1dc 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -17,6 +17,7 @@ use datafusion_datasource::PartitionedFile; use datafusion_datasource::TableSchema; use datafusion_datasource::file_stream::FileOpenFuture; use datafusion_datasource::file_stream::FileOpener; +use datafusion_execution::cache::cache_manager::CachedFileMetadataEntry; use datafusion_execution::cache::cache_manager::FileMetadataCache; use datafusion_physical_expr::PhysicalExprRef; use datafusion_physical_expr::projection::ProjectionExprs; @@ -129,7 +130,7 @@ impl FileOpener for VortexOpener { let unified_file_schema = Arc::clone(self.table_schema.file_schema()); let batch_size = self.batch_size; let limit = self.limit; - let layout_reader = Arc::clone(&self.layout_readers); + let layout_readers = Arc::clone(&self.layout_readers); let natural_split_ranges = Arc::clone(&self.natural_split_ranges); let has_output_ordering = self.has_output_ordering; let scan_concurrency = self.scan_concurrency; @@ -160,9 +161,7 @@ impl FileOpener for VortexOpener { Ok(async move { // Create FilePruner when we have a predicate and either dynamic expressions // or file statistics available. The pruner can eliminate files without - // opening them based on: - // - Partition column values (e.g., date=2024-01-01) - // - File-level statistics (min/max values per column) + // opening them based on File-level statistics (min/max values per column) let mut file_pruner = file_pruning_predicate .filter(|p| { // Only create pruner if we have dynamic expressions or file statistics @@ -192,15 +191,21 @@ impl FileOpener for VortexOpener { .with_metrics_registry(Arc::clone(&metrics_registry)) .with_labels(labels); - if let Some(file_metadata_cache) = file_metadata_cache - && let Some(entry) = file_metadata_cache.get(file.path()) - && entry.is_valid_for(&file.object_meta) - && let Some(vortex_metadata) = entry - .file_metadata - .as_any() - .downcast_ref::() - { - open_opts = open_opts.with_footer(vortex_metadata.footer().clone()); + let cached_footer = file_metadata_cache + .as_ref() + .and_then(|cache| cache.get(file.path())) + .filter(|entry| entry.is_valid_for(&file.object_meta)) + .and_then(|entry| { + entry + .file_metadata + .as_any() + .downcast_ref::() + .map(|vortex_metadata| vortex_metadata.footer().clone()) + }); + let footer_cache_hit = cached_footer.is_some(); + + if let Some(footer) = cached_footer { + open_opts = open_opts.with_footer(footer); } let vxf = open_opts @@ -208,6 +213,19 @@ impl FileOpener for VortexOpener { .await .map_err(|e| exec_datafusion_err!("Failed to open Vortex file {e}"))?; + // On a miss, cache the parsed footer so other partitions and later executions + // skip the footer fetch and parse. `infer_schema`/`infer_stats` also populate + // this cache, but only when planning goes through `VortexFormat`. + if !footer_cache_hit && let Some(cache) = &file_metadata_cache { + cache.put( + file.path(), + CachedFileMetadataEntry::new( + file.object_meta.clone(), + Arc::new(CachedVortexMetadata::new(&vxf)), + ), + ); + } + // Check if there are rows in this file. If not, we can save // ourselves some work and return an empty stream. if vxf.row_count() == 0 { @@ -285,7 +303,7 @@ impl FileOpener for VortexOpener { let projector = leftover_projection.make_projector(&stream_schema)?; // We share our layout readers with others partitions in the scan, so we can only need to read each layout in each file once. - let layout_reader = match layout_reader.entry(file.object_meta.location.clone()) { + let layout_reader = match layout_readers.entry(file.object_meta.location.clone()) { Entry::Occupied(mut occupied_entry) => { if let Some(reader) = occupied_entry.get().upgrade() { tracing::trace!("reusing layout reader for {}", occupied_entry.key()); @@ -352,7 +370,6 @@ impl FileOpener for VortexOpener { // This will only fail if the user has not configured a suitable // PhysicalExprAdapterFactory on the file source to handle rewriting the // expression to handle missing/reordered columns in the Vortex file. - let (pushed, unpushed): (Vec, Vec) = split_conjunction(&f) .into_iter() @@ -568,6 +585,7 @@ mod tests { use datafusion::physical_expr::planner::logical2physical; use datafusion::physical_expr_adapter::DefaultPhysicalExprAdapterFactory; use datafusion::scalar::ScalarValue; + use datafusion_execution::cache::DefaultFilesMetadataCache; use datafusion_expr::Operator; use datafusion_physical_expr::expressions as df_expr; use datafusion_physical_expr::projection::ProjectionExpr; @@ -775,6 +793,46 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_open_populates_file_metadata_cache() -> anyhow::Result<()> { + let object_store = Arc::new(InMemory::new()) as Arc; + let file_path = "cached/file.vortex"; + let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap(); + let data_size = + write_arrow_to_vortex(Arc::clone(&object_store), file_path, batch.clone()).await?; + + let file = PartitionedFile::new(file_path.to_string(), data_size); + let table_schema = TableSchema::from_file_schema(batch.schema()); + + let cache: Arc = + Arc::new(DefaultFilesMetadataCache::new(64 * 1024 * 1024)); + let mut opener = make_opener(Arc::clone(&object_store), table_schema, None); + opener.file_metadata_cache = Some(Arc::clone(&cache)); + + // The first open misses the cache and must write the parsed footer back. + let stream = opener.open(file.clone())?.await?; + stream.try_collect::>().await?; + + let entry = cache + .get(file.path()) + .ok_or_else(|| anyhow::anyhow!("footer was not cached after open"))?; + assert!(entry.is_valid_for(&file.object_meta)); + assert!( + entry + .file_metadata + .as_any() + .downcast_ref::() + .is_some() + ); + + // The second open hits the cache and still returns the same data. + let stream = opener.open(file.clone())?.await?; + let data = stream.try_collect::>().await?; + assert_eq!(data.iter().map(|rb| rb.num_rows()).sum::(), 3); + + Ok(()) + } + #[rstest] #[tokio::test] async fn test_open_files_different_table_schema() -> anyhow::Result<()> { diff --git a/vortex-datafusion/src/persistent/reader.rs b/vortex-datafusion/src/persistent/reader.rs index de221c28ca7..9f98761ac3f 100644 --- a/vortex-datafusion/src/persistent/reader.rs +++ b/vortex-datafusion/src/persistent/reader.rs @@ -72,7 +72,7 @@ impl VortexReaderFactory for DefaultVortexReaderFactory { ) -> DFResult> { Ok(Arc::new(ObjectStoreReadAt::new_with_allocator( Arc::clone(&self.object_store), - file.path().as_ref().into(), + file.path().clone(), session.handle(), session.allocator(), )) as _) diff --git a/vortex-datafusion/src/persistent/sink.rs b/vortex-datafusion/src/persistent/sink.rs index 51cb60953da..9f5e8ad184d 100644 --- a/vortex-datafusion/src/persistent/sink.rs +++ b/vortex-datafusion/src/persistent/sink.rs @@ -100,24 +100,26 @@ impl FileSink for VortexSink { object_store: Arc, ) -> DFResult { let mut file_write_tasks: JoinSet> = JoinSet::new(); + let writer_schema = get_writer_schema(&self.config); + let dtype = self + .session + .arrow() + .from_arrow_schema(&writer_schema) + .map_err(|e| { + exec_datafusion_err!("Failed to derive Vortex DType from writer schema: {e}") + })?; // TODO(adamg): // 1. We can probably be better at signaling how much memory we're consuming (potentially when reading too), see ParquetSink::spawn_writer_tasks_and_join. while let Some((path, rx)) = file_stream_rx.recv().await { let session = self.session.clone(); let object_store = Arc::clone(&object_store); - let writer_schema = get_writer_schema(&self.config); - let dtype = session - .arrow() - .from_arrow_schema(&writer_schema) - .map_err(|e| { - exec_datafusion_err!("Failed to derive Vortex DType from writer schema: {e}") - })?; // We need to spawn work because there's a dependency between the different files. If one file has too many batches buffered, // the demux task might deadlock itself. let arrow_session = session.clone(); let import_schema = Arc::clone(&writer_schema); + let dtype = dtype.clone(); file_write_tasks.spawn(async move { let stream = ReceiverStream::new(rx).map(move |rb| { arrow_session