From 0ab2316a6551273b7a8c757813187acfdc1a9969 Mon Sep 17 00:00:00 2001
From: Heng Ge
Date: Tue, 12 May 2026 20:45:22 -0700
Subject: [PATCH 1/9] feat: add manifest version hint for fast latest-version lookup
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

On object stores where listing is not lexicographically ordered (e.g. S3 Express, the local filesystem), resolving the latest manifest version is O(n) in the number of versions. After every successful commit on such a store, write a small JSON file `_versions/latest_version_hint.json` (`{"version":N}`); readers use it as a starting point and probe a few higher versions with HEAD requests (O(k), k = versions added since the hint was written), falling back to a full listing if the hint is missing (older datasets) or stale, or if a transient object-store error makes the probed range untrustworthy.

The hint is written/read only on non-lexically-ordered stores; on S3 Standard / GCS / Azure / DynamoDB / memory the ordered listing already resolves the latest version in roughly one request. The write is awaited as part of the commit (no fire-and-forget mode) and is best-effort: failures are logged and ignored, since the hint only accelerates reads and never affects correctness. Detached versions are never hinted.

`current_manifest_path` uses the hint for non-lexically-ordered, non-local stores (the local filesystem keeps its single-directory-read fast path); `CommitHandler::list_manifest_locations_since` (used by `load_new_transactions`) follows the same strategy, with the gap-fill HEADs bounded by `io_parallelism()` and a fallback to a single paginated listing once a reader is more than 1000 versions behind.

Carries on #5997 / discussion #5947, and follows up on #6728 where moving S3 Express to a version hint was raised.
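
As a rough illustration of the reader-side lookup described above (not the code in this patch), the hint-then-probe strategy can be sketched over an in-memory stand-in for the object store. `FakeStore`, its fields, and `latest_version` are hypothetical names used only here: a set lookup stands in for a HEAD request, and taking the maximum of the set stands in for the full listing. Transient errors are not modeled.

```rust
use std::collections::BTreeSet;

/// Hypothetical stand-in for an object store: the set of manifest versions
/// that exist, plus the (possibly missing or stale) hint value.
struct FakeStore {
    versions: BTreeSet<u64>,
    hint: Option<u64>,
}

/// Resolve the latest version, assuming versions are contiguous above the hint.
fn latest_version(store: &FakeStore) -> Option<u64> {
    if let Some(hint) = store.hint {
        // "HEAD" the hinted version; only trust the hint if it exists.
        if store.versions.contains(&hint) {
            let mut latest = hint;
            // Probe upward until the next version is not found: O(k) lookups.
            while store.versions.contains(&(latest + 1)) {
                latest += 1;
            }
            return Some(latest);
        }
    }
    // Hint missing or pointing past the end: fall back to the O(n) "listing".
    store.versions.iter().max().copied()
}
```

With versions 1 through 5 present, a hint of 3 resolves to 5 after three lookups, while a hint of 10 (pointing past the end) or no hint at all takes the fallback path and still resolves to 5.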
--- docs/src/format/table/layout.md | 15 +- java/src/test/java/org/lance/DatasetTest.java | 7 +- python/python/tests/test_dataset.py | 17 +- rust/lance-table/src/io/commit.rs | 513 +++++++++++++++++- .../src/io/commit/external_manifest.rs | 7 +- rust/lance/Cargo.toml | 4 + rust/lance/benches/manifest_commit.rs | 371 +++++++++++++ rust/lance/src/dataset.rs | 17 +- .../src/dataset/tests/dataset_versioning.rs | 2 + rust/lance/src/dataset/write/commit.rs | 81 ++- rust/lance/src/io/commit/external_manifest.rs | 18 + 11 files changed, 1031 insertions(+), 21 deletions(-) create mode 100644 rust/lance/benches/manifest_commit.rs diff --git a/docs/src/format/table/layout.md b/docs/src/format/table/layout.md index 46efa56a908..8a7026a8951 100644 --- a/docs/src/format/table/layout.md +++ b/docs/src/format/table/layout.md @@ -20,7 +20,8 @@ A Lance dataset in its basic form stores all files within the dataset root direc data/ *.lance -- Data files containing column data _versions/ - *.manifest -- Manifest files (one per version) + *.manifest -- Manifest files (one per version) + latest_version_hint.json -- Optional hint of the latest version (see below) _transactions/ *.txn -- Transaction files for commit coordination _deletions/ @@ -201,3 +202,15 @@ Manifest files are stored in the `_versions/` directory with naming schemes that See [Manifest Naming Schemes](transaction.md#manifest-naming-schemes) for details on the V1 and V2 patterns and their implications for version discovery. +### Version Hint + +On object stores where listing is not lexicographically ordered (e.g. S3 Express, the local filesystem), finding the latest version by listing `_versions/` is O(n) in the number of versions. 
+To avoid this, writers on such stores write `_versions/latest_version_hint.json` after each successful commit: + +```json +{"version": 42} +``` + +Readers use the hint as a starting point and probe a few higher versions with HEAD requests to find the true latest, falling back to a full listing if the hint is missing (older datasets) or stale. +The hint is purely an optimization: it never affects correctness, can be safely deleted, and is ignored by readers that don't understand it. + diff --git a/java/src/test/java/org/lance/DatasetTest.java b/java/src/test/java/org/lance/DatasetTest.java index c01154b0b71..315b010da1e 100644 --- a/java/src/test/java/org/lance/DatasetTest.java +++ b/java/src/test/java/org/lance/DatasetTest.java @@ -528,7 +528,12 @@ void testOpenSerializedManifest(@TempDir Path tempDir) throws IOException { assertEquals(1, dataset1.version()); Path manifestPath = datasetPath.resolve("_versions"); try (Stream fileStream = Files.list(manifestPath)) { - assertEquals(1, fileStream.count()); + // Ignore the version hint file, which is not a manifest. + assertEquals( + 1, + fileStream + .filter(p -> !p.getFileName().toString().startsWith("latest_version_hint")) + .count()); ByteBuffer manifestBuffer = readManifest(manifestPath.resolve("1.manifest")); try (Dataset dataset2 = testDataset.write(1, 5)) { assertEquals(2, dataset2.version()); diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py index 428b85095ce..dc2030c4b2b 100644 --- a/python/python/tests/test_dataset.py +++ b/python/python/tests/test_dataset.py @@ -436,18 +436,27 @@ def test_has_stable_row_ids_property(tmp_path: Path): assert lance.dataset(non_stable_path).has_stable_row_ids is False +def _list_manifests(versions_dir): + # Ignore the version hint file, which is not a manifest. 
+ return [ + name + for name in os.listdir(versions_dir) + if not name.startswith("latest_version_hint") + ] + + def test_v2_manifest_paths(tmp_path: Path): lance.write_dataset( pa.table({"a": range(100)}), tmp_path, enable_v2_manifest_paths=True ) - manifest_path = os.listdir(tmp_path / "_versions") + manifest_path = _list_manifests(tmp_path / "_versions") assert len(manifest_path) == 1 assert re.match(r"\d{20}\.manifest", manifest_path[0]) def test_default_v2_manifest_paths(tmp_path: Path): lance.write_dataset(pa.table({"a": range(100)}), tmp_path) - manifest_path = os.listdir(tmp_path / "_versions") + manifest_path = _list_manifests(tmp_path / "_versions") assert len(manifest_path) == 1 assert re.match(r"\d{20}\.manifest", manifest_path[0]) @@ -457,12 +466,12 @@ def test_v2_manifest_paths_migration(tmp_path: Path): lance.write_dataset( pa.table({"a": range(100)}), tmp_path, enable_v2_manifest_paths=False ) - manifest_path = os.listdir(tmp_path / "_versions") + manifest_path = _list_manifests(tmp_path / "_versions") assert manifest_path == ["1.manifest"] # Migrate to v2 manifest paths lance.dataset(tmp_path).migrate_manifest_paths_v2() - manifest_path = os.listdir(tmp_path / "_versions") + manifest_path = _list_manifests(tmp_path / "_versions") assert len(manifest_path) == 1 assert re.match(r"\d{20}\.manifest", manifest_path[0]) diff --git a/rust/lance-table/src/io/commit.rs b/rust/lance-table/src/io/commit.rs index c90909d7db5..86ba80101ab 100644 --- a/rust/lance-table/src/io/commit.rs +++ b/rust/lance-table/src/io/commit.rs @@ -70,6 +70,13 @@ use { pub const VERSIONS_DIR: &str = "_versions"; const MANIFEST_EXTENSION: &str = "manifest"; const DETACHED_VERSION_PREFIX: &str = "d"; +/// File name for the JSON version hint file, stored under `_versions/`. +/// +/// The file contains `{"version":N}` where `N` is the latest committed version +/// at the time of writing. 
It enables O(1)/O(k) latest-version lookup via HEAD +/// requests on object stores where listing is not lexicographically ordered +/// (e.g. S3 Express, local filesystem) instead of an O(n) listing. +const VERSION_HINT_FILE: &str = "latest_version_hint.json"; /// How manifest files should be named. #[derive(Clone, Copy, Debug, PartialEq, Eq)] @@ -260,17 +267,274 @@ impl TryFrom for ManifestLocation { } } -/// Get the latest manifest path +/// Get the latest manifest path. +/// +/// - Local filesystem: a single directory read. +/// - Stores where listing is not lexicographically ordered (e.g. S3 Express): +/// the version hint (read the hint file, then probe higher versions with +/// HEADs), falling back to a listing if the hint is missing or stale. A full +/// listing on these stores is O(n) in the number of versions. +/// - Lexicographically ordered stores (e.g. S3 Standard, GCS): the listing +/// already resolves the latest version in roughly one request. async fn current_manifest_path( object_store: &ObjectStore, base: &Path, ) -> Result<ManifestLocation> { - if object_store.is_local() - && let Ok(Some(location)) = current_manifest_local(base) + if object_store.is_local() { + if let Ok(Some(location)) = current_manifest_local(base) { + return Ok(location); + } + } else if uses_version_hint(object_store) + && let Some(location) = read_version_hint_and_probe(object_store, base).await { return Ok(location); } + resolve_version_from_listing(object_store, base).await +} + +/// JSON body of the version hint file: `{"version":N}`. +#[derive(serde::Serialize, serde::Deserialize)] +struct VersionHint { + version: u64, +} + +/// Whether this object store benefits from a version hint. +/// +/// On stores where listing is lexicographically ordered (S3 Standard, GCS, +/// Azure, ...) the latest version is already resolved in roughly one request, +/// so the hint would only add a write per commit for nothing.
We write (and +/// read) it only on stores where listing is not lexicographically ordered: +/// S3 Express and the local filesystem. +fn uses_version_hint(object_store: &ObjectStore) -> bool { + !object_store.list_is_lexically_ordered +} + +/// Path to the JSON version hint file for a dataset. +fn version_hint_path(base: &Path) -> Path { + base.clone().join(VERSIONS_DIR).join(VERSION_HINT_FILE) +} + +/// Write the version hint file after a successful commit. +/// +/// The hint is stored as JSON: `{"version":N}`. This write is best-effort: +/// failures are logged and ignored, since the hint only accelerates reads and +/// never affects correctness (readers verify the hinted version and probe +/// upward from there). It is a no-op for detached versions and for stores that +/// do not benefit from a hint (see [`uses_version_hint`]). +pub async fn write_version_hint(object_store: &ObjectStore, base: &Path, version: u64) { + if is_detached_version(version) || !uses_version_hint(object_store) { + return; + } + let hint_path = version_hint_path(base); + let content = serde_json::to_vec(&VersionHint { version }).expect("serialize version hint"); + if let Err(e) = object_store.put(&hint_path, content.as_slice()).await { + warn!("Failed to write version hint file for version {version}: {e}"); + } +} + +/// Read the latest version from the hint file, or `None` if it does not exist +/// or cannot be parsed. +async fn read_version_from_hint(object_store: &ObjectStore, base: &Path) -> Option<u64> { + let bytes = object_store + .inner + .get(&version_hint_path(base)) + .await + .ok()? + .bytes() + .await + .ok()?; + Some(serde_json::from_slice::<VersionHint>(&bytes).ok()?.version) +} + +/// Read the version hint and probe upward to find the true latest manifest. +/// +/// Returns `None` if the hint file is missing, the hinted version no longer +/// exists, or any error occurred; callers should fall back to listing.
+async fn read_version_hint_and_probe( + object_store: &ObjectStore, + base: &Path, +) -> Option<ManifestLocation> { + let hint_version = read_version_from_hint(object_store, base).await?; + let (version, scheme, mut probed) = probe_versions_upward(object_store, base, hint_version) + .await + .ok() + .flatten()?; + // `probed` is non-empty and its last entry is the highest version found. + let (_, meta) = probed.pop()?; + Some(ManifestLocation { + version, + path: scheme.manifest_path(base, version), + size: Some(meta.size), + naming_scheme: scheme, + e_tag: meta.e_tag, + }) +} + +/// Maximum version gap between the hint and the read version for which we use +/// the hint-based parallel-HEAD path; beyond this a single (paginated) listing +/// is cheaper, so callers fall back to it. +const MAX_HINT_PROBE_GAP: u64 = 1000; + +/// Probe `from_version`, then `from_version + 1`, `+ 2`, ... with HEAD requests +/// until one is not found. +/// +/// Assumes attached versions are contiguous above `from_version` (true in +/// practice: every commit increments by one, and cleanup only removes *old* +/// versions, never ones newer than the latest). A `NotFound` therefore marks +/// the end of the history. +/// +/// - `Ok(Some((true_latest_version, naming_scheme, [(version, meta), ...])))`: +/// the vec covers every version from `from_version` through the true latest +/// in ascending order. +/// - `Ok(None)`: `from_version` itself does not exist (a `NotFound` for both +/// naming schemes), i.e. the hint pointed past the end. +/// - `Err(_)`: a transient object-store error was hit, so the probed range may +/// be incomplete; callers should fall back to a full listing rather than +/// trust a possibly-stale result. +async fn probe_versions_upward( + object_store: &ObjectStore, + base: &Path, + from_version: u64, +) -> Result< + Option<( + u64, + ManifestNamingScheme, + Vec<(u64, object_store::ObjectMeta)>, + )>, +> { + // Newer datasets use V2; fall back to V1 if the V2 path is not found.
+ let mut scheme = ManifestNamingScheme::V2; + let meta = match object_store + .inner + .head(&scheme.manifest_path(base, from_version)) + .await + { + Ok(meta) => meta, + Err(ObjectStoreError::NotFound { .. }) => { + scheme = ManifestNamingScheme::V1; + match object_store + .inner + .head(&scheme.manifest_path(base, from_version)) + .await + { + Ok(meta) => meta, + Err(ObjectStoreError::NotFound { .. }) => return Ok(None), + Err(e) => return Err(e.into()), + } + } + Err(e) => return Err(e.into()), + }; + + let mut probed = vec![(from_version, meta)]; + let mut version = from_version; + loop { + let next = version + 1; + match object_store + .inner + .head(&scheme.manifest_path(base, next)) + .await + { + Ok(meta) => { + probed.push((next, meta)); + version = next; + } + // NotFound means we found the latest version. + Err(ObjectStoreError::NotFound { .. }) => break, + // A transient error means a newer version might exist that we + // failed to observe; surface it so callers fall back to listing. + Err(e) => return Err(e.into()), + } + } + Ok(Some((version, scheme, probed))) +} + +/// List manifest locations with version `> since_version` using the version +/// hint, in descending order of version. +/// +/// Returns `None` if the hint is missing or stale enough that this is not +/// usable; callers should fall back to a full listing. `Some(vec![])` is the +/// fast path where the hint confirms there are no new versions. +async fn list_manifests_since_version_with_hint( + object_store: &ObjectStore, + base: &Path, + since_version: u64, +) -> Option<Vec<ManifestLocation>> { + let hint_version = read_version_from_hint(object_store, base).await?; + + // A reader that is very far behind is cheaper to serve with one paginated + // listing than with thousands of HEADs.
+ if hint_version.saturating_sub(since_version) > MAX_HINT_PROBE_GAP { + return None; + } + + // If the hint is not newer than the read version, the only versions that + // could exist are right above it; otherwise start at the hint. + let probe_from = if hint_version > since_version { + hint_version + } else { + since_version + 1 + }; + + let (scheme, probed) = match probe_versions_upward(object_store, base, probe_from).await { + Ok(Some((_true_latest, scheme, probed))) => (scheme, probed), + // Nothing at `probe_from`. If we were probing from the hint, the hint + // is stale; bail to a full listing. If we were probing from + // `since_version + 1`, there are simply no new versions. + Ok(None) if hint_version > since_version => return None, + Ok(None) => return Some(Vec::new()), + // Transient error: don't trust the hint path, fall back to listing. + Err(_) => return None, + }; + + let mut locations: Vec<ManifestLocation> = probed + .into_iter() + .filter(|(v, _)| *v > since_version) + .map(|(version, meta)| ManifestLocation { + version, + path: scheme.manifest_path(base, version), + size: Some(meta.size), + naming_scheme: scheme, + e_tag: meta.e_tag, + }) + .collect(); + + // Fill the gap between `since_version` and the hint with HEADs (the probe + // above already covered `hint_version` and up). The range is contiguous, so + // any error here (including a `NotFound`) means we can't trust the hint path + // and should fall back to a full listing.
+ if hint_version > since_version + 1 { + let gap_locations: Vec<ManifestLocation> = + futures::stream::iter((since_version + 1)..hint_version) + .map(|version| async move { + object_store + .inner + .head(&scheme.manifest_path(base, version)) + .await + .map(|meta| ManifestLocation { + version, + path: scheme.manifest_path(base, version), + size: Some(meta.size), + naming_scheme: scheme, + e_tag: meta.e_tag, + }) + }) + .buffer_unordered(object_store.io_parallelism()) + .try_collect() + .await + .ok()?; + locations.extend(gap_locations); + } + + locations.sort_by_key(|loc| std::cmp::Reverse(loc.version)); + Some(locations) +} + +/// Resolve the latest manifest by listing the versions directory. +async fn resolve_version_from_listing( + object_store: &ObjectStore, + base: &Path, +) -> Result<ManifestLocation> { let manifest_files = object_store.list(Some(base.clone().join(VERSIONS_DIR))); let mut valid_manifests = manifest_files.try_filter_map(|res| { @@ -588,6 +852,51 @@ pub trait CommitHandler: Debug + Send + Sync { } } + /// List manifest locations with version `> since_version`, in descending + /// order of version. + /// + /// On lexically-ordered stores this is the standard listing with early + /// termination. On non-lexically-ordered stores (e.g. S3 Express) it uses + /// the version hint to avoid an O(n) listing, falling back to a full + /// listing if the hint is missing or stale.
+ fn list_manifest_locations_since<'a>( + &self, + base_path: &Path, + object_store: &'a ObjectStore, + since_version: u64, + ) -> BoxStream<'a, Result<ManifestLocation>> { + if object_store.list_is_lexically_ordered { + return self + .list_manifest_locations(base_path, object_store, true) + .try_take_while(move |loc| future::ready(Ok(loc.version > since_version))) + .boxed(); + } + + let base_path = base_path.clone(); + futures::stream::once(async move { + let locations = match list_manifests_since_version_with_hint( + object_store, + &base_path, + since_version, + ) + .await + { + Some(locations) => locations, + None => { + let mut locations = list_manifests(&base_path, &object_store.inner) + .try_collect::<Vec<_>>() + .await?; + locations.retain(|loc| loc.version > since_version); + locations.sort_by_key(|loc| std::cmp::Reverse(loc.version)); + locations + } + }; + Ok::<_, Error>(futures::stream::iter(locations.into_iter().map(Ok))) + }) + .try_flatten() + .boxed() + } + /// Commit a manifest. /// /// This function should return an [CommitError::CommitConflict] if another @@ -877,6 +1186,8 @@ impl CommitHandler for UnsafeCommitHandler { let res = manifest_writer(object_store, manifest, indices, &version_path, transaction).await?; + write_version_hint(object_store, base_path, manifest.version).await; + Ok(ManifestLocation { version: manifest.version, size: Some(res.size as u64), @@ -960,6 +1271,9 @@ impl CommitHandler for T { lease.release(res.is_ok()).await?; let res = res?; + + write_version_hint(object_store, base_path, manifest.version).await; + Ok(ManifestLocation { version: manifest.version, size: Some(res.size as u64), @@ -1028,6 +1342,7 @@ impl CommitHandler for RenameCommitHandler { { Ok(_) => { // Successfully committed + write_version_hint(object_store, base_path, manifest.version).await; Ok(ManifestLocation { version: manifest.version, path, @@ -1103,6 +1418,8 @@ impl CommitHandler for ConditionalPutCommitHandler { _ => CommitError::OtherError(err.into()), })?; +
write_version_hint(object_store, base_path, manifest.version).await; + Ok(ManifestLocation { version: manifest.version, path, @@ -1282,6 +1599,196 @@ mod tests { assert_eq!(location.path, naming_scheme.manifest_path(&base, 11)); } + /// A memory store that reports `list_is_lexically_ordered == false`, like + /// S3 Express, so the version-hint paths are exercised. + fn non_lexical_memory_store() -> Box { + let mut object_store = ObjectStore::memory(); + object_store.list_is_lexically_ordered = false; + Box::new(object_store) + } + + #[tokio::test] + async fn test_write_version_hint() { + let base = Path::from("base"); + + // No hint is written on lexically-ordered stores (it would not be read). + let lexical = ObjectStore::memory(); + write_version_hint(&lexical, &base, 42).await; + assert_eq!(read_version_from_hint(&lexical, &base).await, None); + + let object_store = non_lexical_memory_store(); + write_version_hint(&object_store, &base, 42).await; + assert_eq!(read_version_from_hint(&object_store, &base).await, Some(42)); + + // A later commit overwrites the hint. + write_version_hint(&object_store, &base, 100).await; + assert_eq!( + read_version_from_hint(&object_store, &base).await, + Some(100) + ); + + // Detached versions are never written to the hint. + write_version_hint( + &object_store, + &base, + crate::format::DETACHED_VERSION_MASK | 7, + ) + .await; + assert_eq!( + read_version_from_hint(&object_store, &base).await, + Some(100) + ); + + // A corrupt / non-JSON hint file is treated as missing. 
+ let hint_path = version_hint_path(&base); + object_store + .put(&hint_path, b"not json".as_slice()) + .await + .unwrap(); + assert_eq!(read_version_from_hint(&object_store, &base).await, None); + } + + #[tokio::test] + #[rstest::rstest] + async fn test_read_version_hint_and_probe( + #[values(ManifestNamingScheme::V1, ManifestNamingScheme::V2)] + naming_scheme: ManifestNamingScheme, + ) { + let object_store = non_lexical_memory_store(); + let base = Path::from("base"); + + // No hint file yet. + assert!( + read_version_hint_and_probe(&object_store, &base) + .await + .is_none() + ); + + for version in 1..=5 { + object_store + .put(&naming_scheme.manifest_path(&base, version), b"".as_slice()) + .await + .unwrap(); + } + + // Stale hint: should probe forward and find version 5. + write_version_hint(&object_store, &base, 3).await; + let location = read_version_hint_and_probe(&object_store, &base) + .await + .unwrap(); + assert_eq!(location.version, 5); + assert_eq!(location.naming_scheme, naming_scheme); + + // Up-to-date hint: returns version 5 directly. + write_version_hint(&object_store, &base, 5).await; + let location = read_version_hint_and_probe(&object_store, &base) + .await + .unwrap(); + assert_eq!(location.version, 5); + + // Hint points past the latest version: not usable. + write_version_hint(&object_store, &base, 10).await; + assert!( + read_version_hint_and_probe(&object_store, &base) + .await + .is_none() + ); + } + + #[tokio::test] + async fn test_list_manifests_since_version_with_hint() { + let object_store = non_lexical_memory_store(); + let base = Path::from("base"); + let scheme = ManifestNamingScheme::V2; + + for version in 1..=10 { + object_store + .put(&scheme.manifest_path(&base, version), b"".as_slice()) + .await + .unwrap(); + } + + // No hint yet -> not usable, caller must fall back. 
+ assert!( + list_manifests_since_version_with_hint(&object_store, &base, 7) + .await + .is_none() + ); + + // Hint exactly at the read version -> fast path, nothing new. + write_version_hint(&object_store, &base, 10).await; + assert!(matches!( + list_manifests_since_version_with_hint(&object_store, &base, 10).await, + Some(v) if v.is_empty() + )); + + // Hint ahead of the read version, with a gap to fill (8, 9) plus probing + // from the hint (10). Results are descending by version. + let locations = list_manifests_since_version_with_hint(&object_store, &base, 7) + .await + .unwrap(); + assert_eq!( + locations.iter().map(|l| l.version).collect::<Vec<_>>(), + vec![10, 9, 8] + ); + + // Slightly stale hint (points at 8) still probes up to the true latest. + write_version_hint(&object_store, &base, 8).await; + let locations = list_manifests_since_version_with_hint(&object_store, &base, 7) + .await + .unwrap(); + assert_eq!( + locations.iter().map(|l| l.version).collect::<Vec<_>>(), + vec![10, 9, 8] + ); + + // Hint points past the latest -> not usable, caller falls back. + write_version_hint(&object_store, &base, 20).await; + assert!( + list_manifests_since_version_with_hint(&object_store, &base, 7) + .await + .is_none() + ); + } + + #[tokio::test] + async fn test_current_manifest_path_with_hint_non_lexical() { + // Simulate S3 Express (non-lexically ordered list) with many versions. + let object_store = non_lexical_memory_store(); + let base = Path::from("base"); + let naming_scheme = ManifestNamingScheme::V2; + + for version in 1..=100 { + object_store + .put(&naming_scheme.manifest_path(&base, version), b"".as_slice()) + .await + .unwrap(); + } + + // Slightly stale hint: probing from 98 still resolves the true latest.
+ write_version_hint(&object_store, &base, 98).await; + let location = current_manifest_path(&object_store, &base).await.unwrap(); + assert_eq!(location.version, 100); + } + + #[tokio::test] + async fn test_current_manifest_path_with_stale_hint_falls_back_to_listing() { + let object_store = non_lexical_memory_store(); + let base = Path::from("base"); + let naming_scheme = ManifestNamingScheme::V2; + + // Only version 5 exists, but the hint claims version 10. + object_store + .put(&naming_scheme.manifest_path(&base, 5), b"".as_slice()) + .await + .unwrap(); + write_version_hint(&object_store, &base, 10).await; + + // The stale hint is ignored; listing finds version 5. + let location = current_manifest_path(&object_store, &base).await.unwrap(); + assert_eq!(location.version, 5); + } + #[test] fn test_parse_detached_version() { // Valid detached version filenames diff --git a/rust/lance-table/src/io/commit/external_manifest.rs b/rust/lance-table/src/io/commit/external_manifest.rs index 49d651fe6eb..75993ca8d1f 100644 --- a/rust/lance-table/src/io/commit/external_manifest.rs +++ b/rust/lance-table/src/io/commit/external_manifest.rs @@ -21,7 +21,7 @@ use tracing::info; use super::{ MANIFEST_EXTENSION, ManifestLocation, ManifestNamingScheme, current_manifest_path, - default_resolve_version, make_staging_manifest_path, + default_resolve_version, make_staging_manifest_path, write_version_hint, }; use crate::format::{IndexMetadata, Manifest, Transaction}; use crate::io::commit::{CommitError, CommitHandler}; @@ -490,7 +490,10 @@ impl CommitHandler for ExternalManifestCommitHandler { .await; match result { - Ok(location) => Ok(location), + Ok(location) => { + write_version_hint(object_store, base_path, manifest.version).await; + Ok(location) + } Err(_) => { // delete the staging manifest match object_store.inner.delete(&staging_path).await { diff --git a/rust/lance/Cargo.toml b/rust/lance/Cargo.toml index 9f055fc9154..6a6825afd57 100644 --- a/rust/lance/Cargo.toml +++ 
b/rust/lance/Cargo.toml @@ -247,5 +247,9 @@ name = "mem_wal_fts_bench" path = "benches/mem_wal/fts/mem_wal_fts_bench.rs" harness = false +[[bench]] +name = "manifest_commit" +harness = false + [lints] workspace = true diff --git a/rust/lance/benches/manifest_commit.rs b/rust/lance/benches/manifest_commit.rs new file mode 100644 index 00000000000..2a98a37a498 --- /dev/null +++ b/rust/lance/benches/manifest_commit.rs @@ -0,0 +1,371 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Benchmark for manifest commit performance with many small fragments. +//! +//! This benchmark tests how performance degrades as the number of small fragments +//! grows. Each fragment contains only 10 rows, and we measure both: +//! - Commit time (manifest write only, excludes fragment data writing) +//! - Load time (manifest read from storage, using checkout_latest) +//! +//! Key optimizations: +//! - Uses shared ObjectStoreRegistry to reuse TCP/TLS connections +//! - Disables auto-cleanup to avoid background cleanup overhead +//! - Separates fragment writing from commit measurement +//! +//! ## Running against S3 Express +//! +//! ```bash +//! export AWS_REGION=us-east-1 +//! export DATASET_PREFIX=s3://your-bucket--use1-az4--x-s3/bench/manifest_commit +//! export NUM_ITERATIONS=100 +//! cargo bench --bench manifest_commit +//! ``` +//! +//! ## Running against local filesystem (with temp directory) +//! +//! ```bash +//! cargo bench --bench manifest_commit +//! ``` +//! +//! ## Configuration +//! +//! - `DATASET_PREFIX`: Base URI for datasets (e.g. s3://bucket/prefix or /tmp/bench). +//! If not set, uses a temporary directory. +//! - `NUM_ITERATIONS`: Number of small fragment writes to perform (default: 100). +//! - `ROWS_PER_FRAGMENT`: Number of rows per fragment (default: 10). +//! - `DELETE_DATASET`: When "true", delete the dataset after benchmark completes. +//! 
- `ENABLE_CACHE`: When "true", enable manifest caching for load measurements. +//! Default is "false" to measure actual storage read latency. + +#![allow(clippy::print_stdout)] + +use arrow_array::{Int64Array, RecordBatch, RecordBatchIterator, StringArray}; +use arrow_schema::{DataType, Field, Schema as ArrowSchema}; +use criterion::{Criterion, criterion_group, criterion_main}; +use lance::dataset::builder::DatasetBuilder; +use lance::dataset::{CommitBuilder, Dataset, InsertBuilder, WriteMode, WriteParams}; +use lance::session::Session; +use lance_io::object_store::ObjectStoreRegistry; +use std::sync::Arc; +use std::time::Instant; +use tokio::runtime::Runtime; +use uuid::Uuid; + +const DEFAULT_ROWS_PER_FRAGMENT: usize = 10; +const DEFAULT_NUM_ITERATIONS: usize = 100; + +fn get_rows_per_fragment() -> usize { + std::env::var("ROWS_PER_FRAGMENT") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(DEFAULT_ROWS_PER_FRAGMENT) +} + +fn get_num_iterations() -> usize { + std::env::var("NUM_ITERATIONS") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(DEFAULT_NUM_ITERATIONS) +} + +fn get_delete_dataset() -> bool { + std::env::var("DELETE_DATASET") + .map(|s| s.to_lowercase() == "true") + .unwrap_or(false) +} + +fn get_enable_cache() -> bool { + std::env::var("ENABLE_CACHE") + .map(|s| s.to_lowercase() == "true") + .unwrap_or(false) +} + +fn get_dataset_prefix() -> String { + std::env::var("DATASET_PREFIX").unwrap_or_else(|_| { + let temp_dir = std::env::temp_dir().join(format!("lance_bench_{}", Uuid::new_v4())); + std::fs::create_dir_all(&temp_dir).expect("Failed to create temp directory"); + temp_dir.to_string_lossy().to_string() + }) +} + +fn get_storage_label(prefix: &str) -> &'static str { + if prefix.starts_with("s3://") { + "s3" + } else if prefix.starts_with("gs://") { + "gcs" + } else if prefix.starts_with("az://") { + "azure" + } else if prefix.starts_with("memory://") { + "memory" + } else { + "local" + } +} + +async fn create_initial_dataset( + uri: &str, + 
rows_per_fragment: usize, + session: Arc<Session>, +) -> Dataset { + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int64, false), + Field::new("name", DataType::Utf8, false), + ])); + + let batch = create_batch(schema.clone(), 0, rows_per_fragment); + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema); + + std::fs::remove_dir_all(uri).ok(); + + let params = WriteParams { + session: Some(session), + skip_auto_cleanup: true, + ..Default::default() + }; + + Dataset::write(reader, uri, Some(params)) + .await + .expect("failed to create initial dataset") +} + +fn create_batch(schema: Arc<ArrowSchema>, start_id: usize, num_rows: usize) -> RecordBatch { + let ids = Int64Array::from_iter_values((start_id as i64)..((start_id + num_rows) as i64)); + let names = StringArray::from_iter_values( + (start_id..(start_id + num_rows)).map(|i| format!("name_{}", i)), + ); + + RecordBatch::try_new(schema, vec![Arc::new(ids), Arc::new(names)]) + .expect("failed to create batch") +} + +fn bench_manifest_commit(c: &mut Criterion) { + let runtime = Runtime::new().expect("failed to build tokio runtime"); + + let dataset_prefix = get_dataset_prefix(); + let num_iterations = get_num_iterations(); + let rows_per_fragment = get_rows_per_fragment(); + let delete_dataset = get_delete_dataset(); + let enable_cache = get_enable_cache(); + let storage_label = get_storage_label(&dataset_prefix); + + let short_id = &Uuid::new_v4().to_string()[..8]; + let uri = format!( + "{}/manifest_commit_{}", + dataset_prefix.trim_end_matches('/'), + short_id + ); + + println!("=== Manifest Commit Benchmark Setup ==="); + println!("Storage: {} ({})", uri, storage_label); + println!("Rows per fragment: {}", rows_per_fragment); + println!("Number of iterations: {}", num_iterations); + println!( + "Total fragments (including initial): {}", + num_iterations + 1 + ); + println!("Delete dataset: {}", delete_dataset); + println!( + "Cache enabled: {} ({})", + enable_cache, + if enable_cache { + "using
default cache size" + } else { + "zero cache size - measures actual storage read" + } + ); + println!(); + + // Create a shared session for both commit and load operations + // When cache is disabled, use zero cache size to measure actual storage read latency + // When cache is enabled, use default cache sizes (6GB index, 1GB metadata) + let shared_store_registry = Arc::new(ObjectStoreRegistry::default()); + let session = if enable_cache { + Arc::new(Session::default()) + } else { + Arc::new(Session::new(0, 0, shared_store_registry)) + }; + + let initial_dataset = runtime.block_on(create_initial_dataset( + &uri, + rows_per_fragment, + session.clone(), + )); + + let uri_clone = uri.clone(); + let mut load_dataset = runtime.block_on(async { + DatasetBuilder::from_uri(&uri_clone) + .with_session(session.clone()) + .load() + .await + .expect("failed to load dataset for load measurements") + }); + + let mut current_dataset = Arc::new(initial_dataset); + + let mut commit_latencies = Vec::with_capacity(num_iterations); + let mut load_latencies = Vec::with_capacity(num_iterations); + + println!("Running commit and load benchmarks..."); + println!("fragments,commit_ms,load_ms"); + + for i in 1..=num_iterations { + let num_fragments = i + 1; + + let (commit_time, new_dataset) = { + let dataset = current_dataset.clone(); + let session_clone = session.clone(); + runtime.block_on(async move { + let schema: Arc<ArrowSchema> = Arc::new((&dataset.schema().clone()).into()); + let start_id = dataset.count_rows(None).await.unwrap() as usize; + let batch = create_batch(schema.clone(), start_id, rows_per_fragment); + + let write_params = WriteParams { + mode: WriteMode::Append, + session: Some(session_clone.clone()), + skip_auto_cleanup: true, + ..Default::default() + }; + + let transaction = InsertBuilder::new(dataset.clone()) + .with_params(&write_params) + .execute_uncommitted(vec![batch]) + .await + .expect("failed to write fragment"); + + let start = Instant::now(); + let new_ds = 
CommitBuilder::new(dataset) + .with_session(session_clone) + .with_skip_auto_cleanup(true) + .execute(transaction) + .await + .expect("failed to commit"); + (start.elapsed(), Arc::new(new_ds)) + }) + }; + + let load_time = runtime.block_on(async { + let start = Instant::now(); + load_dataset + .checkout_latest() + .await + .expect("failed to checkout latest"); + let elapsed = start.elapsed(); + + assert_eq!( + load_dataset.manifest().fragments.len(), + num_fragments, + "Expected {} fragments", + num_fragments + ); + elapsed + }); + + current_dataset = new_dataset; + + commit_latencies.push(commit_time); + load_latencies.push(load_time); + + println!( + "{},{:.2},{:.2}", + num_fragments, + commit_time.as_secs_f64() * 1000.0, + load_time.as_secs_f64() * 1000.0 + ); + } + + println!(); + println!("=== Summary Statistics ==="); + + let avg_commit: f64 = commit_latencies + .iter() + .map(|d| d.as_secs_f64()) + .sum::<f64>() + / commit_latencies.len() as f64; + let avg_load: f64 = + load_latencies.iter().map(|d| d.as_secs_f64()).sum::<f64>() / load_latencies.len() as f64; + + let min_commit = commit_latencies.iter().min().unwrap(); + let max_commit = commit_latencies.iter().max().unwrap(); + let min_load = load_latencies.iter().min().unwrap(); + let max_load = load_latencies.iter().max().unwrap(); + + println!( + "Commit latency: avg={:.2}ms, min={:.2}ms, max={:.2}ms", + avg_commit * 1000.0, + min_commit.as_secs_f64() * 1000.0, + max_commit.as_secs_f64() * 1000.0 + ); + println!( + "Load latency: avg={:.2}ms, min={:.2}ms, max={:.2}ms", + avg_load * 1000.0, + min_load.as_secs_f64() * 1000.0, + max_load.as_secs_f64() * 1000.0 + ); + + let first_10_avg_commit = commit_latencies + .iter() + .take(10) + .map(|d| d.as_secs_f64()) + .sum::<f64>() + / 10.0; + let last_10_avg_commit = commit_latencies + .iter() + .rev() + .take(10) + .map(|d| d.as_secs_f64()) + .sum::<f64>() + / 10.0; + let first_10_avg_load = load_latencies + .iter() + .take(10) + .map(|d| d.as_secs_f64()) + .sum::<f64>() + / 10.0; + 
let last_10_avg_load = load_latencies + .iter() + .rev() + .take(10) + .map(|d| d.as_secs_f64()) + .sum::<f64>() + / 10.0; + + println!(); + println!( + "First 10 iterations avg: commit={:.2}ms, load={:.2}ms", + first_10_avg_commit * 1000.0, + first_10_avg_load * 1000.0 + ); + println!( + "Last 10 iterations avg: commit={:.2}ms, load={:.2}ms", + last_10_avg_commit * 1000.0, + last_10_avg_load * 1000.0 + ); + println!( + "Degradation ratio: commit={:.2}x, load={:.2}x", + last_10_avg_commit / first_10_avg_commit, + last_10_avg_load / first_10_avg_load + ); + + let mut group = c.benchmark_group("manifest_commit"); + + group.bench_function("avg_commit_latency", |b| { + b.iter(|| std::time::Duration::from_secs_f64(avg_commit)) + }); + + group.bench_function("avg_load_latency", |b| { + b.iter(|| std::time::Duration::from_secs_f64(avg_load)) + }); + + group.finish(); + + if delete_dataset { + std::fs::remove_dir_all(&uri).ok(); + println!("Dataset deleted: {}", uri); + } else { + println!("Dataset preserved: {}", uri); + } +} + +criterion_group!(benches, bench_manifest_commit); +criterion_main!(benches); diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 8abb5975fdd..30e43c5bb32 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -2760,16 +2760,15 @@ pub(crate) struct NewTransactionResult<'a> { } pub(crate) fn load_new_transactions(dataset: &Dataset) -> NewTransactionResult<'_> { - // Re-use the same list call for getting the latest manifest and the metadata - // for all manifests in between. + // Resolve every manifest with version > our current version (the latest plus + // the ones in between). On non-lexically-ordered stores this uses the version + // hint to avoid an O(n) listing. 
let io_parallelism = dataset.object_store.as_ref().io_parallelism(); - let latest_version = dataset.manifest.version; - let locations = dataset - .commit_handler - .list_manifest_locations(&dataset.base, dataset.object_store.as_ref(), true) - .try_take_while(move |location| { - futures::future::ready(Ok(location.version > latest_version)) - }); + let locations = dataset.commit_handler.list_manifest_locations_since( + &dataset.base, + dataset.object_store.as_ref(), + dataset.manifest.version, + ); // Will send the latest manifest via a channel. let (latest_tx, latest_rx) = tokio::sync::oneshot::channel(); diff --git a/rust/lance/src/dataset/tests/dataset_versioning.rs b/rust/lance/src/dataset/tests/dataset_versioning.rs index 088ce85b9a5..5ac01c498b2 100644 --- a/rust/lance/src/dataset/tests/dataset_versioning.rs +++ b/rust/lance/src/dataset/tests/dataset_versioning.rs @@ -34,6 +34,8 @@ fn assert_all_manifests_use_scheme(test_dir: &TempStdDir, scheme: ManifestNaming .read_dir() .unwrap() .map(|entry| entry.unwrap().file_name().into_string().unwrap()) + // Ignore the version hint file, which is not a manifest. + .filter(|name| !name.starts_with("latest_version_hint")) .collect::<Vec<_>>(); assert!( entries_names diff --git a/rust/lance/src/dataset/write/commit.rs b/rust/lance/src/dataset/write/commit.rs index efab12d249c..45b78b48bcb 100644 --- a/rust/lance/src/dataset/write/commit.rs +++ b/rust/lance/src/dataset/write/commit.rs @@ -561,6 +561,7 @@ mod tests { // Should see 2 IOPs: // 1. Write the transaction files // 2. Write (conditional put) the manifest + // (the version hint is only written on non-lexically-ordered stores) assert_io_eq!(io_stats, write_iops, 2, "write txn + manifest, i = {}", i); } @@ -629,7 +630,7 @@ mod tests { // Assert io requests let io_stats = new_ds.object_store.as_ref().io_stats_incremental(); // This could be zero, if we decided to be optimistic. 
However, that - // would mean two wasted write requests (txn + manifest) if there was + // would mean wasted write requests (txn + manifest) if there was // a conflict. We choose to be pessimistic for more consistent performance. assert_io_eq!(io_stats, read_iops, 1); assert_io_eq!(io_stats, write_iops, 2); @@ -786,4 +787,82 @@ mod tests { ); assert_eq!(transaction.read_version, 1); } + + /// On non-lexically-ordered stores (e.g. S3 Express) a commit should use the + /// version hint (a few HEAD probes, O(k)) instead of a full O(n) listing. + #[tokio::test] + async fn test_commit_uses_version_hint_on_non_lexical_store() { + // Make `list` artificially slow per entry so a full listing would be + // obvious; HEAD/GET/PUT stay fast. + let throttled = Arc::new(ThrottledStoreWrapper { + config: ThrottleConfig { + wait_list_per_entry: Duration::from_millis(50), + wait_get_per_call: Duration::from_millis(1), + wait_put_per_call: Duration::from_millis(1), + ..Default::default() + }, + }); + let session = Arc::new(Session::default()); + let write_params = WriteParams { + store_params: Some(ObjectStoreParams { + object_store_wrapper: Some(throttled), + list_is_lexically_ordered: Some(false), + ..Default::default() + }), + session: Some(session.clone()), + enable_v2_manifest_paths: true, + ..Default::default() + }; + + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "i", + DataType::Int32, + false, + )])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from_iter_values(0..10_i32))], + ) + .unwrap(); + let mut dataset = Arc::new( + InsertBuilder::new("memory://test_version_hint") + .with_params(&write_params) + .execute(vec![batch]) + .await + .unwrap(), + ); + + // Build up many versions so a full listing would be expensive. 
+ for _ in 0..50 { + dataset = Arc::new( + CommitBuilder::new(dataset.clone()) + .execute(sample_transaction(dataset.manifest().version)) + .await + .unwrap(), + ); + } + assert_eq!(dataset.manifest().version, 51); + + dataset.object_store.as_ref().io_stats_incremental(); + + let start = std::time::Instant::now(); + let new_ds = CommitBuilder::new(dataset.clone()) + .execute(sample_transaction(dataset.manifest().version)) + .await + .unwrap(); + let elapsed = start.elapsed(); + + // A full listing of ~52 entries at 50ms each would take ~2.6s. + assert!( + elapsed < Duration::from_secs(1), + "commit took {elapsed:?}; the version hint path was likely not used" + ); + + let io_stats = new_ds.object_store.as_ref().io_stats_incremental(); + assert!( + io_stats.read_iops < 10, + "read_iops = {}; a full listing was likely used", + io_stats.read_iops + ); + } } diff --git a/rust/lance/src/io/commit/external_manifest.rs b/rust/lance/src/io/commit/external_manifest.rs index d68ef08fad4..df2b84a4878 100644 --- a/rust/lance/src/io/commit/external_manifest.rs +++ b/rust/lance/src/io/commit/external_manifest.rs @@ -267,6 +267,15 @@ mod test { .to_string_lossy() .contains(".manifest#") }) + // The version hint file is expected to be present. + .filter(|entry| { + let entry = entry.as_ref().unwrap(); + !entry + .file_name() + .as_os_str() + .to_string_lossy() + .starts_with("latest_version_hint") + }) .collect::<Vec<_>>(); assert!(unexpected_entries.is_empty(), "{:?}", unexpected_entries); } @@ -373,6 +382,15 @@ mod test { .to_string_lossy() .ends_with(".manifest") }) + // The version hint file is expected to be present. 
+ .filter(|entry| { + let entry = entry.as_ref().unwrap(); + !entry + .file_name() + .as_os_str() + .to_string_lossy() + .starts_with("latest_version_hint") + }) .collect::<Vec<_>>(); assert!(unexpected_entries.is_empty(), "{:?}", unexpected_entries); } From 07bf0cf30196385f6b3b2fba905a4869cce9fcef Mon Sep 17 00:00:00 2001 From: Heng Ge Date: Tue, 12 May 2026 23:04:39 -0700 Subject: [PATCH 2/9] fix: make uses_version_hint public and update list-call test - Mark uses_version_hint as pub so the doc link from write_version_hint resolves under rustdoc. - Update test_dir_listing_extra_calls_with_migration to expect one fewer listing call: on local FS the __manifest reload now uses the version hint (a HEAD-and-probe on _versions/latest_version_hint.json) instead of a full LIST, so table_exists / describe_table in the migration path now make only the table-directory fallback list call. --- rust/lance-namespace-impls/src/dir.rs | 17 +++++++++-------- rust/lance-table/src/io/commit.rs | 2 +- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/rust/lance-namespace-impls/src/dir.rs b/rust/lance-namespace-impls/src/dir.rs index 425993e3956..4b9c69b739a 100644 --- a/rust/lance-namespace-impls/src/dir.rs +++ b/rust/lance-namespace-impls/src/dir.rs @@ -9546,8 +9546,9 @@ mod tests { .await .unwrap(); - // table_exists first checks __manifest (one list on __manifest/_versions), - // then falls back to the table directory (one list_with_delimiter on test_table.lance). + // table_exists first checks __manifest (which on local FS uses the + // version hint and does no list call), then falls back to the table + // directory (one list_with_delimiter on test_table.lance). 
listing_count.store(0, Ordering::SeqCst); let mut exists_req = TableExistsRequest::new(); @@ -9556,9 +9557,9 @@ mod tests { let count = listing_count.load(Ordering::SeqCst); assert_eq!( - count, 2, - "Expected exactly 2 listing calls for table_exists with migration mode \ - (manifest reload + table directory fallback), but got {}", + count, 1, + "Expected exactly 1 listing call for table_exists with migration mode \ + (table directory fallback; manifest reload uses the version hint), but got {}", count ); @@ -9571,9 +9572,9 @@ mod tests { let count = listing_count.load(Ordering::SeqCst); assert_eq!( - count, 2, - "Expected exactly 2 listing calls for describe_table with migration mode \ - (manifest reload + table directory fallback), but got {}", + count, 1, + "Expected exactly 1 listing call for describe_table with migration mode \ + (table directory fallback; manifest reload uses the version hint), but got {}", count ); } diff --git a/rust/lance-table/src/io/commit.rs b/rust/lance-table/src/io/commit.rs index 86ba80101ab..1e25f42f058 100644 --- a/rust/lance-table/src/io/commit.rs +++ b/rust/lance-table/src/io/commit.rs @@ -306,7 +306,7 @@ struct VersionHint { /// so the hint would only add a write per commit for nothing. We write (and /// read) it only on stores where listing is not lexicographically ordered — /// S3 Express and the local filesystem. -fn uses_version_hint(object_store: &ObjectStore) -> bool { +pub fn uses_version_hint(object_store: &ObjectStore) -> bool { !object_store.list_is_lexically_ordered } From 8a0abfcc227307c27c31426c81b459a4226374c2 Mon Sep 17 00:00:00 2001 From: Heng Ge Date: Tue, 12 May 2026 23:07:38 -0700 Subject: [PATCH 3/9] bench: concurrent-append throughput vs S3 / S3 Express A new `concurrent_append` benchmark seeds a 100k-row base table then runs N tokio writer tasks that each loop calling `InsertBuilder::execute` on the same dataset. 
The output records commits/sec, per-commit latency distribution (p50/p90/p95/p99/max/mean), and the final version count, so the version-hint optimisation can be measured against S3 Standard and S3 Express directly. Designed to be driven from a single very large EC2 host so the writer count itself isn't the bottleneck. Configurable via env vars (DATASET_URI, NUM_WRITERS, APPENDS_PER_WRITER, ROWS_PER_APPEND, BASE_ROWS, KEEP_DATASET) and detects S3 Express via the `--x-s3` suffix. --- rust/lance/Cargo.toml | 4 + rust/lance/benches/concurrent_append.rs | 391 ++++++++++++++++++++++++ 2 files changed, 395 insertions(+) create mode 100644 rust/lance/benches/concurrent_append.rs diff --git a/rust/lance/Cargo.toml b/rust/lance/Cargo.toml index 6a6825afd57..1f7c4af0bd1 100644 --- a/rust/lance/Cargo.toml +++ b/rust/lance/Cargo.toml @@ -251,5 +251,9 @@ harness = false name = "manifest_commit" harness = false +[[bench]] +name = "concurrent_append" +harness = false + [lints] workspace = true diff --git a/rust/lance/benches/concurrent_append.rs b/rust/lance/benches/concurrent_append.rs new file mode 100644 index 00000000000..7e86f52d671 --- /dev/null +++ b/rust/lance/benches/concurrent_append.rs @@ -0,0 +1,391 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Benchmark for concurrent-append throughput against S3 / S3 Express. +//! +//! Many writers append to the same dataset at once. The output measures how +//! the version-hint optimization affects conflict resolution and overall +//! commit rate as the version count grows. Designed to be run on a single +//! large EC2 instance so the writer count itself isn't the bottleneck. +//! +//! ## Running against S3 Standard +//! +//! ```bash +//! export AWS_REGION=us-east-1 +//! export DATASET_URI=s3://jack-devland-build/bench/concurrent_append +//! export NUM_WRITERS=64 +//! export APPENDS_PER_WRITER=200 +//! cargo bench --bench concurrent_append --release +//! ``` +//! +//! 
## Running against S3 Express +//! +//! ```bash +//! export AWS_REGION=us-east-1 +//! export DATASET_URI=s3://jack-lancedb-devland--use1-az24--x-s3/bench/concurrent_append +//! export NUM_WRITERS=64 +//! export APPENDS_PER_WRITER=200 +//! cargo bench --bench concurrent_append --release +//! ``` +//! +//! ## Configuration +//! +//! - `DATASET_URI`: base URI under which a uniquely-named dataset is created. +//! Required. +//! - `NUM_WRITERS`: number of concurrent writers (default 64). +//! - `APPENDS_PER_WRITER`: appends each writer attempts (default 200). +//! - `ROWS_PER_APPEND`: rows per appended batch (default 100). +//! - `BASE_ROWS`: rows in the initial table before concurrent writes begin +//! (default 100_000). +//! - `KEEP_DATASET`: when set to `true`, leaves the dataset in place after +//! the run (default: deleted on S3, kept on local). + +#![allow(clippy::print_stdout, clippy::print_stderr)] + +use arrow_array::{Int64Array, RecordBatch, RecordBatchIterator, StringArray}; +use arrow_schema::{DataType, Field, Schema as ArrowSchema}; +use criterion::{Criterion, criterion_group, criterion_main}; +use lance::dataset::{Dataset, InsertBuilder, WriteMode, WriteParams, builder::DatasetBuilder}; +use lance::session::Session; +use lance_io::object_store::{ObjectStoreParams, ObjectStoreRegistry, StorageOptionsAccessor}; +use std::collections::HashMap; +use std::env; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use uuid::Uuid; + +const DEFAULT_NUM_WRITERS: usize = 64; +const DEFAULT_APPENDS_PER_WRITER: usize = 200; +const DEFAULT_ROWS_PER_APPEND: usize = 100; +const DEFAULT_BASE_ROWS: usize = 100_000; + +fn env_usize(key: &str, default: usize) -> usize { + env::var(key) + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(default) +} + +fn env_bool(key: &str) -> bool { + env::var(key) + .map(|s| s.eq_ignore_ascii_case("true")) + .unwrap_or(false) +} + +fn storage_label(uri: &str) -> &'static str { + if uri.contains("--x-s3") { + "s3express" + } else if 
uri.starts_with("s3://") { + "s3" + } else if uri.starts_with("gs://") { + "gcs" + } else if uri.starts_with("az://") { + "azure" + } else { + "local" + } +} + +fn schema() -> Arc<ArrowSchema> { + Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int64, false), + Field::new("name", DataType::Utf8, false), + ])) +} + +fn batch(start_id: usize, num_rows: usize) -> RecordBatch { + let ids = Int64Array::from_iter_values((start_id as i64)..((start_id + num_rows) as i64)); + let names = StringArray::from_iter_values( + (start_id..(start_id + num_rows)).map(|i| format!("name_{i}")), + ); + RecordBatch::try_new(schema(), vec![Arc::new(ids), Arc::new(names)]).expect("build batch") +} + +/// Storage options that turn on S3 Express when the URI advertises it. +/// +/// S3 Express directory buckets don't support GetBucketLocation, so we also +/// require the caller to set `AWS_REGION` and forward it explicitly. +fn store_params_for(uri: &str) -> Option<ObjectStoreParams> { + if !uri.contains("--x-s3") { + return None; + } + let region = env::var("AWS_REGION") + .or_else(|_| env::var("AWS_DEFAULT_REGION")) + .expect("AWS_REGION is required when DATASET_URI points at S3 Express"); + let storage_options: HashMap<String, String> = + [("s3_express", "true"), ("region", region.as_str())] + .into_iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(); + Some(ObjectStoreParams { + storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options( + storage_options, + ))), + ..Default::default() + }) +} + +fn write_params(session: Arc<Session>, store_params: Option<ObjectStoreParams>) -> WriteParams { + WriteParams { + mode: WriteMode::Append, + session: Some(session), + store_params, + skip_auto_cleanup: true, + ..Default::default() + } +} + +async fn create_base_dataset( + uri: &str, + base_rows: usize, + rows_per_append: usize, + session: Arc<Session>, + store_params: Option<ObjectStoreParams>, +) -> Dataset { + let initial = batch(0, rows_per_append); + let reader = RecordBatchIterator::new(vec![Ok(initial)], schema()); + let create_params = 
WriteParams { + mode: WriteMode::Create, + session: Some(session.clone()), + store_params: store_params.clone(), + skip_auto_cleanup: true, + ..Default::default() + }; + let mut dataset = Dataset::write(reader, uri, Some(create_params)) + .await + .expect("create base dataset"); + + // Top up to BASE_ROWS in chunks so we don't allocate one huge batch. + let chunk = 10_000.min(base_rows); + let mut written = rows_per_append; + while written < base_rows { + let to_write = chunk.min(base_rows - written); + let batch = batch(written, to_write); + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema()); + let params = write_params(session.clone(), store_params.clone()); + dataset = Dataset::write(reader, uri, Some(params)) + .await + .expect("seed appends"); + written += to_write; + } + dataset +} + +struct WriterStats { + successes: usize, + failures: usize, + latencies: Vec<Duration>, +} + +async fn run_writer( + writer_id: usize, + uri: String, + appends: usize, + rows_per_append: usize, + session: Arc<Session>, + store_params: Option<ObjectStoreParams>, +) -> WriterStats { + // Each writer keeps its own dataset handle; CommitBuilder rebases on + // conflict so we don't need to manually reload between appends. + let mut dataset = Arc::new( + DatasetBuilder::from_uri(&uri) + .with_session(session.clone()) + .load() + .await + .expect("writer load"), + ); + + let mut stats = WriterStats { + successes: 0, + failures: 0, + latencies: Vec::with_capacity(appends), + }; + + // Disjoint id ranges per writer so the data inserted is identifiable. 
+ let id_base = 1_000_000 + writer_id * appends * rows_per_append; + for i in 0..appends { + let batch = batch(id_base + i * rows_per_append, rows_per_append); + let params = write_params(session.clone(), store_params.clone()); + let start = Instant::now(); + let result = InsertBuilder::new(dataset.clone()) + .with_params(&params) + .execute(vec![batch]) + .await; + let elapsed = start.elapsed(); + match result { + Ok(new_ds) => { + stats.successes += 1; + stats.latencies.push(elapsed); + dataset = Arc::new(new_ds); + } + Err(e) => { + stats.failures += 1; + eprintln!("writer {writer_id} append {i} failed after {elapsed:?}: {e}"); + // Reload and keep going so a single failure doesn't end the run. + dataset = Arc::new( + DatasetBuilder::from_uri(&uri) + .with_session(session.clone()) + .load() + .await + .expect("writer reload after error"), + ); + } + } + } + stats +} + +fn percentile(sorted: &[Duration], p: f64) -> Duration { + if sorted.is_empty() { + return Duration::ZERO; + } + let idx = ((sorted.len() as f64 - 1.0) * p).round() as usize; + sorted[idx.min(sorted.len() - 1)] +} + +fn ms(d: Duration) -> f64 { + d.as_secs_f64() * 1000.0 +} + +fn bench_concurrent_append(c: &mut Criterion) { + let dataset_base = + env::var("DATASET_URI").expect("DATASET_URI is required for concurrent_append bench"); + let num_writers = env_usize("NUM_WRITERS", DEFAULT_NUM_WRITERS); + let appends_per_writer = env_usize("APPENDS_PER_WRITER", DEFAULT_APPENDS_PER_WRITER); + let rows_per_append = env_usize("ROWS_PER_APPEND", DEFAULT_ROWS_PER_APPEND); + let base_rows = env_usize("BASE_ROWS", DEFAULT_BASE_ROWS); + let keep_dataset = env_bool("KEEP_DATASET"); + + let uri = format!( + "{}/concurrent_append_{}", + dataset_base.trim_end_matches('/'), + &Uuid::new_v4().to_string()[..8] + ); + let label = storage_label(&uri); + let store_params = store_params_for(&uri); + + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .expect("build tokio runtime"); + + 
println!("=== Concurrent Append Benchmark ==="); + println!("Storage: {uri} ({label})"); + println!( + "Writers: {num_writers}, appends/writer: {appends_per_writer}, rows/append: {rows_per_append}" + ); + println!("Base rows: {base_rows}, keep_dataset: {keep_dataset}"); + println!(); + + // Share one ObjectStoreRegistry so all writers reuse warm TCP/TLS sessions. + let registry = Arc::new(ObjectStoreRegistry::default()); + let session = Arc::new(Session::new(0, 0, registry)); + + println!("Seeding base dataset ({base_rows} rows)..."); + let seed_start = Instant::now(); + let base_dataset = runtime.block_on(create_base_dataset( + &uri, + base_rows, + rows_per_append, + session.clone(), + store_params.clone(), + )); + let starting_version = base_dataset.manifest().version; + println!( + "Base dataset ready in {:.2}s at version {starting_version}", + seed_start.elapsed().as_secs_f64() + ); + + println!("Starting {num_writers} concurrent writers..."); + let wall_start = Instant::now(); + let all_stats: Vec<WriterStats> = runtime.block_on(async { + let mut tasks = Vec::with_capacity(num_writers); + for writer_id in 0..num_writers { + let uri = uri.clone(); + let session = session.clone(); + let store_params = store_params.clone(); + tasks.push(tokio::spawn(async move { + run_writer( + writer_id, + uri, + appends_per_writer, + rows_per_append, + session, + store_params, + ) + .await + })); + } + let mut out = Vec::with_capacity(num_writers); + for t in tasks { + out.push(t.await.expect("writer task panicked")); + } + out + }); + let wall = wall_start.elapsed(); + + let total_attempts = all_stats + .iter() + .map(|s| s.successes + s.failures) + .sum::<usize>(); + let total_success = all_stats.iter().map(|s| s.successes).sum::<usize>(); + let total_failed = all_stats.iter().map(|s| s.failures).sum::<usize>(); + let mut latencies: Vec<Duration> = all_stats + .into_iter() + .flat_map(|s| s.latencies.into_iter()) + .collect(); + latencies.sort(); + + let throughput = total_success as f64 / wall.as_secs_f64(); + + 
println!(); + println!("=== Results ==="); + println!("Wall time: {:.2}s", wall.as_secs_f64()); + println!( + "Commits: {total_success} succeeded, {total_failed} failed out of {total_attempts} attempts" + ); + println!("Throughput: {throughput:.2} commits/sec"); + if !latencies.is_empty() { + let mean = latencies.iter().map(|d| d.as_secs_f64()).sum::<f64>() / latencies.len() as f64; + println!( + "Commit latency (per writer, includes any retries): \ + p50={:.2}ms p90={:.2}ms p95={:.2}ms p99={:.2}ms max={:.2}ms mean={:.2}ms", + ms(percentile(&latencies, 0.50)), + ms(percentile(&latencies, 0.90)), + ms(percentile(&latencies, 0.95)), + ms(percentile(&latencies, 0.99)), + ms(*latencies.last().unwrap()), + mean * 1000.0, + ); + } + + let final_dataset = runtime.block_on(async { + DatasetBuilder::from_uri(&uri) + .with_session(session.clone()) + .load() + .await + .expect("final load") + }); + println!( + "Final dataset version: {} (started at {})", + final_dataset.manifest().version, + starting_version + ); + + // Pin the numbers into criterion so they show up in regression tracking. 
+ let mut group = c.benchmark_group(format!("concurrent_append_{label}")); + group.bench_function("commits_per_sec", |b| b.iter(|| throughput)); + group.bench_function("p50_ms", |b| b.iter(|| ms(percentile(&latencies, 0.50)))); + group.bench_function("p99_ms", |b| b.iter(|| ms(percentile(&latencies, 0.99)))); + group.finish(); + + if !keep_dataset && label == "local" { + let _ = std::fs::remove_dir_all(&uri); + println!("Local dataset removed: {uri}"); + } else { + println!("Dataset preserved: {uri}"); + } +} + +criterion_group!(benches, bench_concurrent_append); +criterion_main!(benches); From c48d005f3ffa3a3f691d435b6b618f29a8971526 Mon Sep 17 00:00:00 2001 From: Heng Ge Date: Thu, 14 May 2026 00:08:40 -0700 Subject: [PATCH 4/9] bench: allow concurrent_append to start from a fully empty table Setting BASE_ROWS=0 now creates the dataset with a single zero-row batch so writers begin at version 1 with no data, instead of the previous ~100k-row seed. --- rust/lance/benches/concurrent_append.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/rust/lance/benches/concurrent_append.rs b/rust/lance/benches/concurrent_append.rs index 7e86f52d671..9ca21345eec 100644 --- a/rust/lance/benches/concurrent_append.rs +++ b/rust/lance/benches/concurrent_append.rs @@ -142,7 +142,14 @@ async fn create_base_dataset( session: Arc<Session>, store_params: Option<ObjectStoreParams>, ) -> Dataset { - let initial = batch(0, rows_per_append); + // When `base_rows == 0` the dataset starts empty: one create commit with a + // zero-row batch so the writers begin at version 1 with no data. + let initial_rows = if base_rows == 0 { + 0 + } else { + rows_per_append.min(base_rows) + }; + let initial = batch(0, initial_rows); let reader = RecordBatchIterator::new(vec![Ok(initial)], schema()); let create_params = WriteParams { mode: WriteMode::Create, @@ -157,7 +164,7 @@ async fn create_base_dataset( // Top up to BASE_ROWS in chunks so we don't allocate one huge batch. 
let chunk = 10_000.min(base_rows); - let mut written = rows_per_append; + let mut written = initial_rows; while written < base_rows { let to_write = chunk.min(base_rows - written); let batch = batch(written, to_write); From 7f7daab1e2dc467923818e6ad2d0d6450b79d99f Mon Sep 17 00:00:00 2001 From: Heng Ge Date: Thu, 14 May 2026 00:15:15 -0700 Subject: [PATCH 5/9] feat: LANCE_USE_VERSION_HINT env var to disable the hint MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The hint is now controlled by a process-wide env var (read once via OnceLock) that overrides every store-type check. Setting LANCE_USE_VERSION_HINT=0 (or false / off) makes write_version_hint a no-op, makes current_manifest_path skip the hint probe, and makes CommitHandler::list_manifest_locations_since fall back to the listing path on every store — so the same binary can be benchmarked with and without the optimization, and operators have a clear escape hatch if it ever misbehaves. --- docs/src/format/table/layout.md | 1 + rust/lance-table/src/io/commit.rs | 24 +++++++++++++++++++++--- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/docs/src/format/table/layout.md b/docs/src/format/table/layout.md index 8a7026a8951..10ec82c654c 100644 --- a/docs/src/format/table/layout.md +++ b/docs/src/format/table/layout.md @@ -213,4 +213,5 @@ To avoid this, writers on such stores write `_versions/latest_version_hint.json` Readers use the hint as a starting point and probe a few higher versions with HEAD requests to find the true latest, falling back to a full listing if the hint is missing (older datasets) or stale. The hint is purely an optimization: it never affects correctness, can be safely deleted, and is ignored by readers that don't understand it. +Set the environment variable `LANCE_USE_VERSION_HINT=0` (or `false`) to globally disable the hint — useful for benchmarks and as an escape hatch. 
diff --git a/rust/lance-table/src/io/commit.rs b/rust/lance-table/src/io/commit.rs index 1e25f42f058..a6cc42c9c2d 100644 --- a/rust/lance-table/src/io/commit.rs +++ b/rust/lance-table/src/io/commit.rs @@ -299,15 +299,33 @@ struct VersionHint { version: u64, } +/// Set `LANCE_USE_VERSION_HINT=0` (or `false`) to globally disable the version +/// hint — writers stop emitting the hint file and readers stop consulting it, +/// falling back to plain listing. Intended as a benchmark/escape-hatch knob; +/// the hint is on by default. +const VERSION_HINT_ENV: &str = "LANCE_USE_VERSION_HINT"; + +fn version_hint_globally_enabled() -> bool { + static ENABLED: std::sync::OnceLock<bool> = std::sync::OnceLock::new(); + *ENABLED.get_or_init(|| match std::env::var(VERSION_HINT_ENV) { + Ok(v) => !matches!( + v.trim().to_ascii_lowercase().as_str(), + "0" | "false" | "off" + ), + Err(_) => true, + }) +} + /// Whether this object store benefits from a version hint. /// /// On stores where listing is lexicographically ordered (S3 Standard, GCS, /// Azure, ...) the latest version is already resolved in roughly one request, /// so the hint would only add a write per commit for nothing. We write (and /// read) it only on stores where listing is not lexicographically ordered — -/// S3 Express and the local filesystem. +/// S3 Express and the local filesystem. Can be force-disabled with +/// [`VERSION_HINT_ENV`]. pub fn uses_version_hint(object_store: &ObjectStore) -> bool { - !object_store.list_is_lexically_ordered + version_hint_globally_enabled() && !object_store.list_is_lexically_ordered } /// Path to the JSON version hint file for a dataset. 
@@ -865,7 +883,7 @@ pub trait CommitHandler: Debug + Send + Sync { object_store: &'a ObjectStore, since_version: u64, ) -> BoxStream<'a, Result> { - if object_store.list_is_lexically_ordered { + if !uses_version_hint(object_store) { return self .list_manifest_locations(base_path, object_store, true) .try_take_while(move |loc| future::ready(Ok(loc.version > since_version))) From 6f0cdce5b2bc896d74f9c5bfa984514ab80957e9 Mon Sep 17 00:00:00 2001 From: Heng Ge Date: Thu, 14 May 2026 00:59:42 -0700 Subject: [PATCH 6/9] bench: add MAX_WALL_SECS time-budget cap for concurrent_append Lets each writer stop after a wall-clock budget instead of always finishing APPENDS_PER_WRITER commits, so high-concurrency runs (where contention drags per-commit latency up) don't run unbounded. --- rust/lance/benches/concurrent_append.rs | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/rust/lance/benches/concurrent_append.rs b/rust/lance/benches/concurrent_append.rs index 9ca21345eec..1c97ce3a738 100644 --- a/rust/lance/benches/concurrent_append.rs +++ b/rust/lance/benches/concurrent_append.rs @@ -189,6 +189,7 @@ async fn run_writer( uri: String, appends: usize, rows_per_append: usize, + deadline: Option<Instant>, session: Arc, store_params: Option, ) -> WriterStats { @@ -211,6 +212,11 @@ async fn run_writer( // Disjoint id ranges per writer so the data inserted is identifiable. let id_base = 1_000_000 + writer_id * appends * rows_per_append; for i in 0..appends { + if let Some(d) = deadline + && Instant::now() >= d + { + break; + } let batch = batch(id_base + i * rows_per_append, rows_per_append); let params = write_params(session.clone(), store_params.clone()); let start = Instant::now(); @@ -262,6 +268,11 @@ fn bench_concurrent_append(c: &mut Criterion) { let rows_per_append = env_usize("ROWS_PER_APPEND", DEFAULT_ROWS_PER_APPEND); let base_rows = env_usize("BASE_ROWS", DEFAULT_BASE_ROWS); let keep_dataset = env_bool("KEEP_DATASET"); + // Per-writer wall-clock budget. 
When non-zero, each writer stops looping + // once this many seconds have elapsed since the run started, even if it + // hasn't issued `APPENDS_PER_WRITER` commits yet. Lets us bound run time + // at high concurrency where conflict retries make commits arbitrarily slow. + let max_wall_secs = env_usize("MAX_WALL_SECS", 0); let uri = format!( "{}/concurrent_append_{}", @@ -305,6 +316,14 @@ fn bench_concurrent_append(c: &mut Criterion) { println!("Starting {num_writers} concurrent writers..."); let wall_start = Instant::now(); + let deadline = + (max_wall_secs > 0).then(|| wall_start + Duration::from_secs(max_wall_secs as u64)); + if let Some(d) = deadline { + println!( + "Per-writer wall budget: {max_wall_secs}s (deadline {:?} from now)", + d.duration_since(wall_start) + ); + } let all_stats: Vec = runtime.block_on(async { let mut tasks = Vec::with_capacity(num_writers); for writer_id in 0..num_writers { @@ -317,6 +336,7 @@ fn bench_concurrent_append(c: &mut Criterion) { uri, appends_per_writer, rows_per_append, + deadline, session, store_params, ) From 90ee1825a9c4e7e1b3658c358b0ae6e491249fce Mon Sep 17 00:00:00 2001 From: Heng Ge Date: Thu, 14 May 2026 01:51:13 -0700 Subject: [PATCH 7/9] bench: add PER_ATTEMPT_TIMEOUT_SECS to cap commit-attempt wall time Lets the driver bound a run's total wall to MAX_WALL_SECS + per-attempt timeout, even when contention pushes a single commit attempt's retry chain past several minutes. 
--- rust/lance/benches/concurrent_append.rs | 43 ++++++++++++++++++++++--- 1 file changed, 39 insertions(+), 4 deletions(-) diff --git a/rust/lance/benches/concurrent_append.rs b/rust/lance/benches/concurrent_append.rs index 1c97ce3a738..7d4eecac536 100644 --- a/rust/lance/benches/concurrent_append.rs +++ b/rust/lance/benches/concurrent_append.rs @@ -190,6 +190,7 @@ async fn run_writer( appends: usize, rows_per_append: usize, deadline: Option<Instant>, + per_attempt_timeout: Option<Duration>, session: Arc, store_params: Option, ) -> WriterStats { @@ -220,10 +221,34 @@ async fn run_writer( let batch = batch(id_base + i * rows_per_append, rows_per_append); let params = write_params(session.clone(), store_params.clone()); let start = Instant::now(); - let result = InsertBuilder::new(dataset.clone()) - .with_params(&params) - .execute(vec![batch]) - .await; + // Per-attempt cap keeps the slow-tail commits from extending the run + // far past the writer-side deadline at high concurrency. + let result = match per_attempt_timeout { + Some(t) => { + let ds = dataset.clone(); + let params_ref = &params; + match tokio::time::timeout(t, async move { + InsertBuilder::new(ds) + .with_params(params_ref) + .execute(vec![batch]) + .await + }) + .await + { + Ok(r) => r, + Err(_) => Err(lance_core::Error::io_source(Box::new(std::io::Error::new( + std::io::ErrorKind::TimedOut, + "per-attempt timeout", + )))), + } + } + None => { + InsertBuilder::new(dataset.clone()) + .with_params(&params) + .execute(vec![batch]) + .await + } + }; let elapsed = start.elapsed(); match result { Ok(new_ds) => { @@ -273,6 +298,10 @@ fn bench_concurrent_append(c: &mut Criterion) { // hasn't issued `APPENDS_PER_WRITER` commits yet. Lets us bound run time // at high concurrency where conflict retries make commits arbitrarily slow. let max_wall_secs = env_usize("MAX_WALL_SECS", 0); + // Per-attempt timeout. 
Caps any single commit attempt (including its + // internal retries) so the slow-tail of an under-contention commit doesn't + // extend the run past the writer deadline. 0 disables it. + let per_attempt_timeout_secs = env_usize("PER_ATTEMPT_TIMEOUT_SECS", 0); let uri = format!( "{}/concurrent_append_{}", @@ -324,6 +353,11 @@ fn bench_concurrent_append(c: &mut Criterion) { d.duration_since(wall_start) ); } + let per_attempt_timeout = (per_attempt_timeout_secs > 0) + .then(|| Duration::from_secs(per_attempt_timeout_secs as u64)); + if let Some(t) = per_attempt_timeout { + println!("Per-attempt timeout: {:?}", t); + } let all_stats: Vec = runtime.block_on(async { let mut tasks = Vec::with_capacity(num_writers); for writer_id in 0..num_writers { @@ -337,6 +371,7 @@ fn bench_concurrent_append(c: &mut Criterion) { appends_per_writer, rows_per_append, deadline, + per_attempt_timeout, session, store_params, ) From 57bb6634608107499cc5ded6a3ef7621c9fefa75 Mon Sep 17 00:00:00 2001 From: Heng Ge Date: Thu, 14 May 2026 08:13:58 -0700 Subject: [PATCH 8/9] fix(bench): allow too_many_arguments on run_writer; doc: drop private link CI clippy was tripping on the 8-arg run_writer signature; tag it explicitly. rustdoc was rejecting the link from public uses_version_hint to the private VERSION_HINT_ENV const, so inline the env-var name instead. --- rust/lance-table/src/io/commit.rs | 4 ++-- rust/lance/benches/concurrent_append.rs | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/rust/lance-table/src/io/commit.rs b/rust/lance-table/src/io/commit.rs index a6cc42c9c2d..5dbf62002fa 100644 --- a/rust/lance-table/src/io/commit.rs +++ b/rust/lance-table/src/io/commit.rs @@ -322,8 +322,8 @@ fn version_hint_globally_enabled() -> bool { /// Azure, ...) the latest version is already resolved in roughly one request, /// so the hint would only add a write per commit for nothing. 
We write (and /// read) it only on stores where listing is not lexicographically ordered — -/// S3 Express and the local filesystem. Can be force-disabled with -/// [`VERSION_HINT_ENV`]. +/// S3 Express and the local filesystem. Can be force-disabled with the +/// `LANCE_USE_VERSION_HINT=0` environment variable. pub fn uses_version_hint(object_store: &ObjectStore) -> bool { version_hint_globally_enabled() && !object_store.list_is_lexically_ordered } diff --git a/rust/lance/benches/concurrent_append.rs b/rust/lance/benches/concurrent_append.rs index 7d4eecac536..ac7cf3f610f 100644 --- a/rust/lance/benches/concurrent_append.rs +++ b/rust/lance/benches/concurrent_append.rs @@ -184,6 +184,7 @@ struct WriterStats { latencies: Vec, } +#[allow(clippy::too_many_arguments)] async fn run_writer( writer_id: usize, uri: String, From a1916157184f93070e6c79d576a29a08fca7c1c7 Mon Sep 17 00:00:00 2001 From: Heng Ge Date: Fri, 15 May 2026 01:30:29 -0700 Subject: [PATCH 9/9] docs(spec): tighten version-hint section per review Drop the env-var mention (implementation detail) and drop the specific non-lex store examples; describe what the file is and the contract readers can rely on, not which stores choose to write it. --- docs/src/format/table/layout.md | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/docs/src/format/table/layout.md b/docs/src/format/table/layout.md index 10ec82c654c..7b08ce0a0dd 100644 --- a/docs/src/format/table/layout.md +++ b/docs/src/format/table/layout.md @@ -204,14 +204,13 @@ See [Manifest Naming Schemes](transaction.md#manifest-naming-schemes) for detail ### Version Hint -On object stores where listing is not lexicographically ordered (e.g. S3 Express, the local filesystem), finding the latest version by listing `_versions/` is O(n) in the number of versions. 
-To avoid this, writers on such stores write `_versions/latest_version_hint.json` after each successful commit: +The optional file `_versions/latest_version_hint.json` records the latest committed version as JSON: ```json {"version": 42} ``` -Readers use the hint as a starting point and probe a few higher versions with HEAD requests to find the true latest, falling back to a full listing if the hint is missing (older datasets) or stale. -The hint is purely an optimization: it never affects correctness, can be safely deleted, and is ignored by readers that don't understand it. -Set the environment variable `LANCE_USE_VERSION_HINT=0` (or `false`) to globally disable the hint — useful for benchmarks and as an escape hatch. +It exists to accelerate latest-version discovery on stores where listing `_versions/` is expensive: a reader can read the hint and probe higher versions with HEAD requests instead of listing the whole directory, falling back to a full listing if the hint is missing or stale. + +The hint is purely an optimization. It is always safe to delete, never affects correctness, and can be ignored by readers that don't understand it. Writers may choose not to write it.
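The reader-side contract in the Version Hint section (read the hint, probe higher versions, fall back to listing) can be sketched roughly as follows. This is a minimal illustration, not the code in `commit.rs`: the names `Resolved`, `resolve_latest`, `exists`, and `max_probes` are hypothetical, `exists` stands in for a HEAD request against a manifest path, and treating a hint whose own manifest is absent as "stale" is an assumption about what the fallback covers.

```rust
/// Outcome of resolving the latest version from an optional hint.
/// (Hypothetical type, introduced only for this sketch.)
#[derive(Debug, PartialEq, Eq)]
enum Resolved {
    /// Latest version found by probing upward from the hint.
    Latest(u64),
    /// Hint missing or unusable: the caller should list `_versions/`.
    FallBackToListing,
}

/// `exists(v)` models a HEAD request for the manifest of version `v`.
/// `max_probes` bounds how many versions above the hint are checked
/// (an assumed knob; the patch only says "a few higher versions").
fn resolve_latest(
    hint: Option<u64>,
    exists: impl Fn(u64) -> bool,
    max_probes: u64,
) -> Resolved {
    // No hint file (e.g. an older dataset): nothing to start from.
    let Some(start) = hint else {
        return Resolved::FallBackToListing;
    };
    // Assumption: a hint pointing at a manifest that is not there
    // cannot be trusted as a starting point.
    if !exists(start) {
        return Resolved::FallBackToListing;
    }
    let mut latest = start;
    for _ in 0..max_probes {
        match latest.checked_add(1) {
            // The next version exists: keep walking upward.
            Some(next) if exists(next) => latest = next,
            // First gap (or overflow): `latest` is the newest version.
            _ => return Resolved::Latest(latest),
        }
    }
    // Probe budget exhausted without seeing a gap: the hint is too far
    // behind, so a single listing is cheaper than more probes.
    Resolved::FallBackToListing
}
```

The sketch is deliberately conservative: it reports a latest version only after observing the first missing version above it, so a hint that lags by more than `max_probes` versions degrades to the listing path rather than returning an out-of-date answer.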