From d1aec0a28cdd732d14266724c05e255f90ba2a2b Mon Sep 17 00:00:00 2001 From: lijinglun Date: Thu, 8 Jan 2026 19:23:14 +0800 Subject: [PATCH] feat: add strict commit isolation flag for dataset commits --- java/lance-jni/src/blocking_dataset.rs | 1 + rust/lance-table/src/io/commit.rs | 4 ++ rust/lance/src/dataset.rs | 11 +++- rust/lance/src/dataset/fragment.rs | 36 +++++++++--- .../src/dataset/tests/dataset_merge_update.rs | 8 +++ .../src/dataset/tests/dataset_versioning.rs | 1 + rust/lance/src/dataset/write/commit.rs | 55 +++++++++++++++++++ rust/lance/src/io/commit.rs | 12 ++++ rust/lance/src/utils/test.rs | 1 + 9 files changed, 120 insertions(+), 9 deletions(-) diff --git a/java/lance-jni/src/blocking_dataset.rs b/java/lance-jni/src/blocking_dataset.rs index 04ca343a98e..03a6b999297 100644 --- a/java/lance-jni/src/blocking_dataset.rs +++ b/java/lance-jni/src/blocking_dataset.rs @@ -176,6 +176,7 @@ impl BlockingDataset { None, Default::default(), false, // TODO: support enable_v2_manifest_paths + false, // serial_commit ))?; Ok(Self { inner }) } diff --git a/rust/lance-table/src/io/commit.rs b/rust/lance-table/src/io/commit.rs index be650beac50..79f2302dfac 100644 --- a/rust/lance-table/src/io/commit.rs +++ b/rust/lance-table/src/io/commit.rs @@ -1113,6 +1113,9 @@ impl Debug for ConditionalPutCommitHandler { pub struct CommitConfig { pub num_retries: u32, pub skip_auto_cleanup: bool, + /// When true, fail immediately if any concurrent transactions have been committed + /// since the read version of the transaction being committed. 
+ pub serial_commit: bool, // TODO: add isolation_level } @@ -1121,6 +1124,7 @@ impl Default for CommitConfig { Self { num_retries: 20, skip_auto_cleanup: false, + serial_commit: false, } } } diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 0ff3cb6873a..05a6da1d351 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -1259,6 +1259,7 @@ impl Dataset { commit_handler: Option>, session: Arc, enable_v2_manifest_paths: bool, + serial_commit: bool, detached: bool, ) -> Result { let read_version = read_version.map_or_else( @@ -1277,7 +1278,8 @@ impl Dataset { let mut builder = CommitBuilder::new(base_uri) .enable_v2_manifest_paths(enable_v2_manifest_paths) .with_session(session) - .with_detached(detached); + .with_detached(detached) + .with_serial_commit(serial_commit); if let Some(store_params) = store_params { builder = builder.with_store_params(store_params); @@ -1324,6 +1326,10 @@ impl Dataset { /// dataset, use the [`Self::migrate_manifest_paths_v2`] method. WARNING: turning /// this on will make the dataset unreadable for older versions of Lance /// (prior to 0.17.0). Default is False. + /// * `serial_commit` - When true, the commit will fail if any transactions + /// have been committed since `read_version`, enforcing strict serial ordering. When + /// false (the default), the commit may be automatically rebased on concurrent updates. 
+ #[allow(clippy::too_many_arguments)] pub async fn commit( dest: impl Into>, operation: Operation, @@ -1332,6 +1338,7 @@ impl Dataset { commit_handler: Option>, session: Arc, enable_v2_manifest_paths: bool, + serial_commit: bool, ) -> Result { Self::do_commit( dest.into(), @@ -1341,6 +1348,7 @@ impl Dataset { commit_handler, session, enable_v2_manifest_paths, + serial_commit, /*detached=*/ false, ) .await @@ -1371,6 +1379,7 @@ impl Dataset { commit_handler, session, enable_v2_manifest_paths, + /*serial_commit=*/ false, /*detached=*/ true, ) .await diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index 099b883d355..9a805fdbe0e 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -2868,6 +2868,7 @@ mod tests { None, Default::default(), true, + false, ) .await .unwrap(); @@ -2941,6 +2942,7 @@ mod tests { None, Default::default(), true, + false, ) .await .unwrap(); @@ -3400,10 +3402,18 @@ mod tests { initial_bases: None, }; - let new_dataset = - Dataset::commit(test_uri, op, None, None, None, Default::default(), false) - .await - .unwrap(); + let new_dataset = Dataset::commit( + test_uri, + op, + None, + None, + None, + Default::default(), + false, + false, + ) + .await + .unwrap(); assert_eq!(new_dataset.count_rows(None).await.unwrap(), dataset_rows); @@ -3510,10 +3520,18 @@ mod tests { initial_bases: None, }; - let dataset = - Dataset::commit(test_uri, op, None, None, None, Default::default(), false) - .await - .unwrap(); + let dataset = Dataset::commit( + test_uri, + op, + None, + None, + None, + Default::default(), + false, + false, + ) + .await + .unwrap(); // We only kept the first fragment of 40 rows assert_eq!( @@ -3750,6 +3768,7 @@ mod tests { None, Default::default(), false, + false, ) .await?; @@ -3885,6 +3904,7 @@ mod tests { None, Default::default(), false, + false, ) .await .unwrap(); diff --git a/rust/lance/src/dataset/tests/dataset_merge_update.rs 
b/rust/lance/src/dataset/tests/dataset_merge_update.rs index aa35f1b6408..ce4a227b235 100644 --- a/rust/lance/src/dataset/tests/dataset_merge_update.rs +++ b/rust/lance/src/dataset/tests/dataset_merge_update.rs @@ -674,6 +674,7 @@ async fn test_datafile_replacement() { None, Arc::new(Default::default()), false, + false, ) .await .unwrap(); @@ -703,6 +704,7 @@ async fn test_datafile_replacement() { None, Arc::new(Default::default()), false, + false, ) .await .unwrap(); @@ -757,6 +759,7 @@ async fn test_datafile_replacement() { None, Arc::new(Default::default()), false, + false, ) .await .unwrap(); @@ -820,6 +823,7 @@ async fn test_datafile_partial_replacement() { None, Arc::new(Default::default()), false, + false, ) .await .unwrap(); @@ -873,6 +877,7 @@ async fn test_datafile_partial_replacement() { None, Arc::new(Default::default()), false, + false, ) .await .unwrap(); @@ -928,6 +933,7 @@ async fn test_datafile_partial_replacement() { None, Arc::new(Default::default()), false, + false, ) .await .unwrap(); @@ -1000,6 +1006,7 @@ async fn test_datafile_replacement_error() { None, Arc::new(Default::default()), false, + false, ) .await .unwrap(); @@ -1032,6 +1039,7 @@ async fn test_datafile_replacement_error() { None, Arc::new(Default::default()), false, + false, ) .await .unwrap_err(); diff --git a/rust/lance/src/dataset/tests/dataset_versioning.rs b/rust/lance/src/dataset/tests/dataset_versioning.rs index 2e2fcdf6601..18a45404472 100644 --- a/rust/lance/src/dataset/tests/dataset_versioning.rs +++ b/rust/lance/src/dataset/tests/dataset_versioning.rs @@ -116,6 +116,7 @@ async fn test_v2_manifest_path_commit() { None, Default::default(), true, // enable_v2_manifest_paths + false, ) .await .unwrap(); diff --git a/rust/lance/src/dataset/write/commit.rs b/rust/lance/src/dataset/write/commit.rs index 5070ee5e65f..2a6bd92bf9b 100644 --- a/rust/lance/src/dataset/write/commit.rs +++ b/rust/lance/src/dataset/write/commit.rs @@ -163,6 +163,13 @@ impl<'a> CommitBuilder<'a> { self 
} + /// When enabled, fail the commit if any transactions have been committed since + /// the transaction's read version. This enforces strict serial ordering. + pub fn with_serial_commit(mut self, serial_commit: bool) -> Self { + self.commit_config.serial_commit = serial_commit; + self + } + /// Provide the set of row addresses that were deleted or updated. This is /// used to perform fast conflict resolution. pub fn with_affected_rows(mut self, affected_rows: RowAddrTreeMap) -> Self { @@ -725,6 +732,54 @@ mod tests { assert_io_eq!(io_stats, write_iops, 2); // txn + manifest } + #[tokio::test] + async fn test_serial_commit() { + // Create a dataset with a single committed version. + let session = Arc::new(Session::default()); + let write_params = WriteParams { + session: Some(session.clone()), + ..Default::default() + }; + let data = RecordBatch::try_new( + Arc::new(ArrowSchema::new(vec![ArrowField::new( + "a", + DataType::Int32, + false, + )])), + vec![Arc::new(Int32Array::from(vec![0; 5]))], + ) + .unwrap(); + let dataset = InsertBuilder::new("memory://force_fail_any_update") + .with_params(&write_params) + .execute(vec![data]) + .await + .unwrap(); + let dataset = Arc::new(dataset); + let base_read_version = dataset.manifest().version; + assert_eq!(base_read_version, 1); + + // Advance the dataset to version 2 using the same read_version. + let _ = CommitBuilder::new(dataset.clone()) + .execute(sample_transaction(base_read_version)) + .await + .unwrap(); + + // Attempt to commit again using the stale read_version and serial_commit. + let result = CommitBuilder::new(dataset.clone()) + .with_serial_commit(true) + .execute(sample_transaction(base_read_version)) + .await; + + assert!(matches!(result, Err(Error::CommitConflict { .. }))); + + // Successfully commit using the same read_version with serial_commit disabled. 
+ CommitBuilder::new(dataset.clone()) + .with_serial_commit(false) + .execute(sample_transaction(base_read_version)) + .await + .unwrap(); + } + #[tokio::test] async fn test_commit_batch() { // Create a dataset diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index 5900c92fa8d..1e4abd5935f 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -828,6 +828,18 @@ pub(crate) async fn commit_transaction( if !strict_overwrite { (dataset, other_transactions) = load_and_sort_new_transactions(&dataset).await?; + // In serial commit mode, we fail immediately if any concurrent transactions have been + // committed since the read version of the transaction being committed. + if commit_config.serial_commit && !other_transactions.is_empty() { + return Err(Error::CommitConflict { + version: target_version, + source: + "Concurrent updates detected since read_version with serial_commit enabled" + .into(), + location: location!(), + }); + } + // See if we can retry the commit. Try to account for all // transactions that have been committed since the read_version. // Use small amount of backoff to handle transactions that all diff --git a/rust/lance/src/utils/test.rs b/rust/lance/src/utils/test.rs index dbf0911a68b..a69120f170b 100644 --- a/rust/lance/src/utils/test.rs +++ b/rust/lance/src/utils/test.rs @@ -124,6 +124,7 @@ impl TestDatasetGenerator { None, Default::default(), false, + false, ) .await .unwrap()