diff --git a/include/lance.h b/include/lance.h index 1d53ee0..bfe7616 100644 --- a/include/lance.h +++ b/include/lance.h @@ -200,6 +200,22 @@ int64_t lance_versions_timestamp_ms_at(const LanceVersions* versions, size_t ind /** Close and free a versions handle. Safe to call with NULL. */ void lance_versions_close(LanceVersions* versions); +/** + * Restore the dataset to an older version by committing a new manifest that + * carries the fragments of `version`. A new manifest is always written, even + * when `version` is already the latest. + * + * @param dataset Open dataset (not consumed). Must not be NULL. + * @param version Target version id (>= 1). `0` is rejected since it is the + * "latest" sentinel used by lance_dataset_open. + * @return Fresh LanceDataset* at the newly committed version, which carries + * the fragments of `version` (caller closes with lance_dataset_close), + * or NULL on error. Possible error codes include + * LANCE_ERR_INVALID_ARGUMENT (NULL handle or version == 0), LANCE_ERR_NOT_FOUND (unknown version), + * LANCE_ERR_COMMIT_CONFLICT (concurrent writer). + */ +LanceDataset* lance_dataset_restore(const LanceDataset* dataset, uint64_t version); + /** * Export the dataset schema via Arrow C Data Interface. * @param out Pointer to caller-allocated ArrowSchema struct diff --git a/include/lance.hpp b/include/lance.hpp index eaee7f9..f611500 100644 --- a/include/lance.hpp +++ b/include/lance.hpp @@ -162,6 +162,16 @@ class Dataset { return out; } + /// Commit a new manifest that aliases `version` as the latest. The + /// returned Dataset points at the newly committed version; this handle is + /// unchanged. A new manifest is written even if `version` is already the + /// latest. Throws lance::Error on failure. + Dataset restore(uint64_t version) const { + auto* out = lance_dataset_restore(handle_.get(), version); + if (!out) check_error(); + return Dataset(out); + } + /// Export the schema as an Arrow C Data Interface struct. 
void schema(ArrowSchema* out) const { if (lance_dataset_schema(handle_.get(), out) != 0) { diff --git a/src/lib.rs b/src/lib.rs index 53535a1..c26fdbf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,6 +22,7 @@ mod error; mod fragment_writer; mod helpers; mod index; +mod restore; pub mod runtime; mod scanner; mod versions; @@ -34,5 +35,6 @@ pub use error::{ }; pub use fragment_writer::*; pub use index::*; +pub use restore::*; pub use scanner::*; pub use versions::*; diff --git a/src/restore.rs b/src/restore.rs new file mode 100644 index 0000000..0fd43bf --- /dev/null +++ b/src/restore.rs @@ -0,0 +1,74 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Restore C API: move a dataset's latest back to an older version by +//! committing a new manifest whose fragments match the chosen version. +//! +//! Returns a fresh `LanceDataset*` at the newly committed version; the +//! caller's original handle is untouched and remains usable. + +use std::sync::{Arc, RwLock}; + +use lance_core::Result; + +use crate::dataset::LanceDataset; +use crate::error::ffi_try; +use crate::runtime::block_on; + +/// Restore the dataset to an older version by committing a new manifest that +/// carries the fragments of `version`. +/// +/// - `dataset`: Open dataset (not consumed). Must not be NULL. +/// - `version`: Target version id. Must be `>= 1`; `0` is reserved as the +/// "latest" sentinel by `lance_dataset_open` and is rejected here with +/// `LANCE_ERR_INVALID_ARGUMENT`. +/// +/// A new manifest is always written, even when `version` already matches the +/// current latest — this guarantees the caller's stated intent ("make `version` +/// the new latest") holds under concurrent writers without a TOCTOU race. +/// +/// Returns a fresh `LanceDataset*` at the newly committed version on success +/// (caller closes with `lance_dataset_close`), or NULL on error. 
Errors include +/// `LANCE_ERR_NOT_FOUND` for an unknown `version` and `LANCE_ERR_COMMIT_CONFLICT` +/// for a concurrent commit race. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn lance_dataset_restore( + dataset: *const LanceDataset, + version: u64, +) -> *mut LanceDataset { + ffi_try!(unsafe { restore_inner(dataset, version) }, null) +} + +unsafe fn restore_inner(dataset: *const LanceDataset, version: u64) -> Result<*mut LanceDataset> { + if dataset.is_null() { + return Err(lance_core::Error::InvalidInput { + source: "dataset must not be NULL".into(), + location: snafu::location!(), + }); + } + if version == 0 { + return Err(lance_core::Error::InvalidInput { + source: "version must be >= 1; 0 is reserved as the \"latest\" sentinel".into(), + location: snafu::location!(), + }); + } + + let ds = unsafe { &*dataset }; + let snap = ds.snapshot(); + + // Check out the target version, then always commit a new manifest that + // aliases its fragments as the new latest. Skipping the commit when + // `version == latest` would open a TOCTOU window: a concurrent writer + // could land a newer manifest between the read and the comparison, and + // we'd silently leave their version as latest instead of the caller's. 
+ let restored = block_on(async { + let mut checked_out = snap.checkout_version(version).await?; + checked_out.restore().await?; + Ok::<_, lance_core::Error>(checked_out) + })?; + + let handle = LanceDataset { + inner: RwLock::new(Arc::new(restored)), + }; + Ok(Box::into_raw(Box::new(handle))) +} diff --git a/tests/c_api_test.rs b/tests/c_api_test.rs index eb0b0eb..75f9840 100644 --- a/tests/c_api_test.rs +++ b/tests/c_api_test.rs @@ -1785,6 +1785,116 @@ fn test_versions_close_null_is_safe() { unsafe { lance_versions_close(ptr::null_mut()) }; } +// --------------------------------------------------------------------------- +// Restore (lance_dataset_restore) +// --------------------------------------------------------------------------- + +/// Helper: set up a dataset with two versions — initial create (rows 1..=5) +/// plus an append (rows 6..=7), returning `(tempdir, uri)`. +fn create_two_version_dataset() -> (tempfile::TempDir, String) { + let (tmp, uri) = create_test_dataset(); + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, true), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![6, 7])), + Arc::new(StringArray::from(vec!["frank", "grace"])), + ], + ) + .unwrap(); + append_batch(&uri, schema, batch); + (tmp, uri) +} + +#[test] +fn test_dataset_restore_to_prior_version() { + let (_tmp, uri) = create_two_version_dataset(); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert_eq!(unsafe { lance_dataset_version(ds) }, 2); + assert_eq!(unsafe { lance_dataset_count_rows(ds) }, 7); + + // Restore to V1 — expect a fresh handle at a new version (3) with V1's + // row count (5). 
+ let restored = unsafe { lance_dataset_restore(ds, 1) }; + assert!(!restored.is_null()); + assert_eq!(unsafe { lance_dataset_version(restored) }, 3); + assert_eq!(unsafe { lance_dataset_count_rows(restored) }, 5); + + // Original handle is untouched. + assert_eq!(unsafe { lance_dataset_version(ds) }, 2); + + unsafe { lance_dataset_close(restored) }; + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_dataset_restore_to_current_latest_writes_new_manifest() { + // Restoring to the current latest still writes a new manifest. The + // optimization that previously skipped the commit was racy: a concurrent + // writer could land a newer manifest between the staleness check and the + // skip, silently leaving their version as latest. We always commit so the + // caller's "make `version` the new latest" intent holds unconditionally. + let (_tmp, uri) = create_two_version_dataset(); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + let latest = unsafe { lance_dataset_version(ds) }; + assert_eq!(latest, 2); + + let restored = unsafe { lance_dataset_restore(ds, latest) }; + assert!(!restored.is_null()); + assert_eq!( + unsafe { lance_dataset_version(restored) }, + latest + 1, + "restore to latest must commit a new manifest to defeat TOCTOU races" + ); + assert_eq!(unsafe { lance_dataset_count_rows(restored) }, 7); + + // Reopening the dataset reports the bumped latest. 
+ unsafe { lance_dataset_close(restored) }; + let ds2 = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert_eq!(unsafe { lance_dataset_version(ds2) }, latest + 1); + + unsafe { lance_dataset_close(ds2) }; + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_dataset_restore_nonexistent_version() { + let (_tmp, uri) = create_test_dataset(); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + + let restored = unsafe { lance_dataset_restore(ds, 999) }; + assert!(restored.is_null()); + assert_eq!(lance_last_error_code(), LanceErrorCode::NotFound); + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_dataset_restore_version_zero_rejected() { + let (_tmp, uri) = create_test_dataset(); + let c_uri = c_str(&uri); + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + + let restored = unsafe { lance_dataset_restore(ds, 0) }; + assert!(restored.is_null()); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_dataset_restore_null_dataset_rejected() { + let restored = unsafe { lance_dataset_restore(ptr::null(), 1) }; + assert!(restored.is_null()); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); +} + // --------------------------------------------------------------------------- // Index lifecycle tests (Phase 2) // --------------------------------------------------------------------------- diff --git a/tests/cpp/test_c_api.c b/tests/cpp/test_c_api.c index c0eeb1d..a2dd547 100644 --- a/tests/cpp/test_c_api.c +++ b/tests/cpp/test_c_api.c @@ -179,6 +179,26 @@ static void test_versions(const char *uri) { printf("OK\n"); } +/* Restore the dataset to its own current version — always commits a new + * manifest (no skip-if-equal optimization) so the caller's "make `version` + * the new latest" intent holds even under concurrent writers. 
*/ +static void test_restore_to_current(const char *uri) { + printf(" test_restore_to_current... "); + + LanceDataset *ds = lance_dataset_open(uri, NULL, 0); + ASSERT(ds != NULL, "open failed"); + uint64_t current = lance_dataset_version(ds); + + LanceDataset *after = lance_dataset_restore(ds, current); + ASSERT(after != NULL, "restore failed"); + ASSERT(lance_dataset_version(after) == current + 1, + "restore must bump the version to commit a fresh manifest"); + + lance_dataset_close(after); + lance_dataset_close(ds); + printf("OK\n"); +} + static void test_error_handling(void) { printf(" test_error_handling... "); @@ -214,6 +234,7 @@ int main(int argc, char **argv) { test_scan(uri); test_scan_with_limit(uri); test_versions(uri); + test_restore_to_current(uri); test_error_handling(); printf("All C tests passed!\n"); diff --git a/tests/cpp/test_cpp_api.cpp b/tests/cpp/test_cpp_api.cpp index 3fa5fe5..be49850 100644 --- a/tests/cpp/test_cpp_api.cpp +++ b/tests/cpp/test_cpp_api.cpp @@ -156,6 +156,21 @@ static void test_versions(const std::string& uri) { PASS(); } +// Restore to the dataset's own current version — always commits a new +// manifest (no skip-if-equal optimization) to defeat TOCTOU races against +// concurrent writers. +static void test_restore_to_current(const std::string& uri) { + TEST(test_restore_to_current); + + auto ds = lance::Dataset::open(uri); + uint64_t current = ds.version(); + + auto after = ds.restore(current); + assert(after.version() == current + 1); + + PASS(); +} + static void test_error_exception(const std::string& /*uri*/) { TEST(test_error_exception); @@ -276,6 +291,7 @@ int main(int argc, char** argv) { test_dataset_take(uri); test_raii_cleanup(uri); test_versions(uri); + test_restore_to_current(uri); test_error_exception(uri); test_index_lifecycle(uri); test_nearest_smoke(uri);