Skip to content
164 changes: 162 additions & 2 deletions crates/iceberg/src/transaction/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::transaction::{ActionCommit, TransactionAction};
/// FastAppendAction is a transaction action that fast-appends data files to the table.
pub struct FastAppendAction {
check_duplicate: bool,
check_added_data_files: bool,
// The properties below are used to create the SnapshotProducer on commit.
commit_uuid: Option<Uuid>,
key_metadata: Option<Vec<u8>>,
Expand All @@ -43,6 +44,7 @@ impl FastAppendAction {
pub(crate) fn new() -> Self {
Self {
check_duplicate: true,
check_added_data_files: true,
commit_uuid: None,
key_metadata: None,
snapshot_properties: HashMap::default(),
Expand All @@ -56,6 +58,12 @@ impl FastAppendAction {
self
}

/// Set whether to check added data files
pub fn with_check_added_data_files(mut self, v: bool) -> Self {
self.check_added_data_files = v;
self
}

/// Add data files to the snapshot.
pub fn add_data_files(mut self, data_files: impl IntoIterator<Item = DataFile>) -> Self {
self.added_data_files.extend(data_files);
Expand Down Expand Up @@ -92,8 +100,10 @@ impl TransactionAction for FastAppendAction {
self.added_data_files.clone(),
);

// validate added files
snapshot_producer.validate_added_data_files()?;
// Checks added files
if self.check_added_data_files {
snapshot_producer.validate_added_data_files()?;
}

// Checks duplicate files
if self.check_duplicate {
Expand Down Expand Up @@ -333,4 +343,154 @@ mod tests {
);
assert_eq!(data_file, *manifest.entries()[0].data_file());
}

/// Commits a fast append with the duplicate check disabled and verifies the
/// produced updates, requirements, manifest list and manifest.
// NOTE(review): nothing here shows the appended file to be a duplicate, so this
// appears to cover only the happy path with the check turned off — confirm
// whether a duplicate-file case should be added.
#[tokio::test]
async fn test_fast_append_with_check_duplicate_false() {
    let table = make_v2_minimal_table();
    let tx = Transaction::new(&table);

    // The single data file appended by this test.
    let data_file = DataFileBuilder::default()
        .content(DataContentType::Data)
        .file_path("test/3.parquet".to_string())
        .file_format(DataFileFormat::Parquet)
        .file_size_in_bytes(100)
        .record_count(1)
        .partition_spec_id(table.metadata().default_partition_spec_id())
        .partition(Struct::from_iter([Some(Literal::long(300))]))
        .build()
        .unwrap();

    // Build the action with the duplicate check switched off, then commit it.
    let action = tx
        .fast_append()
        .with_check_duplicate(false)
        .add_data_files(vec![data_file.clone()]);
    let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
    let updates = action_commit.take_updates();
    let requirements = action_commit.take_requirements();

    // The first update adds the snapshot and the second points the main branch at it.
    assert!(matches!(
        (&updates[0], &updates[1]),
        (
            TableUpdate::AddSnapshot { snapshot },
            TableUpdate::SetSnapshotRef { reference, ref_name }
        ) if snapshot.snapshot_id() == reference.snapshot_id && ref_name == MAIN_BRANCH
    ));

    // The commit must be guarded by the table UUID and the current main-branch snapshot id.
    let expected_requirements = vec![
        TableRequirement::UuidMatch {
            uuid: table.metadata().uuid(),
        },
        TableRequirement::RefSnapshotIdMatch {
            r#ref: MAIN_BRANCH.to_string(),
            snapshot_id: table.metadata().current_snapshot_id,
        },
    ];
    assert_eq!(expected_requirements, requirements);

    // Manifest list: exactly one manifest file, carrying the new snapshot's sequence number.
    let new_snapshot = match &updates[0] {
        TableUpdate::AddSnapshot { snapshot } => snapshot,
        _ => unreachable!(),
    };
    let manifest_list = new_snapshot
        .load_manifest_list(table.file_io(), table.metadata())
        .await
        .unwrap();
    assert_eq!(1, manifest_list.entries().len());
    let manifest_file = &manifest_list.entries()[0];
    assert_eq!(
        manifest_file.sequence_number,
        new_snapshot.sequence_number()
    );

    // Manifest: exactly one entry, tied to the new snapshot and equal to the appended file.
    let manifest = manifest_file
        .load_manifest(table.file_io())
        .await
        .unwrap();
    assert_eq!(1, manifest.entries().len());
    let entry = &manifest.entries()[0];
    assert_eq!(
        new_snapshot.sequence_number(),
        entry
            .sequence_number()
            .expect("Inherit sequence number by load manifest")
    );
    assert_eq!(new_snapshot.snapshot_id(), entry.snapshot_id().unwrap());
    assert_eq!(data_file, *entry.data_file());
}

/// Commits a fast append with the added-data-file validation disabled and
/// verifies the produced updates, requirements, manifest list and manifest.
// NOTE(review): the data file is built from the table's default partition spec
// id, so it presumably would also pass validation — confirm whether a case with
// a file that fails validation should be added.
#[tokio::test]
async fn test_fast_append_with_check_added_data_files_false() {
    let table = make_v2_minimal_table();
    let tx = Transaction::new(&table);

    // The single data file appended by this test.
    let data_file = DataFileBuilder::default()
        .content(DataContentType::Data)
        .file_path("test/3.parquet".to_string())
        .file_format(DataFileFormat::Parquet)
        .file_size_in_bytes(100)
        .record_count(1)
        .partition_spec_id(table.metadata().default_partition_spec_id())
        .partition(Struct::from_iter([Some(Literal::long(300))]))
        .build()
        .unwrap();

    // Build the action with added-file validation switched off, then commit it.
    let action = tx
        .fast_append()
        .with_check_added_data_files(false)
        .add_data_files(vec![data_file.clone()]);
    let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
    let updates = action_commit.take_updates();
    let requirements = action_commit.take_requirements();

    // The first update adds the snapshot and the second points the main branch at it.
    assert!(matches!(
        (&updates[0], &updates[1]),
        (
            TableUpdate::AddSnapshot { snapshot },
            TableUpdate::SetSnapshotRef { reference, ref_name }
        ) if snapshot.snapshot_id() == reference.snapshot_id && ref_name == MAIN_BRANCH
    ));

    // The commit must be guarded by the table UUID and the current main-branch snapshot id.
    let expected_requirements = vec![
        TableRequirement::UuidMatch {
            uuid: table.metadata().uuid(),
        },
        TableRequirement::RefSnapshotIdMatch {
            r#ref: MAIN_BRANCH.to_string(),
            snapshot_id: table.metadata().current_snapshot_id,
        },
    ];
    assert_eq!(expected_requirements, requirements);

    // Manifest list: exactly one manifest file, carrying the new snapshot's sequence number.
    let new_snapshot = match &updates[0] {
        TableUpdate::AddSnapshot { snapshot } => snapshot,
        _ => unreachable!(),
    };
    let manifest_list = new_snapshot
        .load_manifest_list(table.file_io(), table.metadata())
        .await
        .unwrap();
    assert_eq!(1, manifest_list.entries().len());
    let manifest_file = &manifest_list.entries()[0];
    assert_eq!(
        manifest_file.sequence_number,
        new_snapshot.sequence_number()
    );

    // Manifest: exactly one entry, tied to the new snapshot and equal to the appended file.
    let manifest = manifest_file
        .load_manifest(table.file_io())
        .await
        .unwrap();
    assert_eq!(1, manifest.entries().len());
    let entry = &manifest.entries()[0];
    assert_eq!(
        new_snapshot.sequence_number(),
        entry
            .sequence_number()
            .expect("Inherit sequence number by load manifest")
    );
    assert_eq!(new_snapshot.snapshot_id(), entry.snapshot_id().unwrap());
    assert_eq!(data_file, *entry.data_file());
}
}
Loading