diff --git a/dlio_benchmark/reader/parquet_reader.py b/dlio_benchmark/reader/parquet_reader.py index 6397e932..334349f5 100644 --- a/dlio_benchmark/reader/parquet_reader.py +++ b/dlio_benchmark/reader/parquet_reader.py @@ -15,6 +15,9 @@ Configuration (under storage_options in the DLIO YAML): columns: null # list of column names to read (null = all) row_group_cache_size: 4 # max row groups held in memory per reader thread + metadata_cache: true # cache parquet footer metadata across opens + memory_map: true # use memory-mapped I/O + file_cache: true # keep 1 ParquetFile open across close/open calls Example YAML snippet: dataset: @@ -24,6 +27,8 @@ storage_options: columns: ["feature1", "label"] row_group_cache_size: 8 + metadata_cache: true + memory_map: true """ import bisect @@ -57,6 +62,15 @@ def __init__(self, dataset_type, thread_index, epoch): opts = getattr(self._args, "storage_options", {}) or {} + # Configuration flags + self._use_metadata_cache = opts.get("metadata_cache", True) + self._use_memory_map = opts.get("memory_map", True) + self._use_file_cache = opts.get("file_cache", True) + + # Metadata cache: filename -> (FileMetaData, cumulative_offsets) + # Caches parquet footer metadata to avoid re-reading it on every open + self._metadata_cache: dict = {} + # Optional column selection (list[str] or None = all columns) self._columns = opts.get("columns") or None @@ -65,9 +79,15 @@ def __init__(self, dataset_type, thread_index, epoch): self._rg_cache: dict = {} self._rg_lru: list = [] # insertion-order LRU key list + # File cache: keeps at most 1 ParquetFile open across close/open cycles + # Stored as (filename, (pf, offsets)) or None + self._file_cache = None + self.logger.info( f"{utcnow()} ParquetReader thread={thread_index} epoch={epoch} " - f"columns={self._columns} rg_cache_size={self._rg_cache_size}" + f"columns={self._columns} rg_cache_size={self._rg_cache_size} " + f"metadata_cache={self._use_metadata_cache} memory_map={self._use_memory_map} " + 
def _evict_rg_for_file(self, filename):
    """Drop all row-group cache entries belonging to ``filename``.

    Cache keys are indexable and ``key[0]`` is compared against
    ``filename``. Matching keys are removed from ``self._rg_cache`` and
    from the ``self._rg_lru`` ordering list; entries that belong to other
    files are left in place and keep their relative order.

    Args:
        filename: File whose cached row groups should be discarded.
    """
    doomed = {key for key in self._rg_cache if key[0] == filename}
    if not doomed:
        # Nothing cached for this file; leave both structures untouched.
        return

    for key in doomed:
        del self._rg_cache[key]

    # Rebuild the LRU list in a single pass rather than paying a membership
    # scan plus a ``list.remove`` scan for every evicted key. Slice
    # assignment mutates the existing list object, so any other reference
    # to ``self._rg_lru`` keeps seeing the current contents.
    self._rg_lru[:] = [key for key in self._rg_lru if key not in doomed]
""" import pyarrow.parquet as pq - pf = pq.ParquetFile(filename) - meta = pf.metadata + # File cache hit: same file as the last one we kept open + if self._use_file_cache and self._file_cache is not None and self._file_cache[0] == filename: + return self._file_cache[1] + + # File cache miss with a different file cached: evict it now + if self._use_file_cache and self._file_cache is not None: + old_filename = self._file_cache[0] + self._evict_rg_for_file(old_filename) + self._file_cache = None + + cached_meta = None + cached_offsets = None + + # Check if metadata is cached + if self._use_metadata_cache: + cached = self._metadata_cache.get(filename) + if cached is not None: + cached_meta, cached_offsets = cached + + # Open the file - pass cached metadata to skip footer read if available + pf = pq.ParquetFile( + filename, + memory_map=self._use_memory_map, + metadata=cached_meta + ) - # Build cumulative row offsets [0, rg0_rows, rg0+rg1_rows, ...] - offsets = [0] - for i in range(meta.num_row_groups): - offsets.append(offsets[-1] + meta.row_group(i).num_rows) + # Use cached offsets or compute them + if cached_offsets is not None: + offsets = cached_offsets + else: + # Build cumulative row offsets [0, rg0_rows, rg0+rg1_rows, ...] 
@dlp.log
def close(self, filename):
    """
    Close ``filename`` and evict its row-group cache entries.

    With ``file_cache`` enabled, a file that currently occupies the 1-slot
    file cache is left untouched here: neither its row groups are evicted
    nor is the base-class ``close`` called. Its row groups are released
    later, when :meth:`open` replaces the slot with a different file or
    when :meth:`finalize` clears the caches.
    """
    slot = self._file_cache
    if self._use_file_cache and slot is not None and slot[0] == filename:
        # Keep this file open in the 1-slot file cache.
        return

    self._evict_rg_for_file(filename)
    super().close(filename)


def finalize(self):
    """Release every cache held by this reader, then finalize the base class.

    Returns:
        Whatever the base-class ``finalize`` returns.
    """
    self._rg_cache.clear()
    self._rg_lru.clear()
    self._file_cache = None
    # The footer-metadata cache gains one entry per distinct file opened
    # and is never trimmed elsewhere; drop it with the other caches so the
    # metadata objects are not kept alive after the reader is finalized.
    self._metadata_cache.clear()
    return super().finalize()