Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions java/lance-jni/src/blocking_dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ impl BlockingDataset {
None,
Default::default(),
false, // TODO: support enable_v2_manifest_paths
false, // serial_commit
))?;
Ok(Self { inner })
}
Expand Down
4 changes: 4 additions & 0 deletions rust/lance-table/src/io/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1113,6 +1113,9 @@ impl Debug for ConditionalPutCommitHandler {
pub struct CommitConfig {
/// NOTE(review): presumably the maximum number of times a commit is retried
/// before giving up — confirm against the commit retry loop. The `Default`
/// impl sets this to 20.
pub num_retries: u32,
/// NOTE(review): presumably disables the automatic cleanup step for this
/// commit — confirm against the caller. The `Default` impl sets this to `false`.
pub skip_auto_cleanup: bool,
/// When true, fail immediately if any concurrent transactions have been committed
/// since the read version of the transaction being committed, rather than
/// trying to account for those transactions and retry. The `Default` impl
/// sets this to `false`.
pub serial_commit: bool,
// TODO: add isolation_level
}

Expand All @@ -1121,6 +1124,7 @@ impl Default for CommitConfig {
Self {
num_retries: 20,
skip_auto_cleanup: false,
serial_commit: false,
}
}
}
Expand Down
11 changes: 10 additions & 1 deletion rust/lance/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1259,6 +1259,7 @@ impl Dataset {
commit_handler: Option<Arc<dyn CommitHandler>>,
session: Arc<Session>,
enable_v2_manifest_paths: bool,
serial_commit: bool,
detached: bool,
) -> Result<Self> {
let read_version = read_version.map_or_else(
Expand All @@ -1277,7 +1278,8 @@ impl Dataset {
let mut builder = CommitBuilder::new(base_uri)
.enable_v2_manifest_paths(enable_v2_manifest_paths)
.with_session(session)
.with_detached(detached);
.with_detached(detached)
.with_serial_commit(serial_commit);

if let Some(store_params) = store_params {
builder = builder.with_store_params(store_params);
Expand Down Expand Up @@ -1324,6 +1326,10 @@ impl Dataset {
/// dataset, use the [`Self::migrate_manifest_paths_v2`] method. WARNING: turning
/// this on will make the dataset unreadable for older versions of Lance
/// (prior to 0.17.0). Default is False.
/// * `serial_commit` - When true, the commit will fail if any transactions
/// have been committed since `read_version`, enforcing strict serial ordering. When
/// false (the default), the commit may be automatically rebased on concurrent updates.
#[allow(clippy::too_many_arguments)]
pub async fn commit(
dest: impl Into<WriteDestination<'_>>,
operation: Operation,
Expand All @@ -1332,6 +1338,7 @@ impl Dataset {
commit_handler: Option<Arc<dyn CommitHandler>>,
session: Arc<Session>,
enable_v2_manifest_paths: bool,
serial_commit: bool,
) -> Result<Self> {
Self::do_commit(
dest.into(),
Expand All @@ -1341,6 +1348,7 @@ impl Dataset {
commit_handler,
session,
enable_v2_manifest_paths,
serial_commit,
/*detached=*/ false,
)
.await
Expand Down Expand Up @@ -1371,6 +1379,7 @@ impl Dataset {
commit_handler,
session,
enable_v2_manifest_paths,
/*serial_commit=*/ false,
/*detached=*/ true,
)
.await
Expand Down
36 changes: 28 additions & 8 deletions rust/lance/src/dataset/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2868,6 +2868,7 @@ mod tests {
None,
Default::default(),
true,
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -2941,6 +2942,7 @@ mod tests {
None,
Default::default(),
true,
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -3400,10 +3402,18 @@ mod tests {
initial_bases: None,
};

let new_dataset =
Dataset::commit(test_uri, op, None, None, None, Default::default(), false)
.await
.unwrap();
let new_dataset = Dataset::commit(
test_uri,
op,
None,
None,
None,
Default::default(),
false,
false,
)
.await
.unwrap();

assert_eq!(new_dataset.count_rows(None).await.unwrap(), dataset_rows);

Expand Down Expand Up @@ -3510,10 +3520,18 @@ mod tests {
initial_bases: None,
};

let dataset =
Dataset::commit(test_uri, op, None, None, None, Default::default(), false)
.await
.unwrap();
let dataset = Dataset::commit(
test_uri,
op,
None,
None,
None,
Default::default(),
false,
false,
)
.await
.unwrap();

// We only kept the first fragment of 40 rows
assert_eq!(
Expand Down Expand Up @@ -3750,6 +3768,7 @@ mod tests {
None,
Default::default(),
false,
false,
)
.await?;

Expand Down Expand Up @@ -3885,6 +3904,7 @@ mod tests {
None,
Default::default(),
false,
false,
)
.await
.unwrap();
Expand Down
8 changes: 8 additions & 0 deletions rust/lance/src/dataset/tests/dataset_merge_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,7 @@ async fn test_datafile_replacement() {
None,
Arc::new(Default::default()),
false,
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -703,6 +704,7 @@ async fn test_datafile_replacement() {
None,
Arc::new(Default::default()),
false,
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -757,6 +759,7 @@ async fn test_datafile_replacement() {
None,
Arc::new(Default::default()),
false,
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -820,6 +823,7 @@ async fn test_datafile_partial_replacement() {
None,
Arc::new(Default::default()),
false,
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -873,6 +877,7 @@ async fn test_datafile_partial_replacement() {
None,
Arc::new(Default::default()),
false,
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -928,6 +933,7 @@ async fn test_datafile_partial_replacement() {
None,
Arc::new(Default::default()),
false,
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -1000,6 +1006,7 @@ async fn test_datafile_replacement_error() {
None,
Arc::new(Default::default()),
false,
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -1032,6 +1039,7 @@ async fn test_datafile_replacement_error() {
None,
Arc::new(Default::default()),
false,
false,
)
.await
.unwrap_err();
Expand Down
1 change: 1 addition & 0 deletions rust/lance/src/dataset/tests/dataset_versioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ async fn test_v2_manifest_path_commit() {
None,
Default::default(),
true, // enable_v2_manifest_paths
false,
)
.await
.unwrap();
Expand Down
55 changes: 55 additions & 0 deletions rust/lance/src/dataset/write/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,13 @@ impl<'a> CommitBuilder<'a> {
self
}

/// Enable or disable serial-commit mode for this builder.
///
/// When enabled, fail the commit if any transactions have been committed since
/// the transaction's read version. This enforces strict serial ordering; the
/// failure is reported as an `Error::CommitConflict`.
///
/// Stores the flag in `commit_config.serial_commit` and returns the builder so
/// calls can be chained.
pub fn with_serial_commit(mut self, serial_commit: bool) -> Self {
self.commit_config.serial_commit = serial_commit;
self
}

/// Provide the set of row addresses that were deleted or updated. This is
/// used to perform fast conflict resolution.
pub fn with_affected_rows(mut self, affected_rows: RowAddrTreeMap) -> Self {
Expand Down Expand Up @@ -725,6 +732,54 @@ mod tests {
assert_io_eq!(io_stats, write_iops, 2); // txn + manifest
}

// Verifies `with_serial_commit`: a transaction built against a stale read
// version is rejected with `Error::CommitConflict` when serial commit is
// enabled, and the same stale transaction is accepted when it is disabled.
#[tokio::test]
async fn test_serial_commit() {
// Create a dataset with a single committed version.
let session = Arc::new(Session::default());
let write_params = WriteParams {
session: Some(session.clone()),
..Default::default()
};
let data = RecordBatch::try_new(
Arc::new(ArrowSchema::new(vec![ArrowField::new(
"a",
DataType::Int32,
false,
)])),
vec![Arc::new(Int32Array::from(vec![0; 5]))],
)
.unwrap();
let dataset = InsertBuilder::new("memory://force_fail_any_update")
.with_params(&write_params)
.execute(vec![data])
.await
.unwrap();
let dataset = Arc::new(dataset);
// Every transaction below is built against this version, so after the
// first follow-up commit it is stale.
let base_read_version = dataset.manifest().version;
assert_eq!(base_read_version, 1);

// Advance the dataset to version 2 using the same read_version.
let _ = CommitBuilder::new(dataset.clone())
.execute(sample_transaction(base_read_version))
.await
.unwrap();

// Attempt to commit again using the stale read_version with serial_commit enabled.
let result = CommitBuilder::new(dataset.clone())
.with_serial_commit(true)
.execute(sample_transaction(base_read_version))
.await;

// A commit landed since `base_read_version`, so serial commit must reject this one.
assert!(matches!(result, Err(Error::CommitConflict { .. })));

// The same stale read_version commits successfully when serial_commit is disabled.
CommitBuilder::new(dataset.clone())
.with_serial_commit(false)
.execute(sample_transaction(base_read_version))
.await
.unwrap();
}

#[tokio::test]
async fn test_commit_batch() {
// Create a dataset
Expand Down
12 changes: 12 additions & 0 deletions rust/lance/src/io/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -828,6 +828,18 @@ pub(crate) async fn commit_transaction(
if !strict_overwrite {
(dataset, other_transactions) = load_and_sort_new_transactions(&dataset).await?;

// In serial commit mode, we fail immediately if any concurrent transactions have been
// committed since the read version of the transaction being committed.
if commit_config.serial_commit && !other_transactions.is_empty() {
return Err(Error::CommitConflict {
version: target_version,
source:
"Concurrent updates detected since read_version with serial_commit enabled"
.into(),
location: location!(),
});
}

// See if we can retry the commit. Try to account for all
// transactions that have been committed since the read_version.
// Use small amount of backoff to handle transactions that all
Expand Down
1 change: 1 addition & 0 deletions rust/lance/src/utils/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ impl TestDatasetGenerator {
None,
Default::default(),
false,
false,
)
.await
.unwrap()
Expand Down
Loading