Fix LMDB and Redis streaming test issues
- Redis stream storage: Corrected the test to use the f'{table_name}:stream' key
  format to match how the Redis loader stores stream data
- LMDB overwrite mode: Fixed _clear_data() to properly clear named
  databases when overwriting data
- LMDB streaming: Added a tx_hash column to the test data for compatibility
  with key pattern requirements
- Base streaming tests: Updated column references from transaction_hash
  to tx_hash for consistency across all loaders
fordN committed Feb 2, 2026
commit f928c6dea1b46c9dd17dfbb7e04debb7d2f649d6
12 changes: 10 additions & 2 deletions src/amp/loaders/base.py
@@ -302,8 +302,8 @@ def _try_load_batch(self, batch: pa.RecordBatch, table_name: str, **kwargs) -> L
f'Please create any tables needed before running the loader. '
)

# Handle overwrite mode
if mode == LoadMode.OVERWRITE and hasattr(self, '_clear_table'):
# Handle overwrite mode (only if not already cleared by load_table)
if mode == LoadMode.OVERWRITE and not kwargs.get('_already_cleared') and hasattr(self, '_clear_table'):
self._clear_table(table_name)

# Perform the actual load
@@ -347,6 +347,14 @@ def load_table(self, table: pa.Table, table_name: str, **kwargs) -> LoadResult:
start_time = time.time()
batch_size = kwargs.get('batch_size', getattr(self, 'batch_size', 10000))

# Handle overwrite mode ONCE before processing batches
mode = kwargs.get('mode', LoadMode.APPEND)
if mode == LoadMode.OVERWRITE and hasattr(self, '_clear_table'):
self._clear_table(table_name)
# Prevent subsequent batch loads from clearing again
kwargs = kwargs.copy()
kwargs['_already_cleared'] = True

rows_loaded = 0
batch_count = 0
errors = []
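The intent of the _already_cleared flag can be sketched with a small stand-in loader (illustrative only; the ToyLoader class and its data are made up, not project code):

# Illustrative stand-in, not project code: mimics the patched clear-once-then-append flow.
class ToyLoader:
    def __init__(self):
        self.rows = []

    def _clear_table(self, table_name):
        self.rows.clear()

    def _try_load_batch(self, batch, table_name, **kwargs):
        # Mirrors the patched base class: clear only if load_table() has not already done so.
        if kwargs.get('mode') == 'overwrite' and not kwargs.get('_already_cleared'):
            self._clear_table(table_name)
        self.rows.extend(batch)

    def load_table(self, batches, table_name, mode='append'):
        kwargs = {'mode': mode}
        if mode == 'overwrite':
            self._clear_table(table_name)      # handle overwrite ONCE, before processing batches
            kwargs['_already_cleared'] = True  # prevent subsequent batch loads from clearing again
        for batch in batches:
            self._try_load_batch(batch, table_name, **kwargs)

loader = ToyLoader()
loader.load_table([[1, 2], [3, 4], [5]], 'transfers', mode='overwrite')
assert len(loader.rows) == 5  # all three batches survive, not just the last one

With the original code path, every batch in overwrite mode re-cleared the table, so only the final batch's rows survived a multi-batch load.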
22 changes: 12 additions & 10 deletions src/amp/loaders/implementations/lmdb_loader.py
@@ -218,18 +218,20 @@ def _clear_data(self, table_name: str) -> None:
try:
db = self._get_or_create_db(self.config.database_name)

# Clear all entries by iterating through and deleting
with self.env.begin(write=True, db=db) as txn:
# Collect all keys in a read transaction first
with self.env.begin(db=db) as txn:
cursor = txn.cursor()
# Delete all key-value pairs
if cursor.first():
while True:
if not cursor.delete():
break
if not cursor.next():
break
keys_to_delete = list(cursor.iternext(values=False))

self.logger.info(f"Cleared all data for table '{table_name}'")
# Delete all keys in a write transaction
if keys_to_delete:
with self.env.begin(write=True, db=db) as txn:
for key in keys_to_delete:
txn.delete(key)

self.logger.info(f"Cleared {len(keys_to_delete)} entries from LMDB for table '{table_name}'")
else:
self.logger.info(f"No data to clear for table '{table_name}'")
except Exception as e:
self.logger.error(f'Error in _clear_data: {e}')
raise
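The same collect-then-delete pattern, shown standalone with the py-lmdb API (a minimal sketch; the path, map size, and database name are placeholders, not the loader's actual configuration):

import lmdb

# Placeholder environment: the real loader manages its own environment and named databases.
env = lmdb.open('/tmp/lmdb-clear-demo', max_dbs=2, map_size=10 * 1024 * 1024)
db = env.open_db(b'demo')

with env.begin(write=True, db=db) as txn:
    for i in range(5):
        txn.put(f'key-{i}'.encode(), b'value')

# Snapshot all keys in a read transaction first...
with env.begin(db=db) as txn:
    keys_to_delete = list(txn.cursor().iternext(values=False))

# ...then delete them in a separate write transaction, as the patched _clear_data() does.
if keys_to_delete:
    with env.begin(write=True, db=db) as txn:
        for key in keys_to_delete:
            txn.delete(key)

with env.begin(db=db) as txn:
    assert sum(1 for _ in txn.cursor()) == 0  # named database is now empty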
2 changes: 1 addition & 1 deletion src/amp/loaders/implementations/redis_loader.py
@@ -555,7 +555,7 @@ def _clear_data(self, table_name: str) -> None:
self.logger.info(f'Deleted {keys_deleted} existing keys')

else:
# For collection-based structures, delete the main key
# For collection-based structures, use table_name:structure format
collection_key = f'{table_name}:{self.data_structure.value}'
if self.redis_client.exists(collection_key):
self.redis_client.delete(collection_key)
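For collection-based structures the data lives under a single key named table_name:structure, which is the key the corrected stream test now targets. A minimal redis-py sketch (connection settings are placeholders):

import redis

table_name = 'test_stream'
stream_key = f'{table_name}:stream'  # collection-based structures use the table_name:structure key

r = redis.Redis(host='localhost', port=6379, db=0)  # placeholder connection settings
r.xadd(stream_key, {'tx_hash': '0xabc', 'value': '1.0'})

assert r.xlen(stream_key) == 1  # xlen('test_stream') alone would report 0
r.delete(stream_key)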
10 changes: 10 additions & 0 deletions tests/conftest.py
@@ -203,6 +203,16 @@ def redis_test_config(request):
return request.getfixturevalue('redis_config')


@pytest.fixture
def redis_streaming_config(redis_test_config):
"""Redis config for streaming tests with blockchain data (uses tx_hash instead of id)"""
return {
**redis_test_config,
'key_pattern': '{table}:{tx_hash}', # Use tx_hash from blockchain data
'data_structure': 'hash',
}


@pytest.fixture(scope='session')
def delta_test_env():
"""Create Delta Lake test environment for the session"""
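Assuming the Redis loader fills key_pattern with the table name plus per-row fields (the substitution below is illustrative, not the loader's actual implementation), the streaming fixture's pattern only resolves when a tx_hash column is present:

# Illustrative only: shows why streaming test data needs a tx_hash column for this pattern.
key_pattern = '{table}:{tx_hash}'
row = {'block_number': 100, 'tx_hash': '0xabc', 'value': 1.0}

key = key_pattern.format(table='transfers', **row)
assert key == 'transfers:0xabc'  # a row without tx_hash would raise KeyError here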
25 changes: 19 additions & 6 deletions tests/integration/loaders/backends/test_lmdb.py
@@ -77,15 +77,17 @@ def cleanup_table(self, loader: LMDBLoader, table_name: str) -> None:

def get_column_names(self, loader: LMDBLoader, table_name: str) -> List[str]:
"""Get column names from LMDB database (from first record)"""
import json
import pyarrow as pa

db = loader._get_or_create_db(getattr(loader.config, 'database_name', None))
with loader.env.begin(db=db) as txn:
cursor = txn.cursor()
for _key, value in cursor:
try:
row_data = json.loads(value.decode())
return list(row_data.keys())
# Deserialize Arrow IPC format (LMDB stores Arrow batches, not JSON)
reader = pa.ipc.open_stream(value)
batch = reader.read_next_batch()
return batch.schema.names # Returns all column names including metadata
except Exception:
return ['_value'] # Fallback
return []
@@ -197,7 +199,7 @@ def test_composite_key_strategy(self, lmdb_config):
data = {'network': ['eth', 'eth', 'poly'], 'block': [100, 101, 100], 'tx_index': [0, 0, 0]}
test_data = pa.Table.from_pydict(data)

config = {**lmdb_config, 'key_columns': ['network', 'block', 'tx_index']}
config = {**lmdb_config, 'composite_key_columns': ['network', 'block', 'tx_index']}
loader = LMDBLoader(config)

with loader:
@@ -280,6 +282,8 @@ def test_byte_key_handling(self, lmdb_config):

def test_data_persistence(self, lmdb_config, small_test_data):
"""Test that data persists after closing and reopening"""
import pyarrow as pa

from src.amp.loaders.base import LoadMode

# Load data
@@ -298,8 +302,17 @@ def test_data_persistence(self, lmdb_config, small_test_data):
count = sum(1 for _ in cursor)
assert count == 5

# Can append more
result2 = loader2.load_table(small_test_data, 'test_table', mode=LoadMode.APPEND)
# Can append more (use different IDs to avoid key conflicts in key-value store)
additional_data = pa.Table.from_pydict({
'id': [6, 7, 8, 9, 10],
'name': ['f', 'g', 'h', 'i', 'j'],
'value': [60.6, 70.7, 80.8, 90.9, 100.0],
'year': [2024, 2024, 2024, 2024, 2024],
'month': [1, 1, 1, 1, 1],
'day': [6, 7, 8, 9, 10],
'active': [False, True, False, True, False],
})
result2 = loader2.load_table(additional_data, 'test_table', mode=LoadMode.APPEND)
assert result2.success == True

# Now should have 10
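The Arrow IPC round-trip the updated get_column_names() helper relies on, shown standalone with plain pyarrow (the per-key value layout is the assumption here; the IPC calls themselves are standard):

import pyarrow as pa

batch = pa.RecordBatch.from_pydict({'tx_hash': ['0xabc'], 'value': [1.0]})

# Serialize a single RecordBatch to IPC stream bytes, as the stored LMDB values are assumed to hold.
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, batch.schema) as writer:
    writer.write_batch(batch)
value = sink.getvalue().to_pybytes()

# Deserialize the way the test helper now does: open the stream and read the first batch.
reader = pa.ipc.open_stream(value)
first = reader.read_next_batch()
assert first.schema.names == ['tx_hash', 'value']  # column names come from the batch schema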
38 changes: 24 additions & 14 deletions tests/integration/loaders/backends/test_redis.py
@@ -25,12 +25,23 @@ class RedisTestConfig(LoaderTestConfig):
"""Redis-specific test configuration"""

loader_class = RedisLoader
config_fixture_name = 'redis_test_config'

supports_overwrite = True
supports_streaming = True
supports_multi_network = True
supports_null_values = True
requires_existing_table = False # Redis auto-creates keys/structures

def __init__(self, config_fixture_name='redis_test_config'):
"""
Initialize Redis test config.

Args:
config_fixture_name: Name of the pytest fixture providing loader config
(default: 'redis_test_config' for core tests,
use 'redis_streaming_config' for streaming tests)
"""
self.config_fixture_name = config_fixture_name

def get_row_count(self, loader: RedisLoader, table_name: str) -> int:
"""Get row count from Redis based on data structure"""
@@ -157,7 +168,7 @@ class TestRedisCore(BaseLoaderTests):
class TestRedisStreaming(BaseStreamingTests):
"""Redis streaming tests (inherited from base)"""

config = RedisTestConfig()
config = RedisTestConfig('redis_streaming_config')


@pytest.mark.redis
@@ -171,7 +182,7 @@ def test_hash_storage(self, redis_test_config, small_test_data, cleanup_redis):
table_name = 'test_hash'
patterns_to_clean.append(f'{table_name}:*')

config = {**redis_test_config, 'data_structure': 'hash', 'key_field': 'id'}
config = {**redis_test_config, 'data_structure': 'hash'} # Uses default key_pattern {table}:{id}
loader = RedisLoader(config)

with loader:
@@ -195,7 +206,7 @@ def test_string_storage(self, redis_test_config, small_test_data, cleanup_redis)
table_name = 'test_string'
patterns_to_clean.append(f'{table_name}:*')

config = {**redis_test_config, 'data_structure': 'string', 'key_field': 'id'}
config = {**redis_test_config, 'data_structure': 'string'} # Uses default key_pattern {table}:{id}
loader = RedisLoader(config)

with loader:
@@ -219,7 +230,8 @@ def test_stream_storage(self, redis_test_config, small_test_data, cleanup_redis)
"""Test Redis stream data structure storage"""
keys_to_clean, patterns_to_clean = cleanup_redis
table_name = 'test_stream'
keys_to_clean.append(table_name)
stream_key = f'{table_name}:stream' # Stream keys use table_name:stream format
keys_to_clean.append(stream_key)

config = {**redis_test_config, 'data_structure': 'stream'}
loader = RedisLoader(config)
@@ -229,8 +241,8 @@ def test_stream_storage(self, redis_test_config, small_test_data, cleanup_redis)
assert result.success == True
assert result.rows_loaded == 5

# Verify data is in stream
stream_len = loader.redis_client.xlen(table_name)
# Verify data is in stream (Redis stream key format is table_name:stream)
stream_len = loader.redis_client.xlen(stream_key)
assert stream_len == 5

def test_set_storage(self, redis_test_config, small_test_data, cleanup_redis):
@@ -239,7 +251,7 @@ def test_set_storage(self, redis_test_config, small_test_data, cleanup_redis):
table_name = 'test_set'
patterns_to_clean.append(f'{table_name}:*')

config = {**redis_test_config, 'data_structure': 'set', 'key_field': 'id'}
config = {**redis_test_config, 'data_structure': 'set'} # Uses default key_pattern {table}:{id}
loader = RedisLoader(config)

with loader:
@@ -255,7 +267,7 @@ def test_ttl_functionality(self, redis_test_config, small_test_data, cleanup_red
table_name = 'test_ttl'
patterns_to_clean.append(f'{table_name}:*')

config = {**redis_test_config, 'data_structure': 'hash', 'key_field': 'id', 'ttl': 2} # 2 second TTL
config = {**redis_test_config, 'data_structure': 'hash', 'ttl': 2} # 2 second TTL, uses default key_pattern
loader = RedisLoader(config)

with loader:
@@ -292,8 +304,7 @@ def test_key_pattern_generation(self, redis_test_config, cleanup_redis):
config = {
**redis_test_config,
'data_structure': 'hash',
'key_field': 'user_id',
'key_pattern': '{table}:user:{key_value}',
'key_pattern': '{table}:user:{user_id}', # Custom key pattern using user_id field
}
loader = RedisLoader(config)

@@ -325,8 +336,7 @@ def test_data_structure_comparison(self, redis_test_config, comprehensive_test_d
keys_to_clean.append(table_name)

config = {**redis_test_config, 'data_structure': structure}
if structure in ['hash', 'string']:
config['key_field'] = 'id'
# Hash and string structures use default key_pattern {table}:{id}

loader = RedisLoader(config)

@@ -368,7 +378,7 @@ def test_large_data_loading(self, redis_test_config, cleanup_redis):
}
large_table = pa.Table.from_pydict(large_data)

config = {**redis_test_config, 'data_structure': 'hash', 'key_field': 'id'}
config = {**redis_test_config, 'data_structure': 'hash'} # Uses default key_pattern {table}:{id}
loader = RedisLoader(config)

with loader:
10 changes: 5 additions & 5 deletions tests/integration/loaders/test_base_streaming.py
@@ -35,7 +35,7 @@ def test_streaming_metadata_columns(self, loader, test_table_name, cleanup_table
# Create test data with metadata
data = {
'block_number': [100, 101, 102],
'transaction_hash': ['0xabc', '0xdef', '0x123'],
'tx_hash': ['0xabc', '0xdef', '0x123'], # Use tx_hash for consistency with other streaming tests
'value': [1.0, 2.0, 3.0],
}
batch = pa.RecordBatch.from_pydict(data)
@@ -281,8 +281,8 @@ def test_microbatch_deduplication(self, loader, test_table_name, cleanup_tables)
from src.amp.streaming.types import BatchMetadata, BlockRange, ResponseBatch

with loader:
# Create table first from the schema
batch1_data = pa.RecordBatch.from_pydict({'id': [1, 2], 'value': [100, 200]})
# Create table first from the schema (include tx_hash for Redis key pattern compatibility)
batch1_data = pa.RecordBatch.from_pydict({'id': [1, 2], 'tx_hash': ['0x1', '0x2'], 'value': [100, 200]})
loader._create_table_from_schema(batch1_data.schema, test_table_name)

# Simulate a microbatch sent as 3 RecordBatches with the same BlockRange
@@ -296,7 +306,7 @@ def test_microbatch_deduplication(self, loader, test_table_name, cleanup_tables)
)

# Second RecordBatch (same BlockRange, ranges_complete=False)
batch2_data = pa.RecordBatch.from_pydict({'id': [3, 4], 'value': [300, 400]})
batch2_data = pa.RecordBatch.from_pydict({'id': [3, 4], 'tx_hash': ['0x3', '0x4'], 'value': [300, 400]})
response2 = ResponseBatch.data_batch(
data=batch2_data,
metadata=BatchMetadata(
@@ -306,7 +306,7 @@ def test_microbatch_deduplication(self, loader, test_table_name, cleanup_tables)
)

# Third RecordBatch (same BlockRange, ranges_complete=True)
batch3_data = pa.RecordBatch.from_pydict({'id': [5, 6], 'value': [500, 600]})
batch3_data = pa.RecordBatch.from_pydict({'id': [5, 6], 'tx_hash': ['0x5', '0x6'], 'value': [500, 600]})
response3 = ResponseBatch.data_batch(
data=batch3_data,
metadata=BatchMetadata(