-
Notifications
You must be signed in to change notification settings - Fork 77
Expand file tree
/
Copy pathingest_tasks.py
More file actions
73 lines (56 loc) · 2.23 KB
/
ingest_tasks.py
File metadata and controls
73 lines (56 loc) · 2.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
"""Celery task definitions.
This module defines the async tasks that process trace data from S3 to ClickHouse.
"""
import logging
from worker.celery_app import app
logger = logging.getLogger(__name__)
@app.task(
    bind=True,
    autoretry_for=(Exception,),
    retry_backoff=True,  # exponential backoff between retries
    retry_backoff_max=600,  # Max 10 minutes between retries
    max_retries=5,
)
def process_s3_traces(self, s3_key: str, project_id: str) -> dict:
    """Process OTEL traces from S3 and insert into ClickHouse.

    This task:
    1. Downloads the OTEL JSON from S3
    2. Transforms it to ClickHouse format
    3. Batch inserts traces and spans

    Args:
        s3_key: S3 key where the OTEL JSON is stored
        project_id: Project ID for the traces

    Returns:
        Dict with the processed ``s3_key``/``project_id`` and counts of
        inserted traces and spans.

    Raises:
        Exception: Any failure is logged and re-raised so Celery's
            ``autoretry_for`` kicks in (up to ``max_retries`` with backoff).
    """
    # Import here to avoid circular imports and ensure fresh connections
    from db.clickhouse.client import get_clickhouse_client
    from rest.services.s3 import get_s3_service
    from worker.otel_transform import transform_otel_to_clickhouse

    # Lazy %-style args so formatting is skipped when the level is disabled.
    logger.info("Processing S3 traces: %s for project %s", s3_key, project_id)
    try:
        # 1. Download from S3
        s3_service = get_s3_service()
        otel_data = s3_service.download_json(s3_key)
        logger.debug("Downloaded OTEL data from %s", s3_key)

        # 2. Transform to ClickHouse format
        traces, spans = transform_otel_to_clickhouse(otel_data, project_id)
        logger.info(
            "Transformed %d traces and %d spans from %s",
            len(traces), len(spans), s3_key,
        )

        # 3. Insert into ClickHouse. Only open a client when there is
        # something to write; each list is inserted as its own batch.
        if traces or spans:
            ch_client = get_clickhouse_client()
            if traces:
                ch_client.insert_traces_batch(traces)
                logger.info("Inserted %d traces into ClickHouse", len(traces))
            if spans:
                ch_client.insert_spans_batch(spans)
                logger.info("Inserted %d spans into ClickHouse", len(spans))

        return {
            "s3_key": s3_key,
            "project_id": project_id,
            "traces": len(traces),
            "spans": len(spans),
        }
    except Exception:
        # logger.exception records the traceback automatically inside except.
        logger.exception("Failed to process %s", s3_key)
        raise  # Re-raise to trigger Celery retry