
Commit 5ee72d0

Add prometheus metrics
- use new dependency `prometheus-client`
1 parent 1cff349 commit 5ee72d0

File tree

6 files changed: +1058 -2 lines changed

.gitignore

Lines changed: 15 additions & 0 deletions

```diff
@@ -60,3 +60,18 @@ data/
 # Build artifacts
 *.tar.gz
 *.zip
+
+# Mycelium coordination files (not part of project)
+# Remove these lines if you want to commit mycelium to the project
+mycelium.py
+scripts/forager/
+signals/
+substrate/
+audit/
+logs/
+map.yaml
+pyproject.toml
+CLAUDE.md
+AGENT-QUICKSTART.md
+docs/FORAGER-*.md
+*.vesicle
```

docs/metrics.md

Lines changed: 232 additions & 0 deletions

# Metrics Instrumentation

This document describes the metrics instrumentation available in amp for observability and monitoring.

## Overview

Amp provides Prometheus-compatible metrics for monitoring data loading operations. The metrics module offers:

- **Low overhead** instrumentation with an optional prometheus_client dependency
- **Graceful degradation** when prometheus_client is not installed
- **Consistent naming** following Prometheus conventions
- **Thread-safe** singleton implementation

## Installation

The metrics module works without prometheus_client (using no-op metrics), but to enable actual metric collection:

```bash
# Install with metrics support
pip install amp[metrics]

# Or install prometheus_client directly
pip install prometheus-client
```

## Quick Start

```python
from amp.metrics import get_metrics, start_metrics_server

# Get the global metrics instance
metrics = get_metrics()

# Start HTTP server on port 8000 for Prometheus scraping
start_metrics_server(port=8000)

# Record metrics in your code
metrics.records_processed.labels(
    loader='postgresql',
    table='users',
    connection='default'
).inc(1000)

metrics.processing_latency.labels(
    loader='postgresql',
    operation='load_batch'
).observe(0.5)
```

## Available Metrics

### Counters

| Metric | Labels | Description |
|--------|--------|-------------|
| `amp_records_processed_total` | loader, table, connection | Total records processed |
| `amp_errors_total` | loader, error_type, table | Total errors by type |
| `amp_bytes_processed_total` | loader, table | Total bytes processed |
| `amp_reorg_events_total` | loader, network, table | Blockchain reorg events |
| `amp_retry_attempts_total` | loader, operation, reason | Retry attempts |

### Histograms

| Metric | Labels | Description |
|--------|--------|-------------|
| `amp_processing_latency_seconds` | loader, operation | Processing time distribution |
| `amp_batch_size_records` | loader, table | Batch size distribution |

### Gauges

| Metric | Labels | Description |
|--------|--------|-------------|
| `amp_active_connections` | loader, target | Current active connections |
| `amp_queue_depth` | queue_name | Current queue depth |

### Info

| Metric | Labels | Description |
|--------|--------|-------------|
| `amp_build_info` | (various) | Build/version information |
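In code, each of these metrics is exposed as an attribute on the object returned by `get_metrics()`. A short sketch follows; the attribute names (`errors`, `retry_attempts`, `active_connections`, `batch_sizes`) match the loader instrumentation added in this commit, while the label values are illustrative:

```python
from amp.metrics import get_metrics

metrics = get_metrics()

# Counter: amp_errors_total, labelled by loader, error type, and table
metrics.errors.labels(loader='postgresql', error_type='OperationalError', table='users').inc()

# Counter: amp_retry_attempts_total, labelled by loader, operation, and retry reason
metrics.retry_attempts.labels(loader='postgresql', operation='load_batch', reason='rate_limit').inc()

# Gauge: amp_active_connections goes up when a connection opens and down when it closes
metrics.active_connections.labels(loader='postgresql', target='default').inc()
metrics.active_connections.labels(loader='postgresql', target='default').dec()

# Histogram: amp_batch_size_records records the distribution of batch sizes
metrics.batch_sizes.labels(loader='postgresql', table='users').observe(5000)
```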
## Context Manager for Operations

The `track_operation` context manager simplifies instrumentation:

```python
from amp.metrics import get_metrics

metrics = get_metrics()

with metrics.track_operation('postgresql', 'load_batch', table='users') as ctx:
    # Your loading code here
    rows_loaded = load_data(batch)

    # Set context for automatic metric recording
    ctx['records'] = rows_loaded
    ctx['bytes'] = batch.nbytes

# Metrics are automatically recorded:
# - processing_latency is observed
# - records_processed is incremented
# - bytes_processed is incremented
# - errors are recorded if an exception occurs
```

## Configuration

Customize metrics collection with `MetricsConfig`:

```python
from amp.metrics import get_metrics, MetricsConfig

config = MetricsConfig(
    enabled=True,                    # Enable/disable all metrics
    namespace='amp',                 # Metric name prefix
    subsystem='loader',              # Optional subsystem name
    default_labels={'env': 'prod'},  # Default labels for all metrics
    histogram_buckets=(              # Custom latency buckets
        0.001, 0.005, 0.01, 0.025, 0.05,
        0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0
    ),
)

metrics = get_metrics(config)
```

## Prometheus Integration

### HTTP Endpoint

Start a metrics server for Prometheus scraping:

```python
from amp.metrics import start_metrics_server

# Start on default port 8000
start_metrics_server()

# Or specify custom port and address
start_metrics_server(port=9090, addr='0.0.0.0')
```

### Generate Metrics Text

Generate metrics in Prometheus text format for custom export:

```python
from amp.metrics import generate_metrics_text

# Get metrics as bytes
metrics_text = generate_metrics_text()

# Use in your HTTP handler
@app.route('/metrics')
def metrics_endpoint():
    return Response(generate_metrics_text(), mimetype='text/plain')
```

### Example Prometheus Config

```yaml
scrape_configs:
  - job_name: 'amp'
    static_configs:
      - targets: ['localhost:8000']
    scrape_interval: 15s
```

## Grafana Dashboard

Example queries for a Grafana dashboard:

```promql
# Records processed rate (per second)
rate(amp_records_processed_total[5m])

# P99 latency
histogram_quantile(0.99, rate(amp_processing_latency_seconds_bucket[5m]))

# Error rate percentage
rate(amp_errors_total[5m]) / rate(amp_records_processed_total[5m]) * 100

# Active connections by loader
amp_active_connections

# Average batch size
rate(amp_batch_size_records_sum[5m]) / rate(amp_batch_size_records_count[5m])
```

## Graceful Degradation

When prometheus_client is not installed, the metrics module uses no-op implementations that silently accept all operations:

```python
from amp.metrics import get_metrics, is_prometheus_available

if is_prometheus_available():
    print("Prometheus metrics enabled")
else:
    print("Metrics disabled - install prometheus-client to enable")

# Code works the same either way
metrics = get_metrics()
metrics.records_processed.labels(loader='test', table='t', connection='c').inc(100)
```

## Testing

For testing, you can reset the metrics singleton:

```python
from amp.metrics import AmpMetrics, get_metrics

def test_my_loader():
    # Reset before test
    AmpMetrics.reset_instance()

    # Run test with fresh metrics
    metrics = get_metrics()
    # ...

    # Clean up after test
    AmpMetrics.reset_instance()
```
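If you use pytest, the same reset can live in a fixture so every test gets a fresh singleton automatically. A minimal sketch (the fixture name `fresh_metrics` is arbitrary):

```python
import pytest

from amp.metrics import AmpMetrics, get_metrics


@pytest.fixture
def fresh_metrics():
    # Reset before the test so no state leaks in from earlier tests
    AmpMetrics.reset_instance()
    yield get_metrics()
    # Reset again afterwards so later tests start clean
    AmpMetrics.reset_instance()


def test_my_loader(fresh_metrics):
    # The fixture hands the test a freshly created metrics instance
    fresh_metrics.records_processed.labels(loader='test', table='t', connection='c').inc(1)
```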
## Best Practices

1. **Use consistent labels** - Keep label values consistent across your codebase
2. **Avoid high cardinality** - Don't use user IDs or request IDs as labels (see the sketch after this list)
3. **Use track_operation** - Prefer the context manager for automatic error handling
4. **Set up alerts** - Configure Prometheus alerts for error rates and latency
5. **Dashboard first** - Design your metrics around what you want to see in dashboards
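To make the cardinality point concrete, here is a small illustration (label values are hypothetical): every distinct combination of label values becomes its own time series, so labels should come from small, bounded sets.

```python
from amp.metrics import get_metrics

metrics = get_metrics()

# Fine: loader, table, and connection names come from a small, bounded set
metrics.records_processed.labels(loader='postgresql', table='users', connection='default').inc(100)

# Avoid: unbounded values such as user or request IDs create one time series per value
# metrics.records_processed.labels(loader='postgresql', table=f'user_{user_id}', connection='default').inc(1)
```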

pyproject.toml

Lines changed: 5 additions & 0 deletions
```diff
@@ -66,6 +66,10 @@ lmdb = [
     "lmdb>=1.4.0",
 ]
 
+metrics = [
+    "prometheus-client>=0.20.0",
+]
+
 all_loaders = [
     "psycopg2-binary>=2.9.0", # PostgreSQL
     "redis>=4.5.0", # Redis
@@ -75,6 +79,7 @@ all_loaders = [
     "snowflake-connector-python>=4.0.0", # Snowflake
     "snowpipe-streaming>=1.0.0", # Snowpipe Streaming API
     "lmdb>=1.4.0", # LMDB
+    "prometheus-client>=0.20.0", # Metrics
 ]
 
 test = [
```

src/amp/loaders/base.py

Lines changed: 46 additions & 2 deletions
```diff
@@ -11,6 +11,7 @@
 
 import pyarrow as pa
 
+from ..metrics import get_metrics
 from ..streaming.resilience import (
     AdaptiveRateLimiter,
     BackPressureConfig,
@@ -148,6 +149,18 @@ def close(self) -> None:
         """Alias for disconnect() for backward compatibility"""
         self.disconnect()
 
+    def _record_connection_opened(self) -> None:
+        """Record that a connection was opened. Call this after establishing connection."""
+        loader_type = self.__class__.__name__.replace('Loader', '').lower()
+        metrics = get_metrics()
+        metrics.active_connections.labels(loader=loader_type, target='default').inc()
+
+    def _record_connection_closed(self) -> None:
+        """Record that a connection was closed. Call this after closing connection."""
+        loader_type = self.__class__.__name__.replace('Loader', '').lower()
+        metrics = get_metrics()
+        metrics.active_connections.labels(loader=loader_type, target='default').dec()
+
     @abstractmethod
     def _load_batch_impl(self, batch: pa.RecordBatch, table_name: str, **kwargs) -> int:
         """
@@ -227,6 +240,16 @@ def load_batch(self, batch: pa.RecordBatch, table_name: str, **kwargs) -> LoadRe
                     f'Transient error loading batch (attempt {backoff.attempt}/{self.retry_config.max_retries}): '
                     f'{last_error}. Retrying in {delay:.1f}s...'
                 )
+                # Record retry metric
+                loader_type = self.__class__.__name__.replace('Loader', '').lower()
+                metrics = get_metrics()
+                # Determine retry reason
+                reason = 'transient'
+                if '429' in last_error or 'rate limit' in last_error.lower():
+                    reason = 'rate_limit'
+                elif 'timeout' in last_error.lower() or 'timed out' in last_error.lower():
+                    reason = 'timeout'
+                metrics.retry_attempts.labels(loader=loader_type, operation='load_batch', reason=reason).inc()
                 time.sleep(delay)
 
     def _try_load_batch(self, batch: pa.RecordBatch, table_name: str, **kwargs) -> LoadResult:
@@ -320,24 +343,39 @@ def _try_load_batch(self, batch: pa.RecordBatch, table_name: str, **kwargs) -> L
 
             duration = time.time() - start_time
 
+            # Record metrics for successful batch load
+            loader_type = self.__class__.__name__.replace('Loader', '').lower()
+            metrics = get_metrics()
+            metrics.records_processed.labels(loader=loader_type, table=table_name, connection=connection_name).inc(
+                rows_loaded
+            )
+            metrics.batch_sizes.labels(loader=loader_type, table=table_name).observe(rows_loaded)
+            metrics.processing_latency.labels(loader=loader_type, operation='load_batch').observe(duration)
+            if hasattr(batch, 'nbytes'):
+                metrics.bytes_processed.labels(loader=loader_type, table=table_name).inc(batch.nbytes)
+
             return LoadResult(
                 rows_loaded=rows_loaded,
                 duration=duration,
                 ops_per_second=round(rows_loaded / duration, 2) if duration > 0 else 0,
                 table_name=table_name,
-                loader_type=self.__class__.__name__.replace('Loader', '').lower(),
+                loader_type=loader_type,
                 success=True,
                 metadata=self._get_batch_metadata(batch, duration, **kwargs),
             )
 
         except Exception as e:
             self.logger.error(f'Failed to load batch: {str(e)}')
+            # Record error metrics
+            loader_type = self.__class__.__name__.replace('Loader', '').lower()
+            metrics = get_metrics()
+            metrics.errors.labels(loader=loader_type, error_type=type(e).__name__, table=table_name).inc()
             return LoadResult(
                 rows_loaded=0,
                 duration=time.time() - start_time,
                 ops_per_second=0,
                 table_name=table_name,
-                loader_type=self.__class__.__name__.replace('Loader', '').lower(),
+                loader_type=loader_type,
                 success=False,
                 error=str(e),
             )
@@ -575,8 +613,14 @@ def _process_reorg_event(
 
         # Invalidate affected batches from state store
        if response.invalidation_ranges:
+            # Record reorg metrics
+            loader_type = self.__class__.__name__.replace('Loader', '').lower()
+            metrics = get_metrics()
+
             # Log reorg details
             for range_obj in response.invalidation_ranges:
+                # Record reorg event per network
+                metrics.reorg_events.labels(loader=loader_type, network=range_obj.network, table=table_name).inc()
                 self.logger.warning(
                     f'Reorg detected on {range_obj.network}: blocks {range_obj.start}-{range_obj.end} invalidated'
                 )
```
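For context, the new `_record_connection_opened()` / `_record_connection_closed()` helpers are meant to be called from concrete loaders around their own connection setup and teardown. A hypothetical sketch of that usage follows; the class names, `connect`/`disconnect` bodies, `connection_string` attribute, and psycopg2 call are illustrative assumptions, not part of this commit:

```python
import psycopg2  # illustrative driver choice, not part of this commit


class PostgreSQLLoader(DataLoader):  # "DataLoader" stands in for the abstract base class in base.py
    def connect(self) -> None:
        # Establish the real connection, then bump the amp_active_connections gauge
        self._conn = psycopg2.connect(self.connection_string)
        self._record_connection_opened()

    def disconnect(self) -> None:
        # Close the real connection, then decrement the gauge
        if self._conn is not None:
            self._conn.close()
            self._conn = None
        self._record_connection_closed()
```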
