Skip to content
Prev Previous commit
Next Next commit
reuse ancestors_of existing functionality
  • Loading branch information
kaushiksrini committed Mar 13, 2025
commit e631ddfa67d9df07e2b5b2095be68b2dcb8c533f
19 changes: 12 additions & 7 deletions pyiceberg/table/update/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
Snapshot,
SnapshotSummaryCollector,
Summary,
ancestors_of,
update_snapshot_summaries,
)
from pyiceberg.table.update import (
Expand Down Expand Up @@ -254,9 +255,10 @@ def _commit(self) -> UpdatesAndRequirements:
manifest_list_file_path = location_provider.new_metadata_location(file_name)

# get current snapshot id and starting snapshot id, and validate that there are no conflicts
starting_snapshot_id = self._parent_snapshot_id
current_snapshot_id = self._transaction._table.refresh().metadata.current_snapshot_id
self._validate(starting_snapshot_id, current_snapshot_id)
if self._transaction._table.__class__.__name__ != "StagedTable":
starting_snapshot = self._transaction.table_metadata.current_snapshot()
current_snapshot = self._transaction._table.refresh().metadata.current_snapshot()
self._validate(starting_snapshot, current_snapshot)

with write_manifest_list(
format_version=self._transaction.table_metadata.format_version,
Expand Down Expand Up @@ -286,10 +288,7 @@ def _commit(self) -> UpdatesAndRequirements:
(AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref="main"),),
)

def _validate(self, starting_snapshot_id: Optional[int], current_snapshot_id: Optional[int]) -> None:
# get all the snapshots between the current snapshot id and the parent id
snapshots = ancestors_between(starting_snapshot_id, current_snapshot_id, self._transaction._table.metadata.snapshot_by_id)

def _validate(self, starting_snapshot: Optional[Snapshot], current_snapshot: Optional[Snapshot]) -> None:
# Define allowed operations for each type of operation
allowed_operations = {
Operation.APPEND: {Operation.APPEND, Operation.REPLACE, Operation.OVERWRITE, Operation.DELETE},
Expand All @@ -298,7 +297,13 @@ def _validate(self, starting_snapshot_id: Optional[int], current_snapshot_id: Op
Operation.DELETE: set(),
}

# iterate over the ancestors of the current snapshot; the loop below stops once it reaches the starting snapshot
snapshots = ancestors_of(current_snapshot, self._transaction._table.metadata)

for snapshot in snapshots:
if snapshot.snapshot_id == starting_snapshot.snapshot_id:
break

snapshot_operation = snapshot.summary.operation

if snapshot_operation not in allowed_operations[self._operation]:
Expand Down
53 changes: 0 additions & 53 deletions pyiceberg/utils/snapshot.py

This file was deleted.

Loading