diff --git a/docs/src/format/table/layout.md b/docs/src/format/table/layout.md index 46efa56a908..7b08ce0a0dd 100644 --- a/docs/src/format/table/layout.md +++ b/docs/src/format/table/layout.md @@ -20,7 +20,8 @@ A Lance dataset in its basic form stores all files within the dataset root direc data/ *.lance -- Data files containing column data _versions/ - *.manifest -- Manifest files (one per version) + *.manifest -- Manifest files (one per version) + latest_version_hint.json -- Optional hint of the latest version (see below) _transactions/ *.txn -- Transaction files for commit coordination _deletions/ @@ -201,3 +202,15 @@ Manifest files are stored in the `_versions/` directory with naming schemes that See [Manifest Naming Schemes](transaction.md#manifest-naming-schemes) for details on the V1 and V2 patterns and their implications for version discovery. +### Version Hint + +The optional file `_versions/latest_version_hint.json` records the latest committed version as JSON: + +```json +{"version": 42} +``` + +It exists to accelerate latest-version discovery on stores where listing `_versions/` is expensive: a reader can read the hint and probe higher versions with HEAD requests instead of listing the whole directory, falling back to a full listing if the hint is missing or stale. + +The hint is purely an optimization. It is always safe to delete, never affects correctness, and can be ignored by readers that don't understand it. Writers may choose not to write it. + diff --git a/java/src/test/java/org/lance/DatasetTest.java b/java/src/test/java/org/lance/DatasetTest.java index c01154b0b71..315b010da1e 100644 --- a/java/src/test/java/org/lance/DatasetTest.java +++ b/java/src/test/java/org/lance/DatasetTest.java @@ -528,7 +528,12 @@ void testOpenSerializedManifest(@TempDir Path tempDir) throws IOException { assertEquals(1, dataset1.version()); Path manifestPath = datasetPath.resolve("_versions"); try (Stream fileStream = Files.list(manifestPath)) { - assertEquals(1, fileStream.count()); + // Ignore the version hint file, which is not a manifest. + assertEquals( + 1, + fileStream + .filter(p -> !p.getFileName().toString().startsWith("latest_version_hint")) + .count()); ByteBuffer manifestBuffer = readManifest(manifestPath.resolve("1.manifest")); try (Dataset dataset2 = testDataset.write(1, 5)) { assertEquals(2, dataset2.version()); diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py index 428b85095ce..dc2030c4b2b 100644 --- a/python/python/tests/test_dataset.py +++ b/python/python/tests/test_dataset.py @@ -436,18 +436,27 @@ def test_has_stable_row_ids_property(tmp_path: Path): assert lance.dataset(non_stable_path).has_stable_row_ids is False +def _list_manifests(versions_dir): + # Ignore the version hint file, which is not a manifest. + return [ + name + for name in os.listdir(versions_dir) + if not name.startswith("latest_version_hint") + ] + + def test_v2_manifest_paths(tmp_path: Path): lance.write_dataset( pa.table({"a": range(100)}), tmp_path, enable_v2_manifest_paths=True ) - manifest_path = os.listdir(tmp_path / "_versions") + manifest_path = _list_manifests(tmp_path / "_versions") assert len(manifest_path) == 1 assert re.match(r"\d{20}\.manifest", manifest_path[0]) def test_default_v2_manifest_paths(tmp_path: Path): lance.write_dataset(pa.table({"a": range(100)}), tmp_path) - manifest_path = os.listdir(tmp_path / "_versions") + manifest_path = _list_manifests(tmp_path / "_versions") assert len(manifest_path) == 1 assert re.match(r"\d{20}\.manifest", manifest_path[0]) @@ -457,12 +466,12 @@ def test_v2_manifest_paths_migration(tmp_path: Path): lance.write_dataset( pa.table({"a": range(100)}), tmp_path, enable_v2_manifest_paths=False ) - manifest_path = os.listdir(tmp_path / "_versions") + manifest_path = _list_manifests(tmp_path / "_versions") assert manifest_path == ["1.manifest"] # Migrate to v2 manifest paths lance.dataset(tmp_path).migrate_manifest_paths_v2() - manifest_path = os.listdir(tmp_path / "_versions") + manifest_path = _list_manifests(tmp_path / "_versions") assert len(manifest_path) == 1 assert re.match(r"\d{20}\.manifest", manifest_path[0]) diff --git a/rust/lance-namespace-impls/src/dir.rs b/rust/lance-namespace-impls/src/dir.rs index 425993e3956..4b9c69b739a 100644 --- a/rust/lance-namespace-impls/src/dir.rs +++ b/rust/lance-namespace-impls/src/dir.rs @@ -9546,8 +9546,9 @@ mod tests { .await .unwrap(); - // table_exists first checks __manifest (one list on __manifest/_versions), - // then falls back to the table directory (one list_with_delimiter on test_table.lance). + // table_exists first checks __manifest (which on local FS uses the + // version hint and does no list call), then falls back to the table + // directory (one list_with_delimiter on test_table.lance). listing_count.store(0, Ordering::SeqCst); let mut exists_req = TableExistsRequest::new(); @@ -9556,9 +9557,9 @@ mod tests { let count = listing_count.load(Ordering::SeqCst); assert_eq!( - count, 2, - "Expected exactly 2 listing calls for table_exists with migration mode \ - (manifest reload + table directory fallback), but got {}", + count, 1, + "Expected exactly 1 listing call for table_exists with migration mode \ + (table directory fallback; manifest reload uses the version hint), but got {}", count ); @@ -9571,9 +9572,9 @@ mod tests { let count = listing_count.load(Ordering::SeqCst); assert_eq!( - count, 2, - "Expected exactly 2 listing calls for describe_table with migration mode \ - (manifest reload + table directory fallback), but got {}", + count, 1, + "Expected exactly 1 listing call for describe_table with migration mode \ + (table directory fallback; manifest reload uses the version hint), but got {}", count ); } diff --git a/rust/lance-table/src/io/commit.rs b/rust/lance-table/src/io/commit.rs index c90909d7db5..5dbf62002fa 100644 --- a/rust/lance-table/src/io/commit.rs +++ b/rust/lance-table/src/io/commit.rs @@ -70,6 +70,13 @@ use { pub const VERSIONS_DIR: &str = "_versions"; const MANIFEST_EXTENSION: &str = "manifest"; const DETACHED_VERSION_PREFIX: &str = "d"; +/// File name for the JSON version hint file, stored under `_versions/`. +/// +/// The file contains `{"version":N}` where `N` is the latest committed version +/// at the time of writing. It enables O(1)/O(k) latest-version lookup via HEAD +/// requests on object stores where listing is not lexicographically ordered +/// (e.g. S3 Express, local filesystem) instead of an O(n) listing. +const VERSION_HINT_FILE: &str = "latest_version_hint.json"; /// How manifest files should be named. #[derive(Clone, Copy, Debug, PartialEq, Eq)] @@ -260,17 +267,292 @@ impl TryFrom for ManifestLocation { } } -/// Get the latest manifest path +/// Get the latest manifest path. +/// +/// - Local filesystem: a single directory read. +/// - Stores where listing is not lexicographically ordered (e.g. S3 Express): +/// the version hint (read the hint file, then probe higher versions with +/// HEADs), falling back to a listing if the hint is missing or stale. A full +/// listing on these stores is O(n) in the number of versions. +/// - Lexicographically ordered stores (e.g. S3 Standard, GCS): the listing +/// already resolves the latest version in roughly one request. async fn current_manifest_path( object_store: &ObjectStore, base: &Path, ) -> Result { - if object_store.is_local() - && let Ok(Some(location)) = current_manifest_local(base) + if object_store.is_local() { + if let Ok(Some(location)) = current_manifest_local(base) { + return Ok(location); + } + } else if uses_version_hint(object_store) + && let Some(location) = read_version_hint_and_probe(object_store, base).await { return Ok(location); } + resolve_version_from_listing(object_store, base).await +} + +/// JSON body of the version hint file: `{"version":N}`. +#[derive(serde::Serialize, serde::Deserialize)] +struct VersionHint { + version: u64, +} + +/// Set `LANCE_USE_VERSION_HINT=0` (or `false`) to globally disable the version +/// hint — writers stop emitting the hint file and readers stop consulting it, +/// falling back to plain listing. Intended as a benchmark/escape-hatch knob; +/// the hint is on by default. +const VERSION_HINT_ENV: &str = "LANCE_USE_VERSION_HINT"; + +fn version_hint_globally_enabled() -> bool { + static ENABLED: std::sync::OnceLock = std::sync::OnceLock::new(); + *ENABLED.get_or_init(|| match std::env::var(VERSION_HINT_ENV) { + Ok(v) => !matches!( + v.trim().to_ascii_lowercase().as_str(), + "0" | "false" | "off" + ), + Err(_) => true, + }) +} + +/// Whether this object store benefits from a version hint. +/// +/// On stores where listing is lexicographically ordered (S3 Standard, GCS, +/// Azure, ...) the latest version is already resolved in roughly one request, +/// so the hint would only add a write per commit for nothing. We write (and +/// read) it only on stores where listing is not lexicographically ordered — +/// S3 Express and the local filesystem. Can be force-disabled with the +/// `LANCE_USE_VERSION_HINT=0` environment variable. +pub fn uses_version_hint(object_store: &ObjectStore) -> bool { + version_hint_globally_enabled() && !object_store.list_is_lexically_ordered +} + +/// Path to the JSON version hint file for a dataset. +fn version_hint_path(base: &Path) -> Path { + base.clone().join(VERSIONS_DIR).join(VERSION_HINT_FILE) +} + +/// Write the version hint file after a successful commit. +/// +/// The hint is stored as JSON: `{"version":N}`. This write is best-effort — +/// failures are logged and ignored, since the hint only accelerates reads and +/// never affects correctness (readers verify the hinted version and probe +/// upward from there). It is a no-op for detached versions and for stores that +/// do not benefit from a hint (see [`uses_version_hint`]). +pub async fn write_version_hint(object_store: &ObjectStore, base: &Path, version: u64) { + if is_detached_version(version) || !uses_version_hint(object_store) { + return; + } + let hint_path = version_hint_path(base); + let content = serde_json::to_vec(&VersionHint { version }).expect("serialize version hint"); + if let Err(e) = object_store.put(&hint_path, content.as_slice()).await { + warn!("Failed to write version hint file for version {version}: {e}"); + } +} + +/// Read the latest version from the hint file, or `None` if it does not exist +/// or cannot be parsed. +async fn read_version_from_hint(object_store: &ObjectStore, base: &Path) -> Option { + let bytes = object_store + .inner + .get(&version_hint_path(base)) + .await + .ok()? + .bytes() + .await + .ok()?; + Some(serde_json::from_slice::(&bytes).ok()?.version) +} + +/// Read the version hint and probe upward to find the true latest manifest. +/// +/// Returns `None` if the hint file is missing, the hinted version no longer +/// exists, or any error occurred — callers should fall back to listing. +async fn read_version_hint_and_probe( + object_store: &ObjectStore, + base: &Path, +) -> Option { + let hint_version = read_version_from_hint(object_store, base).await?; + let (version, scheme, mut probed) = probe_versions_upward(object_store, base, hint_version) + .await + .ok() + .flatten()?; + // `probed` is non-empty and its last entry is the highest version found. + let (_, meta) = probed.pop()?; + Some(ManifestLocation { + version, + path: scheme.manifest_path(base, version), + size: Some(meta.size), + naming_scheme: scheme, + e_tag: meta.e_tag, + }) +} + +/// Maximum version gap between the hint and the read version for which we use +/// the hint-based parallel-HEAD path; beyond this a single (paginated) listing +/// is cheaper, so callers fall back to it. +const MAX_HINT_PROBE_GAP: u64 = 1000; + +/// Probe `from_version`, then `from_version + 1`, `+ 2`, ... with HEAD requests +/// until one is not found. +/// +/// Assumes attached versions are contiguous above `from_version` (true in +/// practice: every commit increments by one, and cleanup only removes *old* +/// versions, never ones newer than the latest). A `NotFound` therefore marks +/// the end of the history. +/// +/// - `Ok(Some((true_latest_version, naming_scheme, [(version, meta), ...])))`: +/// the vec covers every version from `from_version` through the true latest +/// in ascending order. +/// - `Ok(None)`: `from_version` itself does not exist (a `NotFound` for both +/// naming schemes) — i.e. the hint pointed past the end. +/// - `Err(_)`: a transient object-store error was hit, so the probed range may +/// be incomplete; callers should fall back to a full listing rather than +/// trust a possibly-stale result. +async fn probe_versions_upward( + object_store: &ObjectStore, + base: &Path, + from_version: u64, +) -> Result< + Option<( + u64, + ManifestNamingScheme, + Vec<(u64, object_store::ObjectMeta)>, + )>, +> { + // Newer datasets use V2; fall back to V1 if the V2 path is not found. + let mut scheme = ManifestNamingScheme::V2; + let meta = match object_store + .inner + .head(&scheme.manifest_path(base, from_version)) + .await + { + Ok(meta) => meta, + Err(ObjectStoreError::NotFound { .. }) => { + scheme = ManifestNamingScheme::V1; + match object_store + .inner + .head(&scheme.manifest_path(base, from_version)) + .await + { + Ok(meta) => meta, + Err(ObjectStoreError::NotFound { .. }) => return Ok(None), + Err(e) => return Err(e.into()), + } + } + Err(e) => return Err(e.into()), + }; + + let mut probed = vec![(from_version, meta)]; + let mut version = from_version; + loop { + let next = version + 1; + match object_store + .inner + .head(&scheme.manifest_path(base, next)) + .await + { + Ok(meta) => { + probed.push((next, meta)); + version = next; + } + // NotFound means we found the latest version. + Err(ObjectStoreError::NotFound { .. }) => break, + // A transient error means a newer version might exist that we + // failed to observe — surface it so callers fall back to listing. + Err(e) => return Err(e.into()), + } + } + Ok(Some((version, scheme, probed))) +} + +/// List manifest locations with version `> since_version` using the version +/// hint, in descending order of version. +/// +/// Returns `None` if the hint is missing or stale enough that this is not +/// usable — callers should fall back to a full listing. `Some(vec![])` is the +/// fast path where the hint confirms there are no new versions. +async fn list_manifests_since_version_with_hint( + object_store: &ObjectStore, + base: &Path, + since_version: u64, +) -> Option> { + let hint_version = read_version_from_hint(object_store, base).await?; + + // A reader that is very far behind is cheaper to serve with one paginated + // listing than with thousands of HEADs. + if hint_version.saturating_sub(since_version) > MAX_HINT_PROBE_GAP { + return None; + } + + // If the hint is not newer than the read version, the only versions that + // could exist are right above it; otherwise start at the hint. + let probe_from = if hint_version > since_version { + hint_version + } else { + since_version + 1 + }; + + let (scheme, probed) = match probe_versions_upward(object_store, base, probe_from).await { + Ok(Some((_true_latest, scheme, probed))) => (scheme, probed), + // Nothing at `probe_from`. If we were probing from the hint, the hint + // is stale — bail to a full listing. If we were probing from + // `since_version + 1`, there are simply no new versions. + Ok(None) if hint_version > since_version => return None, + Ok(None) => return Some(Vec::new()), + // Transient error: don't trust the hint path, fall back to listing. + Err(_) => return None, + }; + + let mut locations: Vec = probed + .into_iter() + .filter(|(v, _)| *v > since_version) + .map(|(version, meta)| ManifestLocation { + version, + path: scheme.manifest_path(base, version), + size: Some(meta.size), + naming_scheme: scheme, + e_tag: meta.e_tag, + }) + .collect(); + + // Fill the gap between `since_version` and the hint with HEADs (the probe + // above already covered `hint_version` and up). The range is contiguous, so + // any error here (including a `NotFound`) means we can't trust the hint path + // — fall back to a full listing. + if hint_version > since_version + 1 { + let gap_locations: Vec = + futures::stream::iter((since_version + 1)..hint_version) + .map(|version| async move { + object_store + .inner + .head(&scheme.manifest_path(base, version)) + .await + .map(|meta| ManifestLocation { + version, + path: scheme.manifest_path(base, version), + size: Some(meta.size), + naming_scheme: scheme, + e_tag: meta.e_tag, + }) + }) + .buffer_unordered(object_store.io_parallelism()) + .try_collect() + .await + .ok()?; + locations.extend(gap_locations); + } + + locations.sort_by_key(|loc| std::cmp::Reverse(loc.version)); + Some(locations) +} + +/// Resolve the latest manifest by listing the versions directory. +async fn resolve_version_from_listing( + object_store: &ObjectStore, + base: &Path, +) -> Result { let manifest_files = object_store.list(Some(base.clone().join(VERSIONS_DIR))); let mut valid_manifests = manifest_files.try_filter_map(|res| { @@ -588,6 +870,51 @@ pub trait CommitHandler: Debug + Send + Sync { } } + /// List manifest locations with version `> since_version`, in descending + /// order of version. + /// + /// On lexically-ordered stores this is the standard listing with early + /// termination. On non-lexically-ordered stores (e.g. S3 Express) it uses + /// the version hint to avoid an O(n) listing, falling back to a full + /// listing if the hint is missing or stale. + fn list_manifest_locations_since<'a>( + &self, + base_path: &Path, + object_store: &'a ObjectStore, + since_version: u64, + ) -> BoxStream<'a, Result> { + if !uses_version_hint(object_store) { + return self + .list_manifest_locations(base_path, object_store, true) + .try_take_while(move |loc| future::ready(Ok(loc.version > since_version))) + .boxed(); + } + + let base_path = base_path.clone(); + futures::stream::once(async move { + let locations = match list_manifests_since_version_with_hint( + object_store, + &base_path, + since_version, + ) + .await + { + Some(locations) => locations, + None => { + let mut locations = list_manifests(&base_path, &object_store.inner) + .try_collect::>() + .await?; + locations.retain(|loc| loc.version > since_version); + locations.sort_by_key(|loc| std::cmp::Reverse(loc.version)); + locations + } + }; + Ok::<_, Error>(futures::stream::iter(locations.into_iter().map(Ok))) + }) + .try_flatten() + .boxed() + } + /// Commit a manifest. /// /// This function should return an [CommitError::CommitConflict] if another @@ -877,6 +1204,8 @@ impl CommitHandler for UnsafeCommitHandler { let res = manifest_writer(object_store, manifest, indices, &version_path, transaction).await?; + write_version_hint(object_store, base_path, manifest.version).await; + Ok(ManifestLocation { version: manifest.version, size: Some(res.size as u64), @@ -960,6 +1289,9 @@ impl CommitHandler for T { lease.release(res.is_ok()).await?; let res = res?; + + write_version_hint(object_store, base_path, manifest.version).await; + Ok(ManifestLocation { version: manifest.version, size: Some(res.size as u64), @@ -1028,6 +1360,7 @@ impl CommitHandler for RenameCommitHandler { { Ok(_) => { // Successfully committed + write_version_hint(object_store, base_path, manifest.version).await; Ok(ManifestLocation { version: manifest.version, path, @@ -1103,6 +1436,8 @@ impl CommitHandler for ConditionalPutCommitHandler { _ => CommitError::OtherError(err.into()), })?; + write_version_hint(object_store, base_path, manifest.version).await; + Ok(ManifestLocation { version: manifest.version, path, @@ -1282,6 +1617,196 @@ mod tests { assert_eq!(location.path, naming_scheme.manifest_path(&base, 11)); } + /// A memory store that reports `list_is_lexically_ordered == false`, like + /// S3 Express, so the version-hint paths are exercised. + fn non_lexical_memory_store() -> Box { + let mut object_store = ObjectStore::memory(); + object_store.list_is_lexically_ordered = false; + Box::new(object_store) + } + + #[tokio::test] + async fn test_write_version_hint() { + let base = Path::from("base"); + + // No hint is written on lexically-ordered stores (it would not be read). + let lexical = ObjectStore::memory(); + write_version_hint(&lexical, &base, 42).await; + assert_eq!(read_version_from_hint(&lexical, &base).await, None); + + let object_store = non_lexical_memory_store(); + write_version_hint(&object_store, &base, 42).await; + assert_eq!(read_version_from_hint(&object_store, &base).await, Some(42)); + + // A later commit overwrites the hint. + write_version_hint(&object_store, &base, 100).await; + assert_eq!( + read_version_from_hint(&object_store, &base).await, + Some(100) + ); + + // Detached versions are never written to the hint. + write_version_hint( + &object_store, + &base, + crate::format::DETACHED_VERSION_MASK | 7, + ) + .await; + assert_eq!( + read_version_from_hint(&object_store, &base).await, + Some(100) + ); + + // A corrupt / non-JSON hint file is treated as missing. + let hint_path = version_hint_path(&base); + object_store + .put(&hint_path, b"not json".as_slice()) + .await + .unwrap(); + assert_eq!(read_version_from_hint(&object_store, &base).await, None); + } + + #[tokio::test] + #[rstest::rstest] + async fn test_read_version_hint_and_probe( + #[values(ManifestNamingScheme::V1, ManifestNamingScheme::V2)] + naming_scheme: ManifestNamingScheme, + ) { + let object_store = non_lexical_memory_store(); + let base = Path::from("base"); + + // No hint file yet. + assert!( + read_version_hint_and_probe(&object_store, &base) + .await + .is_none() + ); + + for version in 1..=5 { + object_store + .put(&naming_scheme.manifest_path(&base, version), b"".as_slice()) + .await + .unwrap(); + } + + // Stale hint: should probe forward and find version 5. + write_version_hint(&object_store, &base, 3).await; + let location = read_version_hint_and_probe(&object_store, &base) + .await + .unwrap(); + assert_eq!(location.version, 5); + assert_eq!(location.naming_scheme, naming_scheme); + + // Up-to-date hint: returns version 5 directly. + write_version_hint(&object_store, &base, 5).await; + let location = read_version_hint_and_probe(&object_store, &base) + .await + .unwrap(); + assert_eq!(location.version, 5); + + // Hint points past the latest version: not usable. + write_version_hint(&object_store, &base, 10).await; + assert!( + read_version_hint_and_probe(&object_store, &base) + .await + .is_none() + ); + } + + #[tokio::test] + async fn test_list_manifests_since_version_with_hint() { + let object_store = non_lexical_memory_store(); + let base = Path::from("base"); + let scheme = ManifestNamingScheme::V2; + + for version in 1..=10 { + object_store + .put(&scheme.manifest_path(&base, version), b"".as_slice()) + .await + .unwrap(); + } + + // No hint yet -> not usable, caller must fall back. + assert!( + list_manifests_since_version_with_hint(&object_store, &base, 7) + .await + .is_none() + ); + + // Hint exactly at the read version -> fast path, nothing new. + write_version_hint(&object_store, &base, 10).await; + assert!(matches!( + list_manifests_since_version_with_hint(&object_store, &base, 10).await, + Some(v) if v.is_empty() + )); + + // Hint ahead of the read version, with a gap to fill (8, 9) plus probing + // from the hint (10). Results are descending by version. + let locations = list_manifests_since_version_with_hint(&object_store, &base, 7) + .await + .unwrap(); + assert_eq!( + locations.iter().map(|l| l.version).collect::>(), + vec![10, 9, 8] + ); + + // Slightly stale hint (points at 8) still probes up to the true latest. + write_version_hint(&object_store, &base, 8).await; + let locations = list_manifests_since_version_with_hint(&object_store, &base, 7) + .await + .unwrap(); + assert_eq!( + locations.iter().map(|l| l.version).collect::>(), + vec![10, 9, 8] + ); + + // Hint points past the latest -> not usable, caller falls back. + write_version_hint(&object_store, &base, 20).await; + assert!( + list_manifests_since_version_with_hint(&object_store, &base, 7) + .await + .is_none() + ); + } + + #[tokio::test] + async fn test_current_manifest_path_with_hint_non_lexical() { + // Simulate S3 Express (non-lexically ordered list) with many versions. + let object_store = non_lexical_memory_store(); + let base = Path::from("base"); + let naming_scheme = ManifestNamingScheme::V2; + + for version in 1..=100 { + object_store + .put(&naming_scheme.manifest_path(&base, version), b"".as_slice()) + .await + .unwrap(); + } + + // Slightly stale hint: probing from 98 still resolves the true latest. + write_version_hint(&object_store, &base, 98).await; + let location = current_manifest_path(&object_store, &base).await.unwrap(); + assert_eq!(location.version, 100); + } + + #[tokio::test] + async fn test_current_manifest_path_with_stale_hint_falls_back_to_listing() { + let object_store = non_lexical_memory_store(); + let base = Path::from("base"); + let naming_scheme = ManifestNamingScheme::V2; + + // Only version 5 exists, but the hint claims version 10. + object_store + .put(&naming_scheme.manifest_path(&base, 5), b"".as_slice()) + .await + .unwrap(); + write_version_hint(&object_store, &base, 10).await; + + // The stale hint is ignored; listing finds version 5. + let location = current_manifest_path(&object_store, &base).await.unwrap(); + assert_eq!(location.version, 5); + } + #[test] fn test_parse_detached_version() { // Valid detached version filenames diff --git a/rust/lance-table/src/io/commit/external_manifest.rs b/rust/lance-table/src/io/commit/external_manifest.rs index 49d651fe6eb..75993ca8d1f 100644 --- a/rust/lance-table/src/io/commit/external_manifest.rs +++ b/rust/lance-table/src/io/commit/external_manifest.rs @@ -21,7 +21,7 @@ use tracing::info; use super::{ MANIFEST_EXTENSION, ManifestLocation, ManifestNamingScheme, current_manifest_path, - default_resolve_version, make_staging_manifest_path, + default_resolve_version, make_staging_manifest_path, write_version_hint, }; use crate::format::{IndexMetadata, Manifest, Transaction}; use crate::io::commit::{CommitError, CommitHandler}; @@ -490,7 +490,10 @@ impl CommitHandler for ExternalManifestCommitHandler { .await; match result { - Ok(location) => Ok(location), + Ok(location) => { + write_version_hint(object_store, base_path, manifest.version).await; + Ok(location) + } Err(_) => { // delete the staging manifest match object_store.inner.delete(&staging_path).await { diff --git a/rust/lance/Cargo.toml b/rust/lance/Cargo.toml index 9f055fc9154..1f7c4af0bd1 100644 --- a/rust/lance/Cargo.toml +++ b/rust/lance/Cargo.toml @@ -247,5 +247,13 @@ name = "mem_wal_fts_bench" path = "benches/mem_wal/fts/mem_wal_fts_bench.rs" harness = false +[[bench]] +name = "manifest_commit" +harness = false + +[[bench]] +name = "concurrent_append" +harness = false + [lints] workspace = true diff --git a/rust/lance/benches/concurrent_append.rs b/rust/lance/benches/concurrent_append.rs new file mode 100644 index 00000000000..ac7cf3f610f --- /dev/null +++ b/rust/lance/benches/concurrent_append.rs @@ -0,0 +1,454 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Benchmark for concurrent-append throughput against S3 / S3 Express. +//! +//! Many writers append to the same dataset at once. The output measures how +//! the version-hint optimization affects conflict resolution and overall +//! commit rate as the version count grows. Designed to be run on a single +//! large EC2 instance so the writer count itself isn't the bottleneck. +//! +//! ## Running against S3 Standard +//! +//! ```bash +//! export AWS_REGION=us-east-1 +//! export DATASET_URI=s3://jack-devland-build/bench/concurrent_append +//! export NUM_WRITERS=64 +//! export APPENDS_PER_WRITER=200 +//! cargo bench --bench concurrent_append --release +//! ``` +//! +//! ## Running against S3 Express +//! +//! ```bash +//! export AWS_REGION=us-east-1 +//! export DATASET_URI=s3://jack-lancedb-devland--use1-az24--x-s3/bench/concurrent_append +//! export NUM_WRITERS=64 +//! export APPENDS_PER_WRITER=200 +//! cargo bench --bench concurrent_append --release +//! ``` +//! +//! ## Configuration +//! +//! - `DATASET_URI`: base URI under which a uniquely-named dataset is created. +//! Required. +//! - `NUM_WRITERS`: number of concurrent writers (default 64). +//! - `APPENDS_PER_WRITER`: appends each writer attempts (default 200). +//! - `ROWS_PER_APPEND`: rows per appended batch (default 100). +//! - `BASE_ROWS`: rows in the initial table before concurrent writes begin +//! (default 100_000). +//! - `KEEP_DATASET`: when set to `true`, leaves the dataset in place after +//! the run (default: deleted on S3, kept on local). + +#![allow(clippy::print_stdout, clippy::print_stderr)] + +use arrow_array::{Int64Array, RecordBatch, RecordBatchIterator, StringArray}; +use arrow_schema::{DataType, Field, Schema as ArrowSchema}; +use criterion::{Criterion, criterion_group, criterion_main}; +use lance::dataset::{Dataset, InsertBuilder, WriteMode, WriteParams, builder::DatasetBuilder}; +use lance::session::Session; +use lance_io::object_store::{ObjectStoreParams, ObjectStoreRegistry, StorageOptionsAccessor}; +use std::collections::HashMap; +use std::env; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use uuid::Uuid; + +const DEFAULT_NUM_WRITERS: usize = 64; +const DEFAULT_APPENDS_PER_WRITER: usize = 200; +const DEFAULT_ROWS_PER_APPEND: usize = 100; +const DEFAULT_BASE_ROWS: usize = 100_000; + +fn env_usize(key: &str, default: usize) -> usize { + env::var(key) + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(default) +} + +fn env_bool(key: &str) -> bool { + env::var(key) + .map(|s| s.eq_ignore_ascii_case("true")) + .unwrap_or(false) +} + +fn storage_label(uri: &str) -> &'static str { + if uri.contains("--x-s3") { + "s3express" + } else if uri.starts_with("s3://") { + "s3" + } else if uri.starts_with("gs://") { + "gcs" + } else if uri.starts_with("az://") { + "azure" + } else { + "local" + } +} + +fn schema() -> Arc { + Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int64, false), + Field::new("name", DataType::Utf8, false), + ])) +} + +fn batch(start_id: usize, num_rows: usize) -> RecordBatch { + let ids = Int64Array::from_iter_values((start_id as i64)..((start_id + num_rows) as i64)); + let names = StringArray::from_iter_values( + (start_id..(start_id + num_rows)).map(|i| format!("name_{i}")), + ); + RecordBatch::try_new(schema(), vec![Arc::new(ids), Arc::new(names)]).expect("build batch") +} + +/// Storage options that turn on S3 Express when the URI advertises it. +/// +/// S3 Express directory buckets don't support GetBucketLocation, so we also +/// require the caller to set `AWS_REGION` and forward it explicitly. +fn store_params_for(uri: &str) -> Option { + if !uri.contains("--x-s3") { + return None; + } + let region = env::var("AWS_REGION") + .or_else(|_| env::var("AWS_DEFAULT_REGION")) + .expect("AWS_REGION is required when DATASET_URI points at S3 Express"); + let storage_options: HashMap = + [("s3_express", "true"), ("region", region.as_str())] + .into_iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(); + Some(ObjectStoreParams { + storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options( + storage_options, + ))), + ..Default::default() + }) +} + +fn write_params(session: Arc, store_params: Option) -> WriteParams { + WriteParams { + mode: WriteMode::Append, + session: Some(session), + store_params, + skip_auto_cleanup: true, + ..Default::default() + } +} + +async fn create_base_dataset( + uri: &str, + base_rows: usize, + rows_per_append: usize, + session: Arc, + store_params: Option, +) -> Dataset { + // When `base_rows == 0` the dataset starts empty: one create commit with a + // zero-row batch so the writers begin at version 1 with no data. + let initial_rows = if base_rows == 0 { + 0 + } else { + rows_per_append.min(base_rows) + }; + let initial = batch(0, initial_rows); + let reader = RecordBatchIterator::new(vec![Ok(initial)], schema()); + let create_params = WriteParams { + mode: WriteMode::Create, + session: Some(session.clone()), + store_params: store_params.clone(), + skip_auto_cleanup: true, + ..Default::default() + }; + let mut dataset = Dataset::write(reader, uri, Some(create_params)) + .await + .expect("create base dataset"); + + // Top up to BASE_ROWS in chunks so we don't allocate one huge batch. + let chunk = 10_000.min(base_rows); + let mut written = initial_rows; + while written < base_rows { + let to_write = chunk.min(base_rows - written); + let batch = batch(written, to_write); + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema()); + let params = write_params(session.clone(), store_params.clone()); + dataset = Dataset::write(reader, uri, Some(params)) + .await + .expect("seed appends"); + written += to_write; + } + dataset +} + +struct WriterStats { + successes: usize, + failures: usize, + latencies: Vec, +} + +#[allow(clippy::too_many_arguments)] +async fn run_writer( + writer_id: usize, + uri: String, + appends: usize, + rows_per_append: usize, + deadline: Option, + per_attempt_timeout: Option, + session: Arc, + store_params: Option, +) -> WriterStats { + // Each writer keeps its own dataset handle; CommitBuilder rebases on + // conflict so we don't need to manually reload between appends. + let mut dataset = Arc::new( + DatasetBuilder::from_uri(&uri) + .with_session(session.clone()) + .load() + .await + .expect("writer load"), + ); + + let mut stats = WriterStats { + successes: 0, + failures: 0, + latencies: Vec::with_capacity(appends), + }; + + // Disjoint id ranges per writer so the data inserted is identifiable. + let id_base = 1_000_000 + writer_id * appends * rows_per_append; + for i in 0..appends { + if let Some(d) = deadline + && Instant::now() >= d + { + break; + } + let batch = batch(id_base + i * rows_per_append, rows_per_append); + let params = write_params(session.clone(), store_params.clone()); + let start = Instant::now(); + // Per-attempt cap keeps the slow-tail commits from extending the run + // far past the writer-side deadline at high concurrency. + let result = match per_attempt_timeout { + Some(t) => { + let ds = dataset.clone(); + let params_ref = ¶ms; + match tokio::time::timeout(t, async move { + InsertBuilder::new(ds) + .with_params(params_ref) + .execute(vec![batch]) + .await + }) + .await + { + Ok(r) => r, + Err(_) => Err(lance_core::Error::io_source(Box::new(std::io::Error::new( + std::io::ErrorKind::TimedOut, + "per-attempt timeout", + )))), + } + } + None => { + InsertBuilder::new(dataset.clone()) + .with_params(¶ms) + .execute(vec![batch]) + .await + } + }; + let elapsed = start.elapsed(); + match result { + Ok(new_ds) => { + stats.successes += 1; + stats.latencies.push(elapsed); + dataset = Arc::new(new_ds); + } + Err(e) => { + stats.failures += 1; + eprintln!("writer {writer_id} append {i} failed after {elapsed:?}: {e}"); + // Reload and keep going so a single failure doesn't end the run. + dataset = Arc::new( + DatasetBuilder::from_uri(&uri) + .with_session(session.clone()) + .load() + .await + .expect("writer reload after error"), + ); + } + } + } + stats +} + +fn percentile(sorted: &[Duration], p: f64) -> Duration { + if sorted.is_empty() { + return Duration::ZERO; + } + let idx = ((sorted.len() as f64 - 1.0) * p).round() as usize; + sorted[idx.min(sorted.len() - 1)] +} + +fn ms(d: Duration) -> f64 { + d.as_secs_f64() * 1000.0 +} + +fn bench_concurrent_append(c: &mut Criterion) { + let dataset_base = + env::var("DATASET_URI").expect("DATASET_URI is required for concurrent_append bench"); + let num_writers = env_usize("NUM_WRITERS", DEFAULT_NUM_WRITERS); + let appends_per_writer = env_usize("APPENDS_PER_WRITER", DEFAULT_APPENDS_PER_WRITER); + let rows_per_append = env_usize("ROWS_PER_APPEND", DEFAULT_ROWS_PER_APPEND); + let base_rows = env_usize("BASE_ROWS", DEFAULT_BASE_ROWS); + let keep_dataset = env_bool("KEEP_DATASET"); + // Per-writer wall-clock budget. When non-zero, each writer stops looping + // once this many seconds have elapsed since the run started, even if it + // hasn't issued `APPENDS_PER_WRITER` commits yet. Lets us bound run time + // at high concurrency where conflict retries make commits arbitrarily slow. + let max_wall_secs = env_usize("MAX_WALL_SECS", 0); + // Per-attempt timeout. Caps any single commit attempt (including its + // internal retries) so the slow-tail of an under-contention commit doesn't + // extend the run past the writer deadline. 0 disables it. + let per_attempt_timeout_secs = env_usize("PER_ATTEMPT_TIMEOUT_SECS", 0); + + let uri = format!( + "{}/concurrent_append_{}", + dataset_base.trim_end_matches('/'), + &Uuid::new_v4().to_string()[..8] + ); + let label = storage_label(&uri); + let store_params = store_params_for(&uri); + + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .expect("build tokio runtime"); + + println!("=== Concurrent Append Benchmark ==="); + println!("Storage: {uri} ({label})"); + println!( + "Writers: {num_writers}, appends/writer: {appends_per_writer}, rows/append: {rows_per_append}" + ); + println!("Base rows: {base_rows}, keep_dataset: {keep_dataset}"); + println!(); + + // Share one ObjectStoreRegistry so all writers reuse warm TCP/TLS sessions. + let registry = Arc::new(ObjectStoreRegistry::default()); + let session = Arc::new(Session::new(0, 0, registry)); + + println!("Seeding base dataset ({base_rows} rows)..."); + let seed_start = Instant::now(); + let base_dataset = runtime.block_on(create_base_dataset( + &uri, + base_rows, + rows_per_append, + session.clone(), + store_params.clone(), + )); + let starting_version = base_dataset.manifest().version; + println!( + "Base dataset ready in {:.2}s at version {starting_version}", + seed_start.elapsed().as_secs_f64() + ); + + println!("Starting {num_writers} concurrent writers..."); + let wall_start = Instant::now(); + let deadline = + (max_wall_secs > 0).then(|| wall_start + Duration::from_secs(max_wall_secs as u64)); + if let Some(d) = deadline { + println!( + "Per-writer wall budget: {max_wall_secs}s (deadline {:?} from now)", + d.duration_since(wall_start) + ); + } + let per_attempt_timeout = (per_attempt_timeout_secs > 0) + .then(|| Duration::from_secs(per_attempt_timeout_secs as u64)); + if let Some(t) = per_attempt_timeout { + println!("Per-attempt timeout: {:?}", t); + } + let all_stats: Vec = runtime.block_on(async { + let mut tasks = Vec::with_capacity(num_writers); + for writer_id in 0..num_writers { + let uri = uri.clone(); + let session = session.clone(); + let store_params = store_params.clone(); + tasks.push(tokio::spawn(async move { + run_writer( + writer_id, + uri, + appends_per_writer, + rows_per_append, + deadline, + per_attempt_timeout, + session, + store_params, + ) + .await + })); + } + let mut out = Vec::with_capacity(num_writers); + for t in tasks { + out.push(t.await.expect("writer task panicked")); + } + out + }); + let wall = wall_start.elapsed(); + + let total_attempts = all_stats + .iter() + .map(|s| s.successes + s.failures) + .sum::(); + let total_success = all_stats.iter().map(|s| s.successes).sum::(); + let total_failed = all_stats.iter().map(|s| s.failures).sum::(); + let mut latencies: Vec = all_stats + .into_iter() + .flat_map(|s| s.latencies.into_iter()) + .collect(); + latencies.sort(); + + let throughput = total_success as f64 / wall.as_secs_f64(); + + println!(); + println!("=== Results ==="); + println!("Wall time: {:.2}s", wall.as_secs_f64()); + println!( + "Commits: {total_success} succeeded, {total_failed} failed out of {total_attempts} attempts" + ); + println!("Throughput: {throughput:.2} commits/sec"); + if !latencies.is_empty() { + let mean = latencies.iter().map(|d| d.as_secs_f64()).sum::() / latencies.len() as f64; + println!( + "Commit latency (per writer, includes any retries): \ + p50={:.2}ms p90={:.2}ms p95={:.2}ms p99={:.2}ms max={:.2}ms mean={:.2}ms", + ms(percentile(&latencies, 0.50)), + ms(percentile(&latencies, 0.90)), + ms(percentile(&latencies, 0.95)), + ms(percentile(&latencies, 0.99)), + ms(*latencies.last().unwrap()), + mean * 1000.0, + ); + } + + let final_dataset = runtime.block_on(async { + DatasetBuilder::from_uri(&uri) + .with_session(session.clone()) + .load() + .await + .expect("final load") + }); + println!( + "Final dataset version: {} (started at {})", + final_dataset.manifest().version, + starting_version + ); + + // Pin the numbers into criterion so they show up in regression tracking. + let mut group = c.benchmark_group(format!("concurrent_append_{label}")); + group.bench_function("commits_per_sec", |b| b.iter(|| throughput)); + group.bench_function("p50_ms", |b| b.iter(|| ms(percentile(&latencies, 0.50)))); + group.bench_function("p99_ms", |b| b.iter(|| ms(percentile(&latencies, 0.99)))); + group.finish(); + + if !keep_dataset && label == "local" { + let _ = std::fs::remove_dir_all(&uri); + println!("Local dataset removed: {uri}"); + } else { + println!("Dataset preserved: {uri}"); + } +} + +criterion_group!(benches, bench_concurrent_append); +criterion_main!(benches); diff --git a/rust/lance/benches/manifest_commit.rs b/rust/lance/benches/manifest_commit.rs new file mode 100644 index 00000000000..2a98a37a498 --- /dev/null +++ b/rust/lance/benches/manifest_commit.rs @@ -0,0 +1,371 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Benchmark for manifest commit performance with many small fragments. +//! +//! This benchmark tests how performance degrades as the number of small fragments +//! grows. Each fragment contains only 10 rows, and we measure both: +//! - Commit time (manifest write only, excludes fragment data writing) +//! - Load time (manifest read from storage, using checkout_latest) +//! +//! Key optimizations: +//! - Uses shared ObjectStoreRegistry to reuse TCP/TLS connections +//! - Disables auto-cleanup to avoid background cleanup overhead +//! - Separates fragment writing from commit measurement +//! +//! ## Running against S3 Express +//! +//! ```bash +//! export AWS_REGION=us-east-1 +//! export DATASET_PREFIX=s3://your-bucket--use1-az4--x-s3/bench/manifest_commit +//! export NUM_ITERATIONS=100 +//! cargo bench --bench manifest_commit +//! ``` +//! +//! ## Running against local filesystem (with temp directory) +//! +//! ```bash +//! cargo bench --bench manifest_commit +//! ``` +//! +//! ## Configuration +//! +//! - `DATASET_PREFIX`: Base URI for datasets (e.g. s3://bucket/prefix or /tmp/bench). +//! If not set, uses a temporary directory. +//! - `NUM_ITERATIONS`: Number of small fragment writes to perform (default: 100). +//! - `ROWS_PER_FRAGMENT`: Number of rows per fragment (default: 10). +//! - `DELETE_DATASET`: When "true", delete the dataset after benchmark completes. +//! - `ENABLE_CACHE`: When "true", enable manifest caching for load measurements. +//! Default is "false" to measure actual storage read latency. + +#![allow(clippy::print_stdout)] + +use arrow_array::{Int64Array, RecordBatch, RecordBatchIterator, StringArray}; +use arrow_schema::{DataType, Field, Schema as ArrowSchema}; +use criterion::{Criterion, criterion_group, criterion_main}; +use lance::dataset::builder::DatasetBuilder; +use lance::dataset::{CommitBuilder, Dataset, InsertBuilder, WriteMode, WriteParams}; +use lance::session::Session; +use lance_io::object_store::ObjectStoreRegistry; +use std::sync::Arc; +use std::time::Instant; +use tokio::runtime::Runtime; +use uuid::Uuid; + +const DEFAULT_ROWS_PER_FRAGMENT: usize = 10; +const DEFAULT_NUM_ITERATIONS: usize = 100; + +fn get_rows_per_fragment() -> usize { + std::env::var("ROWS_PER_FRAGMENT") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(DEFAULT_ROWS_PER_FRAGMENT) +} + +fn get_num_iterations() -> usize { + std::env::var("NUM_ITERATIONS") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(DEFAULT_NUM_ITERATIONS) +} + +fn get_delete_dataset() -> bool { + std::env::var("DELETE_DATASET") + .map(|s| s.to_lowercase() == "true") + .unwrap_or(false) +} + +fn get_enable_cache() -> bool { + std::env::var("ENABLE_CACHE") + .map(|s| s.to_lowercase() == "true") + .unwrap_or(false) +} + +fn get_dataset_prefix() -> String { + std::env::var("DATASET_PREFIX").unwrap_or_else(|_| { + let temp_dir = std::env::temp_dir().join(format!("lance_bench_{}", Uuid::new_v4())); + std::fs::create_dir_all(&temp_dir).expect("Failed to create temp directory"); + temp_dir.to_string_lossy().to_string() + }) +} + +fn get_storage_label(prefix: &str) -> &'static str { + if prefix.starts_with("s3://") { + "s3" + } else if prefix.starts_with("gs://") { + "gcs" + } else if prefix.starts_with("az://") { + "azure" + } else if prefix.starts_with("memory://") { + "memory" + } else { + "local" + } +} + +async fn create_initial_dataset( + uri: &str, + rows_per_fragment: usize, + session: Arc, +) -> Dataset { + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int64, false), + Field::new("name", DataType::Utf8, false), + ])); + + let batch = create_batch(schema.clone(), 0, rows_per_fragment); + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema); + + std::fs::remove_dir_all(uri).ok(); + + let params = WriteParams { + session: Some(session), + skip_auto_cleanup: true, + ..Default::default() + }; + + Dataset::write(reader, uri, Some(params)) + .await + .expect("failed to create initial dataset") +} + +fn create_batch(schema: Arc, start_id: usize, num_rows: usize) -> RecordBatch { + let ids = Int64Array::from_iter_values((start_id as i64)..((start_id + num_rows) as i64)); + let names = StringArray::from_iter_values( + (start_id..(start_id + num_rows)).map(|i| format!("name_{}", i)), + ); + + RecordBatch::try_new(schema, vec![Arc::new(ids), Arc::new(names)]) + .expect("failed to create batch") +} + +fn bench_manifest_commit(c: &mut Criterion) { + let runtime = Runtime::new().expect("failed to build tokio runtime"); + + let dataset_prefix = get_dataset_prefix(); + let num_iterations = get_num_iterations(); + let rows_per_fragment = get_rows_per_fragment(); + let delete_dataset = get_delete_dataset(); + let enable_cache = get_enable_cache(); + let storage_label = get_storage_label(&dataset_prefix); + + let short_id = &Uuid::new_v4().to_string()[..8]; + let uri = format!( + "{}/manifest_commit_{}", + dataset_prefix.trim_end_matches('/'), + short_id + ); + + println!("=== Manifest Commit Benchmark Setup ==="); + println!("Storage: {} ({})", uri, storage_label); + println!("Rows per fragment: {}", rows_per_fragment); + println!("Number of iterations: {}", num_iterations); + println!( + "Total fragments (including initial): {}", + num_iterations + 1 + ); + println!("Delete dataset: {}", delete_dataset); + println!( + "Cache enabled: {} ({})", + enable_cache, + if enable_cache { + "using default cache size" + } else { + "zero cache size - measures actual storage read" + } + ); + println!(); + + // Create a shared session for both commit and load operations + // When cache is disabled, use zero cache size to measure actual storage read latency + // When cache is enabled, use default cache sizes (6GB index, 1GB metadata) + let shared_store_registry = Arc::new(ObjectStoreRegistry::default()); + let session = if enable_cache { + Arc::new(Session::default()) + } else { + Arc::new(Session::new(0, 0, shared_store_registry)) + }; + + let initial_dataset = runtime.block_on(create_initial_dataset( + &uri, + rows_per_fragment, + session.clone(), + )); + + let uri_clone = uri.clone(); + let mut load_dataset = runtime.block_on(async { + DatasetBuilder::from_uri(&uri_clone) + .with_session(session.clone()) + .load() + .await + .expect("failed to load dataset for load measurements") + }); + + let mut current_dataset = Arc::new(initial_dataset); + + let mut commit_latencies = Vec::with_capacity(num_iterations); + let mut load_latencies = Vec::with_capacity(num_iterations); + + println!("Running commit and load benchmarks..."); + println!("fragments,commit_ms,load_ms"); + + for i in 1..=num_iterations { + let num_fragments = i + 1; + + let (commit_time, new_dataset) = { + let dataset = current_dataset.clone(); + let session_clone = session.clone(); + runtime.block_on(async move { + let schema: Arc = Arc::new((&dataset.schema().clone()).into()); + let start_id = dataset.count_rows(None).await.unwrap() as usize; + let batch = create_batch(schema.clone(), start_id, rows_per_fragment); + + let write_params = WriteParams { + mode: WriteMode::Append, + session: Some(session_clone.clone()), + skip_auto_cleanup: true, + ..Default::default() + }; + + let transaction = InsertBuilder::new(dataset.clone()) + .with_params(&write_params) + .execute_uncommitted(vec![batch]) + .await + .expect("failed to write fragment"); + + let start = Instant::now(); + let new_ds = CommitBuilder::new(dataset) + .with_session(session_clone) + .with_skip_auto_cleanup(true) + .execute(transaction) + .await + .expect("failed to commit"); + (start.elapsed(), Arc::new(new_ds)) + }) + }; + + let load_time = runtime.block_on(async { + let start = Instant::now(); + load_dataset + .checkout_latest() + .await + .expect("failed to checkout latest"); + let elapsed = start.elapsed(); + + assert_eq!( + load_dataset.manifest().fragments.len(), + num_fragments, + "Expected {} fragments", + num_fragments + ); + elapsed + }); + + current_dataset = new_dataset; + + commit_latencies.push(commit_time); + load_latencies.push(load_time); + + println!( + "{},{:.2},{:.2}", + num_fragments, + commit_time.as_secs_f64() * 1000.0, + load_time.as_secs_f64() * 1000.0 + ); + } + + println!(); + println!("=== Summary Statistics ==="); + + let avg_commit: f64 = commit_latencies + .iter() + .map(|d| d.as_secs_f64()) + .sum::() + / commit_latencies.len() as f64; + let avg_load: f64 = + load_latencies.iter().map(|d| d.as_secs_f64()).sum::() / load_latencies.len() as f64; + + let min_commit = commit_latencies.iter().min().unwrap(); + let max_commit = commit_latencies.iter().max().unwrap(); + let min_load = load_latencies.iter().min().unwrap(); + let max_load = load_latencies.iter().max().unwrap(); + + println!( + "Commit latency: avg={:.2}ms, min={:.2}ms, max={:.2}ms", + avg_commit * 1000.0, + min_commit.as_secs_f64() * 1000.0, + max_commit.as_secs_f64() * 1000.0 + ); + println!( + "Load latency: avg={:.2}ms, min={:.2}ms, max={:.2}ms", + avg_load * 1000.0, + min_load.as_secs_f64() * 1000.0, + max_load.as_secs_f64() * 1000.0 + ); + + let first_10_avg_commit = commit_latencies + .iter() + .take(10) + .map(|d| d.as_secs_f64()) + .sum::() + / 10.0; + let last_10_avg_commit = commit_latencies + .iter() + .rev() + .take(10) + .map(|d| d.as_secs_f64()) + .sum::() + / 10.0; + let first_10_avg_load = load_latencies + .iter() + .take(10) + .map(|d| d.as_secs_f64()) + .sum::() + / 10.0; + let last_10_avg_load = load_latencies + .iter() + .rev() + .take(10) + .map(|d| d.as_secs_f64()) + .sum::() + / 10.0; + + println!(); + println!( + "First 10 iterations avg: commit={:.2}ms, load={:.2}ms", + first_10_avg_commit * 1000.0, + first_10_avg_load * 1000.0 + ); + println!( + "Last 10 iterations avg: commit={:.2}ms, load={:.2}ms", + last_10_avg_commit * 1000.0, + last_10_avg_load * 1000.0 + ); + println!( + "Degradation ratio: commit={:.2}x, load={:.2}x", + last_10_avg_commit / first_10_avg_commit, + last_10_avg_load / first_10_avg_load + ); + + let mut group = c.benchmark_group("manifest_commit"); + + group.bench_function("avg_commit_latency", |b| { + b.iter(|| std::time::Duration::from_secs_f64(avg_commit)) + }); + + group.bench_function("avg_load_latency", |b| { + b.iter(|| std::time::Duration::from_secs_f64(avg_load)) + }); + + group.finish(); + + if delete_dataset { + std::fs::remove_dir_all(&uri).ok(); + println!("Dataset deleted: {}", uri); + } else { + println!("Dataset preserved: {}", uri); + } +} + +criterion_group!(benches, bench_manifest_commit); +criterion_main!(benches); diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 8abb5975fdd..30e43c5bb32 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -2760,16 +2760,15 @@ pub(crate) struct NewTransactionResult<'a> { } pub(crate) fn load_new_transactions(dataset: &Dataset) -> NewTransactionResult<'_> { - // Re-use the same list call for getting the latest manifest and the metadata - // for all manifests in between. + // Resolve every manifest with version > our current version (the latest plus + // the ones in between). On non-lexically-ordered stores this uses the version + // hint to avoid an O(n) listing. let io_parallelism = dataset.object_store.as_ref().io_parallelism(); - let latest_version = dataset.manifest.version; - let locations = dataset - .commit_handler - .list_manifest_locations(&dataset.base, dataset.object_store.as_ref(), true) - .try_take_while(move |location| { - futures::future::ready(Ok(location.version > latest_version)) - }); + let locations = dataset.commit_handler.list_manifest_locations_since( + &dataset.base, + dataset.object_store.as_ref(), + dataset.manifest.version, + ); // Will send the latest manifest via a channel. let (latest_tx, latest_rx) = tokio::sync::oneshot::channel(); diff --git a/rust/lance/src/dataset/tests/dataset_versioning.rs b/rust/lance/src/dataset/tests/dataset_versioning.rs index 088ce85b9a5..5ac01c498b2 100644 --- a/rust/lance/src/dataset/tests/dataset_versioning.rs +++ b/rust/lance/src/dataset/tests/dataset_versioning.rs @@ -34,6 +34,8 @@ fn assert_all_manifests_use_scheme(test_dir: &TempStdDir, scheme: ManifestNaming .read_dir() .unwrap() .map(|entry| entry.unwrap().file_name().into_string().unwrap()) + // Ignore the version hint file, which is not a manifest. + .filter(|name| !name.starts_with("latest_version_hint")) .collect::>(); assert!( entries_names diff --git a/rust/lance/src/dataset/write/commit.rs b/rust/lance/src/dataset/write/commit.rs index efab12d249c..45b78b48bcb 100644 --- a/rust/lance/src/dataset/write/commit.rs +++ b/rust/lance/src/dataset/write/commit.rs @@ -561,6 +561,7 @@ mod tests { // Should see 2 IOPs: // 1. Write the transaction files // 2. Write (conditional put) the manifest + // (the version hint is only written on non-lexically-ordered stores) assert_io_eq!(io_stats, write_iops, 2, "write txn + manifest, i = {}", i); } @@ -629,7 +630,7 @@ mod tests { // Assert io requests let io_stats = new_ds.object_store.as_ref().io_stats_incremental(); // This could be zero, if we decided to be optimistic. However, that - // would mean two wasted write requests (txn + manifest) if there was + // would mean wasted write requests (txn + manifest) if there was // a conflict. We choose to be pessimistic for more consistent performance. assert_io_eq!(io_stats, read_iops, 1); assert_io_eq!(io_stats, write_iops, 2); @@ -786,4 +787,82 @@ mod tests { ); assert_eq!(transaction.read_version, 1); } + + /// On non-lexically-ordered stores (e.g. S3 Express) a commit should use the + /// version hint (a few HEAD probes, O(k)) instead of a full O(n) listing. + #[tokio::test] + async fn test_commit_uses_version_hint_on_non_lexical_store() { + // Make `list` artificially slow per entry so a full listing would be + // obvious; HEAD/GET/PUT stay fast. + let throttled = Arc::new(ThrottledStoreWrapper { + config: ThrottleConfig { + wait_list_per_entry: Duration::from_millis(50), + wait_get_per_call: Duration::from_millis(1), + wait_put_per_call: Duration::from_millis(1), + ..Default::default() + }, + }); + let session = Arc::new(Session::default()); + let write_params = WriteParams { + store_params: Some(ObjectStoreParams { + object_store_wrapper: Some(throttled), + list_is_lexically_ordered: Some(false), + ..Default::default() + }), + session: Some(session.clone()), + enable_v2_manifest_paths: true, + ..Default::default() + }; + + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "i", + DataType::Int32, + false, + )])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from_iter_values(0..10_i32))], + ) + .unwrap(); + let mut dataset = Arc::new( + InsertBuilder::new("memory://test_version_hint") + .with_params(&write_params) + .execute(vec![batch]) + .await + .unwrap(), + ); + + // Build up many versions so a full listing would be expensive. + for _ in 0..50 { + dataset = Arc::new( + CommitBuilder::new(dataset.clone()) + .execute(sample_transaction(dataset.manifest().version)) + .await + .unwrap(), + ); + } + assert_eq!(dataset.manifest().version, 51); + + dataset.object_store.as_ref().io_stats_incremental(); + + let start = std::time::Instant::now(); + let new_ds = CommitBuilder::new(dataset.clone()) + .execute(sample_transaction(dataset.manifest().version)) + .await + .unwrap(); + let elapsed = start.elapsed(); + + // A full listing of ~52 entries at 50ms each would take ~2.6s. + assert!( + elapsed < Duration::from_secs(1), + "commit took {elapsed:?}; the version hint path was likely not used" + ); + + let io_stats = new_ds.object_store.as_ref().io_stats_incremental(); + assert!( + io_stats.read_iops < 10, + "read_iops = {}; a full listing was likely used", + io_stats.read_iops + ); + } } diff --git a/rust/lance/src/io/commit/external_manifest.rs b/rust/lance/src/io/commit/external_manifest.rs index d68ef08fad4..df2b84a4878 100644 --- a/rust/lance/src/io/commit/external_manifest.rs +++ b/rust/lance/src/io/commit/external_manifest.rs @@ -267,6 +267,15 @@ mod test { .to_string_lossy() .contains(".manifest#") }) + // The version hint file is expected to be present. + .filter(|entry| { + let entry = entry.as_ref().unwrap(); + !entry + .file_name() + .as_os_str() + .to_string_lossy() + .starts_with("latest_version_hint") + }) .collect::>(); assert!(unexpected_entries.is_empty(), "{:?}", unexpected_entries); } @@ -373,6 +382,15 @@ mod test { .to_string_lossy() .ends_with(".manifest") }) + // The version hint file is expected to be present. + .filter(|entry| { + let entry = entry.as_ref().unwrap(); + !entry + .file_name() + .as_os_str() + .to_string_lossy() + .starts_with("latest_version_hint") + }) .collect::>(); assert!(unexpected_entries.is_empty(), "{:?}", unexpected_entries); }