Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions include/lance.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,22 @@ int64_t lance_versions_timestamp_ms_at(const LanceVersions* versions, size_t ind
/** Close and free a versions handle. Safe to call with NULL. */
void lance_versions_close(LanceVersions* versions);

/**
 * Restore the dataset to an older version by committing a new manifest that
 * carries the fragments of `version`. A new manifest is always written, even
 * when `version` is already the latest, so every successful call produces a
 * new latest version.
 *
 * @param dataset Open dataset (not consumed). Must not be NULL.
 * @param version Target version id (>= 1). `0` is rejected since it is the
 *                "latest" sentinel used by lance_dataset_open.
 * @return Fresh LanceDataset* for the newly committed version, which carries
 *         the contents of `version` (caller closes with lance_dataset_close),
 *         or NULL on error. Possible error codes include
 *         LANCE_ERR_INVALID_ARGUMENT (NULL handle or version == 0),
 *         LANCE_ERR_NOT_FOUND (unknown version),
 *         LANCE_ERR_COMMIT_CONFLICT (concurrent writer).
 */
LanceDataset* lance_dataset_restore(const LanceDataset* dataset, uint64_t version);

/**
* Export the dataset schema via Arrow C Data Interface.
* @param out Pointer to caller-allocated ArrowSchema struct
Expand Down
10 changes: 10 additions & 0 deletions include/lance.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,16 @@ class Dataset {
return out;
}

/// Commit a new manifest that aliases `version` as the latest and return a
/// Dataset for the newly committed version; this handle is unchanged. A new
/// manifest is always written, even if `version` is already the latest.
/// Throws lance::Error on failure.
// NOTE(review): relies on check_error() throwing whenever `out` is null;
// otherwise a null handle would be wrapped in the returned Dataset — confirm.
Dataset restore(uint64_t version) const {
auto* out = lance_dataset_restore(handle_.get(), version);
if (!out) check_error();
return Dataset(out);
}

/// Export the schema as an Arrow C Data Interface struct.
void schema(ArrowSchema* out) const {
if (lance_dataset_schema(handle_.get(), out) != 0) {
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod error;
mod fragment_writer;
mod helpers;
mod index;
mod restore;
pub mod runtime;
mod scanner;
mod versions;
Expand All @@ -34,5 +35,6 @@ pub use error::{
};
pub use fragment_writer::*;
pub use index::*;
pub use restore::*;
pub use scanner::*;
pub use versions::*;
74 changes: 74 additions & 0 deletions src/restore.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Restore C API: move a dataset's latest back to an older version by
//! committing a new manifest whose fragments match the chosen version.
//!
//! Returns a fresh `LanceDataset*` positioned at the target version; the
//! caller's original handle is untouched and remains usable.

use std::sync::{Arc, RwLock};

use lance_core::Result;

use crate::dataset::LanceDataset;
use crate::error::ffi_try;
use crate::runtime::block_on;

/// Restore the dataset to an older version by committing a new manifest that
/// carries the fragments of `version`.
///
/// - `dataset`: Open dataset (not consumed). Must not be NULL.
/// - `version`: Target version id. Must be `>= 1`; `0` is reserved as the
/// "latest" sentinel by `lance_dataset_open` and is rejected here with
/// `LANCE_ERR_INVALID_ARGUMENT`.
///
/// A new manifest is always written, even when `version` already matches the
/// current latest — this guarantees the caller's stated intent ("make `version`
/// the new latest") holds under concurrent writers without a TOCTOU race.
///
/// Returns a fresh `LanceDataset*` for the newly committed version, which
/// carries the contents of `version`, on success (caller closes with
/// `lance_dataset_close`), or NULL on error. Errors include
/// `LANCE_ERR_NOT_FOUND` for an unknown `version` and `LANCE_ERR_COMMIT_CONFLICT`
/// for a concurrent commit race.
///
/// # Safety
///
/// `dataset` must be either NULL or a pointer to a live `LanceDataset` (for
/// example one returned by `lance_dataset_open` that has not yet been closed).
/// A non-NULL pointer is dereferenced for the duration of the call.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn lance_dataset_restore(
dataset: *const LanceDataset,
version: u64,
) -> *mut LanceDataset {
ffi_try!(unsafe { restore_inner(dataset, version) }, null)
}

unsafe fn restore_inner(dataset: *const LanceDataset, version: u64) -> Result<*mut LanceDataset> {
if dataset.is_null() {
return Err(lance_core::Error::InvalidInput {
source: "dataset must not be NULL".into(),
location: snafu::location!(),
});
}
if version == 0 {
return Err(lance_core::Error::InvalidInput {
source: "version must be >= 1; 0 is reserved as the \"latest\" sentinel".into(),
location: snafu::location!(),
});
}

let ds = unsafe { &*dataset };
let snap = ds.snapshot();

// Check out the target version, then always commit a new manifest that
// aliases its fragments as the new latest. Skipping the commit when
// `version == latest` would open a TOCTOU window: a concurrent writer
// could land a newer manifest between the read and the comparison, and
// we'd silently leave their version as latest instead of the caller's.
let restored = block_on(async {
let mut checked_out = snap.checkout_version(version).await?;
checked_out.restore().await?;
Ok::<_, lance_core::Error>(checked_out)
})?;

let handle = LanceDataset {
inner: RwLock::new(Arc::new(restored)),
};
Ok(Box::into_raw(Box::new(handle)))
}
110 changes: 110 additions & 0 deletions tests/c_api_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1785,6 +1785,116 @@ fn test_versions_close_null_is_safe() {
unsafe { lance_versions_close(ptr::null_mut()) };
}

// ---------------------------------------------------------------------------
// Restore (lance_dataset_restore)
// ---------------------------------------------------------------------------

/// Helper: builds a dataset with two versions by taking the dataset produced
/// by `create_test_dataset` and appending one extra batch (ids 6 and 7,
/// names "frank" and "grace"). Returns `(tempdir, uri)`.
fn create_two_version_dataset() -> (tempfile::TempDir, String) {
    let (tmp, uri) = create_test_dataset();

    let fields = vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, true),
    ];
    let schema = Arc::new(Schema::new(fields));

    let ids = Int32Array::from(vec![6, 7]);
    let names = StringArray::from(vec!["frank", "grace"]);
    let extra_rows =
        RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(names)]).unwrap();

    // The append is what creates the second version.
    append_batch(&uri, schema, extra_rows);
    (tmp, uri)
}

#[test]
fn test_dataset_restore_to_prior_version() {
    let (_tmp, uri) = create_two_version_dataset();
    let c_uri = c_str(&uri);

    unsafe {
        // Baseline: the freshly opened dataset is at V2 with all seven rows.
        let source = lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0);
        assert_eq!(lance_dataset_version(source), 2);
        assert_eq!(lance_dataset_count_rows(source), 7);

        // Restoring V1 yields a separate handle at a brand-new version (3)
        // that holds V1's five rows.
        let restored = lance_dataset_restore(source, 1);
        assert!(!restored.is_null());
        assert_eq!(lance_dataset_version(restored), 3);
        assert_eq!(lance_dataset_count_rows(restored), 5);

        // The handle passed in still reports the version it was opened at.
        assert_eq!(lance_dataset_version(source), 2);

        lance_dataset_close(restored);
        lance_dataset_close(source);
    }
}

#[test]
fn test_dataset_restore_to_current_latest_writes_new_manifest() {
    // Even when the target is already the latest version, restore commits a
    // fresh manifest. Skipping the commit would be racy: another writer could
    // publish a newer manifest between the "is it latest?" check and the
    // skip, leaving that writer's version as latest. Committing every time
    // keeps the "make `version` the new latest" contract unconditional.
    let (_tmp, uri) = create_two_version_dataset();
    let c_uri = c_str(&uri);

    unsafe {
        let source = lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0);
        let latest = lance_dataset_version(source);
        assert_eq!(latest, 2);

        let restored = lance_dataset_restore(source, latest);
        assert!(!restored.is_null());
        assert_eq!(
            lance_dataset_version(restored),
            latest + 1,
            "restore to latest must commit a new manifest to defeat TOCTOU races"
        );
        assert_eq!(lance_dataset_count_rows(restored), 7);
        lance_dataset_close(restored);

        // A fresh open of the same URI must observe the bumped latest.
        let reopened = lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0);
        assert_eq!(lance_dataset_version(reopened), latest + 1);

        lance_dataset_close(reopened);
        lance_dataset_close(source);
    }
}

#[test]
fn test_dataset_restore_nonexistent_version() {
    let (_tmp, uri) = create_test_dataset();
    let c_uri = c_str(&uri);
    let source = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) };

    // Version 999 was never written, so restore must fail with NotFound and
    // hand back a NULL handle.
    let restored = unsafe { lance_dataset_restore(source, 999) };
    let code = lance_last_error_code();
    assert!(restored.is_null());
    assert_eq!(code, LanceErrorCode::NotFound);

    unsafe { lance_dataset_close(source) };
}

#[test]
fn test_dataset_restore_version_zero_rejected() {
    let (_tmp, uri) = create_test_dataset();
    let c_uri = c_str(&uri);
    let source = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) };

    // Version 0 is the "latest" sentinel and is not a valid restore target.
    let restored = unsafe { lance_dataset_restore(source, 0) };
    let code = lance_last_error_code();
    assert!(restored.is_null());
    assert_eq!(code, LanceErrorCode::InvalidArgument);

    unsafe { lance_dataset_close(source) };
}

#[test]
fn test_dataset_restore_null_dataset_rejected() {
    // A NULL dataset handle must be rejected up front as an invalid argument;
    // no dataset needs to exist for this check.
    let restored = unsafe { lance_dataset_restore(ptr::null(), 1) };
    let code = lance_last_error_code();
    assert!(restored.is_null());
    assert_eq!(code, LanceErrorCode::InvalidArgument);
}

// ---------------------------------------------------------------------------
// Index lifecycle tests (Phase 2)
// ---------------------------------------------------------------------------
Expand Down
21 changes: 21 additions & 0 deletions tests/cpp/test_c_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,26 @@ static void test_versions(const char *uri) {
printf("OK\n");
}

/* Restore the dataset to its own current version — always commits a new
 * manifest (no skip-if-equal optimization) so the caller's "make `version`
 * the new latest" intent holds even under concurrent writers.
 *
 * Side effect: a successful run adds one version to the dataset at `uri`, so
 * any test that later opens the same `uri` sees the bumped latest version. */
static void test_restore_to_current(const char *uri) {
printf(" test_restore_to_current... ");

LanceDataset *ds = lance_dataset_open(uri, NULL, 0);
ASSERT(ds != NULL, "open failed");
uint64_t current = lance_dataset_version(ds);

/* `ds` is not consumed by restore; `after` is a separate handle. */
LanceDataset *after = lance_dataset_restore(ds, current);
ASSERT(after != NULL, "restore failed");
ASSERT(lance_dataset_version(after) == current + 1,
"restore must bump the version to commit a fresh manifest");

/* Both handles are owned by this test and must be closed individually. */
lance_dataset_close(after);
lance_dataset_close(ds);
printf("OK\n");
}

static void test_error_handling(void) {
printf(" test_error_handling... ");

Expand Down Expand Up @@ -214,6 +234,7 @@ int main(int argc, char **argv) {
test_scan(uri);
test_scan_with_limit(uri);
test_versions(uri);
test_restore_to_current(uri);
test_error_handling();

printf("All C tests passed!\n");
Expand Down
16 changes: 16 additions & 0 deletions tests/cpp/test_cpp_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,21 @@ static void test_versions(const std::string& uri) {
PASS();
}

// Restore to the dataset's own current version — always commits a new
// manifest (no skip-if-equal optimization) to defeat TOCTOU races against
// concurrent writers.
//
// Side effect: a successful run adds one version to the dataset at `uri`, so
// any test that later opens the same `uri` sees the bumped latest version.
static void test_restore_to_current(const std::string& uri) {
TEST(test_restore_to_current);

auto ds = lance::Dataset::open(uri);
uint64_t current = ds.version();

// `ds` itself is left unchanged; `after` is the newly committed version.
auto after = ds.restore(current);
assert(after.version() == current + 1);

PASS();
}

static void test_error_exception(const std::string& /*uri*/) {
TEST(test_error_exception);

Expand Down Expand Up @@ -276,6 +291,7 @@ int main(int argc, char** argv) {
test_dataset_take(uri);
test_raii_cleanup(uri);
test_versions(uri);
test_restore_to_current(uri);
test_error_exception(uri);
test_index_lifecycle(uri);
test_nearest_smoke(uri);
Expand Down
Loading