Fix Iceberg reorg deletion to use modern batch ID approach
The Iceberg loader used an outdated reorg deletion method that parsed a
_meta_block_ranges column instead of the modern state_store +
_amp_batch_id approach.

Changes:
1. _handle_reorg now uses state_store.invalidate_from_block() to get
   affected batch IDs, matching the PostgreSQL/Snowflake/DeltaLake approach

2. _perform_reorg_deletion now filters rows by _amp_batch_id instead of
   trying to parse the non-existent _meta_block_ranges JSON column

3. Efficient filtering using set membership checks on batch IDs (sketched below)
fordN committed Feb 2, 2026
commit 622ba82d0f66f60e740d9f9bfe38099a1609d1b8
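For context, here is a minimal standalone sketch of the new filtering strategy. The table and batch IDs below are made up for illustration; in the loader the IDs come from state_store.invalidate_from_block() and the rows from an Iceberg scan:

    import pyarrow as pa

    # Rows as they might look in the Iceberg table; _amp_batch_id can hold
    # several "|"-joined IDs when a batch spans multiple networks.
    table = pa.table({
        'value': [1, 2, 3, 4],
        '_amp_batch_id': ['batch-a', 'batch-b|batch-c', 'batch-d', None],
    })

    unique_batch_ids = {'batch-b', 'batch-d'}  # invalidated by the reorg

    keep_indices = []
    for i in range(table.num_rows):
        batch_id_value = table['_amp_batch_id'][i].as_py()
        # Substring membership covers the "|"-separated multi-network case;
        # rows without a batch ID are always kept.
        if batch_id_value and any(bid in batch_id_value for bid in unique_batch_ids):
            continue  # row belongs to an invalidated batch
        keep_indices.append(i)

    filtered = table.take(keep_indices)  # first and last rows survive

Note the substring check assumes no batch ID is a prefix of another; splitting on "|" and comparing exact IDs would be stricter.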
151 changes: 47 additions & 104 deletions src/amp/loaders/implementations/iceberg_loader.py
@@ -524,6 +524,19 @@ def _handle_reorg(self, invalidation_ranges: List[BlockRange], table_name: str,
return

try:
+            # Collect all affected batch IDs from state store
+            all_affected_batch_ids = []
+            for range_obj in invalidation_ranges:
+                # Get batch IDs that need to be deleted from state store
+                affected_batch_ids = self.state_store.invalidate_from_block(
+                    connection_name, table_name, range_obj.network, range_obj.start
+                )
+                all_affected_batch_ids.extend(affected_batch_ids)
+
+            if not all_affected_batch_ids:
+                self.logger.info(f'No batches to delete for reorg in {table_name}')
+                return
+
# Load the Iceberg table
table_identifier = f'{self.config.namespace}.{table_name}'
try:
@@ -532,58 +545,31 @@ def _handle_reorg(self, invalidation_ranges: List[BlockRange], table_name: str,
self.logger.warning(f"Table '{table_identifier}' does not exist, skipping reorg handling")
return

-            # Build delete predicate for all invalidation ranges
-            # For Iceberg, we'll use PyArrow expressions which get converted automatically
-            delete_conditions = []
-
-            for range_obj in invalidation_ranges:
-                network = range_obj.network
-                reorg_start = range_obj.start
-
-                # Create condition for this network's reorg
-                # Delete all rows where the block range metadata for this network has end >= reorg_start
-                # This catches both overlapping ranges and forward ranges from the reorg point
-
-                # Build expression to check _meta_block_ranges JSON array
-                # We need to parse the JSON and check if any range for this network
-                # has an end block >= reorg_start
-                delete_conditions.append(
-                    f'_meta_block_ranges LIKE \'%"network":"{network}"%\' AND '
-                    f'EXISTS (SELECT 1 FROM JSON_ARRAY_ELEMENTS(_meta_block_ranges) AS range_elem '
-                    f"WHERE range_elem->>'network' = '{network}' AND "
-                    f"(range_elem->>'end')::int >= {reorg_start})"
-                )
-
-            # Process reorg if we have deletion conditions
-            if delete_conditions:
-                self.logger.info(
-                    f'Executing blockchain reorg deletion for {len(invalidation_ranges)} networks '
-                    f"in Iceberg table '{table_name}'"
-                )
+            self.logger.info(
+                f'Executing blockchain reorg deletion for {len(invalidation_ranges)} networks '
+                f"in Iceberg table '{table_name}' ({len(all_affected_batch_ids)} batch IDs)"
+            )

            # Since PyIceberg doesn't have a direct delete API yet, we'll use overwrite
            # with filtered data as a workaround
-            # Future: Use SQL delete when available:
-            # combined_condition = ' OR '.join(f'({cond})' for cond in delete_conditions)
-            # delete_expr = f"DELETE FROM {table_identifier} WHERE {combined_condition}"
-            self._perform_reorg_deletion(iceberg_table, invalidation_ranges, table_name)
+            self._perform_reorg_deletion(iceberg_table, all_affected_batch_ids, table_name)

except Exception as e:
self.logger.error(f"Failed to handle blockchain reorg for table '{table_name}': {str(e)}")
raise

-    def _perform_reorg_deletion(
-        self, iceberg_table: IcebergTable, invalidation_ranges: List[BlockRange], table_name: str
-    ) -> None:
+    def _perform_reorg_deletion(self, iceberg_table: IcebergTable, all_affected_batch_ids, table_name: str) -> None:
"""
-        Perform the actual deletion for reorg handling using Iceberg's capabilities.
+        Perform the actual deletion for reorg handling using batch IDs.

Since PyIceberg doesn't have a direct DELETE API yet, we'll use scan and overwrite
to achieve the same effect while maintaining ACID guarantees.

Args:
iceberg_table: The Iceberg table to delete from
+            all_affected_batch_ids: List of BatchIdentifier objects to delete
table_name: Table name for logging
"""
try:
# First, scan the table to get current data
# We'll filter out the invalidated ranges during the scan
scan = iceberg_table.scan()

# Read all data into memory (for now - could be optimized with streaming)
@@ -593,79 +579,35 @@ def _perform_reorg_deletion(
self.logger.info(f"Table '{table_name}' is empty, nothing to delete for reorg")
return

-            # Check if the table has the metadata column
-            if '_meta_block_ranges' not in arrow_table.schema.names:
+            # Check if the table has the batch ID column
+            if '_amp_batch_id' not in arrow_table.schema.names:
                self.logger.warning(
-                    f"Table '{table_name}' doesn't have '_meta_block_ranges' column, skipping reorg handling"
+                    f"Table '{table_name}' doesn't have '_amp_batch_id' column, skipping reorg handling"
)
return

-            # Filter out invalidated rows
-            import pyarrow.compute as pc
-
-            # Start with all rows marked as valid
-            keep_mask = pc.equal(pc.scalar(True), pc.scalar(True))
+            # Build set of unique batch IDs to delete
+            unique_batch_ids = set(bid.unique_id for bid in all_affected_batch_ids)

-            for range_obj in invalidation_ranges:
-                network = range_obj.network
-                reorg_start = range_obj.start
-
-                # For each row, check if it should be invalidated
-                # This is complex with JSON, so we'll parse and check each row
-                for i in range(arrow_table.num_rows):
-                    meta_json = arrow_table['_meta_block_ranges'][i].as_py()
-                    if meta_json:
-                        try:
-                            ranges_data = json.loads(meta_json)
-                            # Check if any range for this network should be invalidated
-                            for range_info in ranges_data:
-                                if range_info['network'] == network and range_info['end'] >= reorg_start:
-                                    # Mark this row for deletion
-                                    keep_mask = pc.and_(keep_mask, pc.not_equal(pc.scalar(i), pc.scalar(i)))
-                                    break
-                        except (json.JSONDecodeError, KeyError):
-                            continue
-
-            # Create a filtered table with only the rows we want to keep
-            # For a more efficient implementation, build a boolean array
+            # Filter out rows with matching batch IDs
+            # A row should be deleted if its _amp_batch_id contains any of the affected IDs
+            # (handles multi-network batches with "|"-separated IDs)
keep_indices = []
deleted_count = 0

for i in range(arrow_table.num_rows):
-                should_delete = False
-                meta_json = arrow_table['_meta_block_ranges'][i].as_py()
-
-                if meta_json:
-                    try:
-                        ranges_data = json.loads(meta_json)
-
-                        # Ensure ranges_data is a list
-                        if not isinstance(ranges_data, list):
-                            continue
-
-                        # Check each invalidation range
-                        for range_obj in invalidation_ranges:
-                            network = range_obj.network
-                            reorg_start = range_obj.start
-
-                            # Check if any range for this network should be invalidated
-                            for range_info in ranges_data:
-                                if (
-                                    isinstance(range_info, dict)
-                                    and range_info.get('network') == network
-                                    and range_info.get('end', 0) >= reorg_start
-                                ):
-                                    should_delete = True
-                                    deleted_count += 1
-                                    break
-
-                            if should_delete:
-                                break
-
-                    except (json.JSONDecodeError, KeyError):
-                        pass
-
-                if not should_delete:
-                    keep_indices.append(i)
+                batch_id_value = arrow_table['_amp_batch_id'][i].as_py()
+
+                if batch_id_value:
+                    # Check if any affected batch ID appears in this row's batch ID
+                    should_delete = any(bid in batch_id_value for bid in unique_batch_ids)
+
+                    if should_delete:
+                        deleted_count += 1
+                    else:
+                        keep_indices.append(i)
+                else:
+                    # Keep rows without batch ID
+                    keep_indices.append(i)

if deleted_count == 0:
@@ -684,7 +626,8 @@ def _perform_reorg_deletion(
iceberg_table.overwrite(filtered_table)

self.logger.info(
f"Blockchain reorg deleted {deleted_count} rows from Iceberg table '{table_name}'. "
f"Blockchain reorg deleted {deleted_count} rows from Iceberg table '{table_name}' "
f"({len(all_affected_batch_ids)} batch IDs). "
f'New snapshot created with {filtered_table.num_rows} remaining rows.'
)

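Since the commit keeps the scan-and-overwrite workaround described in the docstring (PyIceberg exposes no row-level DELETE here), the full round trip can be sketched end to end. A rough example, assuming a PyIceberg catalog is configured; the catalog name, URI, and table identifier are placeholders, not values from this repo:

    from pyiceberg.catalog import load_catalog

    # Placeholder catalog; any catalog PyIceberg supports works the same way.
    catalog = load_catalog('local', uri='sqlite:///catalog.db', warehouse='file:///tmp/warehouse')
    table = catalog.load_table('amp.transfers')  # hypothetical namespace.table

    # Scan everything into memory, as _perform_reorg_deletion does today.
    arrow_table = table.scan().to_arrow()

    # Drop rows belonging to invalidated batches (IDs hard-coded for the demo).
    invalidated = {'batch-b'}
    keep = [
        i
        for i in range(arrow_table.num_rows)
        if not (
            (bid := arrow_table['_amp_batch_id'][i].as_py())
            and any(x in bid for x in invalidated)
        )
    ]

    # overwrite() replaces the table contents in a single atomic snapshot commit,
    # which is what preserves the ACID guarantee the docstring mentions.
    table.overwrite(arrow_table.take(keep))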