From e4aa14c004ac7cbb7198e2f2a5d9a31f1f9b28a1 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Fri, 24 Apr 2026 14:36:20 +0800 Subject: [PATCH 1/2] feat: add lance_dataset_restore for rolling back to a prior version Commits a new manifest that aliases the fragments of an older version as the new latest. Returns a fresh LanceDataset* at the new manifest; the caller's original handle is untouched. If the target is already the latest, the call is a no-op (no new manifest is written). Version 0 is rejected up front since lance_dataset_open already uses 0 as the "latest" sentinel; overloading it here would be confusing. Non-existent versions surface upstream's Error::NotFound as LANCE_ERR_NOT_FOUND; concurrent commits surface as LANCE_ERR_COMMIT_CONFLICT. C++ gets a matching lance::Dataset::restore(version) member that returns a new Dataset. Tests: 5 new Rust unit tests (restore-to-prior with version/rowcount assertions, restore-to-latest no-op, non-existent version, version == 0 rejection, NULL dataset rejection). C and C++ integration tests smoke the no-op path against the shared read-only dataset. Closes #12. --- include/lance.h | 16 ++++++ include/lance.hpp | 10 ++++ src/lib.rs | 2 + src/restore.rs | 73 ++++++++++++++++++++++++++ tests/c_api_test.rs | 104 +++++++++++++++++++++++++++++++++++++ tests/cpp/test_c_api.c | 20 +++++++ tests/cpp/test_cpp_api.cpp | 15 ++++++ 7 files changed, 240 insertions(+) create mode 100644 src/restore.rs diff --git a/include/lance.h b/include/lance.h index 5424359..340e259 100644 --- a/include/lance.h +++ b/include/lance.h @@ -155,6 +155,22 @@ int64_t lance_versions_timestamp_ms_at(const LanceVersions* versions, size_t ind /** Close and free a versions handle. Safe to call with NULL. */ void lance_versions_close(LanceVersions* versions); +/** + * Restore the dataset to an older version by committing a new manifest that + * carries the fragments of `version`. If `version` is already the latest, + * succeeds as a no-op without writing a new manifest. + * + * @param dataset Open dataset (not consumed). Must not be NULL. + * @param version Target version id (>= 1). `0` is rejected since it is the + * "latest" sentinel used by lance_dataset_open. + * @return Fresh LanceDataset* positioned at the target version (caller closes + * with lance_dataset_close), or NULL on error. Possible error codes + * include LANCE_ERR_INVALID_ARGUMENT (NULL handle or version == 0), + * LANCE_ERR_NOT_FOUND (unknown version), + * LANCE_ERR_COMMIT_CONFLICT (concurrent writer). + */ +LanceDataset* lance_dataset_restore(const LanceDataset* dataset, uint64_t version); + /** * Export the dataset schema via Arrow C Data Interface. * @param out Pointer to caller-allocated ArrowSchema struct diff --git a/include/lance.hpp b/include/lance.hpp index 186cb9e..ccf5375 100644 --- a/include/lance.hpp +++ b/include/lance.hpp @@ -162,6 +162,16 @@ class Dataset { return out; } + /// Commit a new manifest that aliases `version` as the latest. The + /// returned Dataset points at the target version; this handle is + /// unchanged. If `version` is already the latest, no new manifest is + /// written. Throws lance::Error on failure. + Dataset restore(uint64_t version) const { + auto* out = lance_dataset_restore(handle_.get(), version); + if (!out) check_error(); + return Dataset(out); + } + /// Export the schema as an Arrow C Data Interface struct. void schema(ArrowSchema* out) const { if (lance_dataset_schema(handle_.get(), out) != 0) { diff --git a/src/lib.rs b/src/lib.rs index 318c9cd..83261d9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,6 +21,7 @@ mod dataset; mod error; mod fragment_writer; mod helpers; +mod restore; pub mod runtime; mod scanner; mod versions; @@ -32,5 +33,6 @@ pub use error::{ LanceErrorCode, lance_free_string, lance_last_error_code, lance_last_error_message, }; pub use fragment_writer::*; +pub use restore::*; pub use scanner::*; pub use versions::*; diff --git a/src/restore.rs b/src/restore.rs new file mode 100644 index 0000000..4ba94f2 --- /dev/null +++ b/src/restore.rs @@ -0,0 +1,73 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Restore C API: move a dataset's latest back to an older version by +//! committing a new manifest whose fragments match the chosen version. +//! +//! Returns a fresh `LanceDataset*` positioned at the target version; the +//! caller's original handle is untouched and remains usable. + +use std::sync::Arc; + +use lance_core::Result; + +use crate::dataset::LanceDataset; +use crate::error::ffi_try; +use crate::runtime::block_on; + +/// Restore the dataset to an older version by committing a new manifest that +/// carries the fragments of `version`. +/// +/// - `dataset`: Open dataset (not consumed). Must not be NULL. +/// - `version`: Target version id. Must be `>= 1`; `0` is reserved as the +/// "latest" sentinel by `lance_dataset_open` and is rejected here with +/// `LANCE_ERR_INVALID_ARGUMENT`. +/// +/// If `version` is already the dataset's latest, the call succeeds as a +/// no-op without writing a new manifest. +/// +/// Returns a fresh `LanceDataset*` positioned at the target version on success +/// (caller closes with `lance_dataset_close`), or NULL on error. Errors include +/// `LANCE_ERR_NOT_FOUND` for an unknown `version` and `LANCE_ERR_COMMIT_CONFLICT` +/// for a concurrent commit race. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn lance_dataset_restore( + dataset: *const LanceDataset, + version: u64, +) -> *mut LanceDataset { + ffi_try!(unsafe { restore_inner(dataset, version) }, null) +} + +unsafe fn restore_inner(dataset: *const LanceDataset, version: u64) -> Result<*mut LanceDataset> { + if dataset.is_null() { + return Err(lance_core::Error::InvalidInput { + source: "dataset must not be NULL".into(), + location: snafu::location!(), + }); + } + if version == 0 { + return Err(lance_core::Error::InvalidInput { + source: "version must be >= 1; 0 is reserved as the \"latest\" sentinel".into(), + location: snafu::location!(), + }); + } + + let ds = unsafe { &*dataset }; + + // Check out the target version, then commit a new manifest that aliases + // its fragments as the new latest. If the target is already the latest, + // skip the commit — the checkout alone is enough. + let restored = block_on(async { + let latest = ds.inner.latest_version_id().await?; + let mut checked_out = ds.inner.checkout_version(version).await?; + if version != latest { + checked_out.restore().await?; + } + Ok::<_, lance_core::Error>(checked_out) + })?; + + let handle = LanceDataset { + inner: Arc::new(restored), + }; + Ok(Box::into_raw(Box::new(handle))) +} diff --git a/tests/c_api_test.rs b/tests/c_api_test.rs index 2a31e18..d025fc9 100644 --- a/tests/c_api_test.rs +++ b/tests/c_api_test.rs @@ -1783,3 +1783,107 @@ fn test_versions_accessors_null_handle() { fn test_versions_close_null_is_safe() { unsafe { lance_versions_close(ptr::null_mut()) }; } + +// --------------------------------------------------------------------------- +// Restore (lance_dataset_restore) +// --------------------------------------------------------------------------- + +/// Helper: set up a dataset with two versions — initial create (rows 1..=5) +/// plus an append (rows 6..=7), returning `(tempdir, uri)`. +fn create_two_version_dataset() -> (tempfile::TempDir, String) { + let (tmp, uri) = create_test_dataset(); + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, true), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![6, 7])), + Arc::new(StringArray::from(vec!["frank", "grace"])), + ], + ) + .unwrap(); + append_batch(&uri, schema, batch); + (tmp, uri) +} + +#[test] +fn test_dataset_restore_to_prior_version() { + let (_tmp, uri) = create_two_version_dataset(); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert_eq!(unsafe { lance_dataset_version(ds) }, 2); + assert_eq!(unsafe { lance_dataset_count_rows(ds) }, 7); + + // Restore to V1 — expect a fresh handle at a new version (3) with V1's + // row count (5). + let restored = unsafe { lance_dataset_restore(ds, 1) }; + assert!(!restored.is_null()); + assert_eq!(unsafe { lance_dataset_version(restored) }, 3); + assert_eq!(unsafe { lance_dataset_count_rows(restored) }, 5); + + // Original handle is untouched. + assert_eq!(unsafe { lance_dataset_version(ds) }, 2); + + unsafe { lance_dataset_close(restored) }; + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_dataset_restore_to_current_latest_is_noop() { + let (_tmp, uri) = create_two_version_dataset(); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + let latest = unsafe { lance_dataset_version(ds) }; + assert_eq!(latest, 2); + + let restored = unsafe { lance_dataset_restore(ds, latest) }; + assert!(!restored.is_null()); + assert_eq!( + unsafe { lance_dataset_version(restored) }, + latest, + "restore to latest must not bump the version" + ); + + // No new manifest: reopening the dataset reports the same latest. + unsafe { lance_dataset_close(restored) }; + let ds2 = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert_eq!(unsafe { lance_dataset_version(ds2) }, latest); + + unsafe { lance_dataset_close(ds2) }; + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_dataset_restore_nonexistent_version() { + let (_tmp, uri) = create_test_dataset(); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + + let restored = unsafe { lance_dataset_restore(ds, 999) }; + assert!(restored.is_null()); + assert_eq!(lance_last_error_code(), LanceErrorCode::NotFound); + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_dataset_restore_version_zero_rejected() { + let (_tmp, uri) = create_test_dataset(); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + + let restored = unsafe { lance_dataset_restore(ds, 0) }; + assert!(restored.is_null()); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_dataset_restore_null_dataset_rejected() { + let restored = unsafe { lance_dataset_restore(ptr::null(), 1) }; + assert!(restored.is_null()); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); +} diff --git a/tests/cpp/test_c_api.c b/tests/cpp/test_c_api.c index c0eeb1d..61f54a1 100644 --- a/tests/cpp/test_c_api.c +++ b/tests/cpp/test_c_api.c @@ -179,6 +179,25 @@ static void test_versions(const char *uri) { printf("OK\n"); } +/* Restore the dataset to its own current version — expected to be a no-op + * (same version id, no new manifest written). This is the only restore path + * that's safe to run against the shared read-only test dataset. */ +static void test_restore_noop(const char *uri) { + printf(" test_restore_noop... "); + + LanceDataset *ds = lance_dataset_open(uri, NULL, 0); + ASSERT(ds != NULL, "open failed"); + uint64_t current = lance_dataset_version(ds); + + LanceDataset *after = lance_dataset_restore(ds, current); + ASSERT(after != NULL, "restore failed"); + ASSERT(lance_dataset_version(after) == current, "version unexpectedly changed"); + + lance_dataset_close(after); + lance_dataset_close(ds); + printf("OK\n"); +} + static void test_error_handling(void) { printf(" test_error_handling... "); @@ -214,6 +233,7 @@ int main(int argc, char **argv) { test_scan(uri); test_scan_with_limit(uri); test_versions(uri); + test_restore_noop(uri); test_error_handling(); printf("All C tests passed!\n"); diff --git a/tests/cpp/test_cpp_api.cpp b/tests/cpp/test_cpp_api.cpp index 9756503..7fe8a41 100644 --- a/tests/cpp/test_cpp_api.cpp +++ b/tests/cpp/test_cpp_api.cpp @@ -156,6 +156,20 @@ static void test_versions(const std::string& uri) { PASS(); } +// Restore to the dataset's own current version — a no-op that returns a +// fresh handle at the same version. +static void test_restore_noop(const std::string& uri) { + TEST(test_restore_noop); + + auto ds = lance::Dataset::open(uri); + uint64_t current = ds.version(); + + auto after = ds.restore(current); + assert(after.version() == current); + + PASS(); +} + static void test_error_exception(const std::string& /*uri*/) { TEST(test_error_exception); @@ -188,6 +202,7 @@ int main(int argc, char** argv) { test_dataset_take(uri); test_raii_cleanup(uri); test_versions(uri); + test_restore_noop(uri); test_error_exception(uri); printf("All C++ tests passed!\n"); From bf0c3e4c6c54d5af42210b7c554ed701db9eaae2 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Mon, 27 Apr 2026 13:52:48 +0800 Subject: [PATCH 2/2] fix(restore): always commit fresh manifest to defeat TOCTOU Drops the skip-when-version==latest optimization in lance_dataset_restore. The prior fast-path read latest_version_id before deciding whether to commit, opening a TOCTOU window: a concurrent writer landing a newer manifest in between would silently leave their version as latest, even though the caller's stated intent was "make `version` the new latest". Always calling restore() costs at most one extra manifest in the rare case the caller intentionally restores to the current latest, and makes the API's contract hold unconditionally under concurrent writers. Updates Rust + C + C++ tests to assert the new always-bumps behavior. Addresses review on PR #18. --- src/restore.rs | 18 +++++++++--------- tests/c_api_test.rs | 16 +++++++++++----- tests/cpp/test_c_api.c | 15 ++++++++------- tests/cpp/test_cpp_api.cpp | 13 +++++++------ 4 files changed, 35 insertions(+), 27 deletions(-) diff --git a/src/restore.rs b/src/restore.rs index 4ba94f2..28b2ba7 100644 --- a/src/restore.rs +++ b/src/restore.rs @@ -23,8 +23,9 @@ use crate::runtime::block_on; /// "latest" sentinel by `lance_dataset_open` and is rejected here with /// `LANCE_ERR_INVALID_ARGUMENT`. /// -/// If `version` is already the dataset's latest, the call succeeds as a -/// no-op without writing a new manifest. +/// A new manifest is always written, even when `version` already matches the +/// current latest — this guarantees the caller's stated intent ("make `version` +/// the new latest") holds under concurrent writers without a TOCTOU race. /// /// Returns a fresh `LanceDataset*` positioned at the target version on success /// (caller closes with `lance_dataset_close`), or NULL on error. Errors include @@ -54,15 +55,14 @@ unsafe fn restore_inner(dataset: *const LanceDataset, version: u64) -> Result<*m let ds = unsafe { &*dataset }; - // Check out the target version, then commit a new manifest that aliases - // its fragments as the new latest. If the target is already the latest, - // skip the commit — the checkout alone is enough. + // Check out the target version, then always commit a new manifest that + // aliases its fragments as the new latest. Skipping the commit when + // `version == latest` would open a TOCTOU window: a concurrent writer + // could land a newer manifest between the read and the comparison, and + // we'd silently leave their version as latest instead of the caller's. let restored = block_on(async { - let latest = ds.inner.latest_version_id().await?; let mut checked_out = ds.inner.checkout_version(version).await?; - if version != latest { - checked_out.restore().await?; - } + checked_out.restore().await?; Ok::<_, lance_core::Error>(checked_out) })?; diff --git a/tests/c_api_test.rs b/tests/c_api_test.rs index d025fc9..b162e19 100644 --- a/tests/c_api_test.rs +++ b/tests/c_api_test.rs @@ -1831,7 +1831,12 @@ fn test_dataset_restore_to_prior_version() { } #[test] -fn test_dataset_restore_to_current_latest_is_noop() { +fn test_dataset_restore_to_current_latest_writes_new_manifest() { + // Restoring to the current latest still writes a new manifest. The + // optimization that previously skipped the commit was racy: a concurrent + // writer could land a newer manifest between the staleness check and the + // skip, silently leaving their version as latest. We always commit so the + // caller's "make `version` the new latest" intent holds unconditionally. let (_tmp, uri) = create_two_version_dataset(); let c_uri = c_str(&uri); let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; @@ -1842,14 +1847,15 @@ fn test_dataset_restore_to_current_latest_is_noop() { assert!(!restored.is_null()); assert_eq!( unsafe { lance_dataset_version(restored) }, - latest, - "restore to latest must not bump the version" + latest + 1, + "restore to latest must commit a new manifest to defeat TOCTOU races" ); + assert_eq!(unsafe { lance_dataset_count_rows(restored) }, 7); - // No new manifest: reopening the dataset reports the same latest. + // Reopening the dataset reports the bumped latest. unsafe { lance_dataset_close(restored) }; let ds2 = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; - assert_eq!(unsafe { lance_dataset_version(ds2) }, latest); + assert_eq!(unsafe { lance_dataset_version(ds2) }, latest + 1); unsafe { lance_dataset_close(ds2) }; unsafe { lance_dataset_close(ds) }; diff --git a/tests/cpp/test_c_api.c b/tests/cpp/test_c_api.c index 61f54a1..a2dd547 100644 --- a/tests/cpp/test_c_api.c +++ b/tests/cpp/test_c_api.c @@ -179,11 +179,11 @@ static void test_versions(const char *uri) { printf("OK\n"); } -/* Restore the dataset to its own current version — expected to be a no-op - * (same version id, no new manifest written). This is the only restore path - * that's safe to run against the shared read-only test dataset. */ -static void test_restore_noop(const char *uri) { - printf(" test_restore_noop... "); +/* Restore the dataset to its own current version — always commits a new + * manifest (no skip-if-equal optimization) so the caller's "make `version` + * the new latest" intent holds even under concurrent writers. */ +static void test_restore_to_current(const char *uri) { + printf(" test_restore_to_current... "); LanceDataset *ds = lance_dataset_open(uri, NULL, 0); ASSERT(ds != NULL, "open failed"); @@ -191,7 +191,8 @@ static void test_restore_noop(const char *uri) { LanceDataset *after = lance_dataset_restore(ds, current); ASSERT(after != NULL, "restore failed"); - ASSERT(lance_dataset_version(after) == current, "version unexpectedly changed"); + ASSERT(lance_dataset_version(after) == current + 1, + "restore must bump the version to commit a fresh manifest"); lance_dataset_close(after); lance_dataset_close(ds); @@ -233,7 +234,7 @@ int main(int argc, char **argv) { test_scan(uri); test_scan_with_limit(uri); test_versions(uri); - test_restore_noop(uri); + test_restore_to_current(uri); test_error_handling(); printf("All C tests passed!\n"); diff --git a/tests/cpp/test_cpp_api.cpp b/tests/cpp/test_cpp_api.cpp index 7fe8a41..1dddf98 100644 --- a/tests/cpp/test_cpp_api.cpp +++ b/tests/cpp/test_cpp_api.cpp @@ -156,16 +156,17 @@ static void test_versions(const std::string& uri) { PASS(); } -// Restore to the dataset's own current version — a no-op that returns a -// fresh handle at the same version. -static void test_restore_noop(const std::string& uri) { - TEST(test_restore_noop); +// Restore to the dataset's own current version — always commits a new +// manifest (no skip-if-equal optimization) to defeat TOCTOU races against +// concurrent writers. +static void test_restore_to_current(const std::string& uri) { + TEST(test_restore_to_current); auto ds = lance::Dataset::open(uri); uint64_t current = ds.version(); auto after = ds.restore(current); - assert(after.version() == current); + assert(after.version() == current + 1); PASS(); } @@ -202,7 +203,7 @@ int main(int argc, char** argv) { test_dataset_take(uri); test_raii_cleanup(uri); test_versions(uri); - test_restore_noop(uri); + test_restore_to_current(uri); test_error_exception(uri); printf("All C++ tests passed!\n");