-
Notifications
You must be signed in to change notification settings - Fork 77
Expand file tree
/
Copy pathtraces.py
More file actions
228 lines (181 loc) · 7.09 KB
/
traces.py
File metadata and controls
228 lines (181 loc) · 7.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
"""Public traces endpoint for OTEL ingestion.
This endpoint receives OTLP trace data from SDKs and:
1. Stores OTEL JSON to S3/MinIO (durable buffer)
2. Enqueues a Celery task with S3 reference for async processing
The endpoint accepts OTLP protobuf format only:
- application/x-protobuf: Decoded and converted to camelCase JSON before storage
Authentication is via API key in the Authorization header:
Authorization: Bearer <api_key>
"""
import gzip
import hashlib
import logging
import uuid
from datetime import UTC, datetime
from typing import Annotated, Any
import httpx
from fastapi import APIRouter, Depends, Header, HTTPException, Request, status
from google.protobuf.json_format import MessageToDict
from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import (
ExportTraceServiceRequest,
)
from pydantic import BaseModel
from rest.services.s3 import get_s3_service
from shared.config import settings
from worker.ingest_tasks import process_s3_traces
# Module-scoped logger named after this module's import path.
logger = logging.getLogger(__name__)
# Router for the public trace-ingestion endpoint; all routes mount under /public/traces.
router = APIRouter(prefix="/public/traces", tags=["Traces (Public)"])
async def authenticate_api_key(
    authorization: Annotated[str | None, Header()] = None,
) -> str:
    """Authenticate the request using an API key and return the project_id.

    The API key must be supplied in the Authorization header as:

        Authorization: Bearer <api_key>

    Only the SHA-256 hash of the key is forwarded to the Next.js internal
    validation API, so the plaintext secret never leaves this process.

    Args:
        authorization: Raw Authorization header value, if present.

    Returns:
        The project_id associated with the API key.

    Raises:
        HTTPException: 401 when the header is missing/malformed or the key
            is rejected; 503 when the validation service is unreachable or
            returns an unusable response.
    """
    if not authorization:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Missing Authorization header",
        )
    # Parse "Bearer <token>" format (scheme matched case-insensitively).
    parts = authorization.split(" ", 1)
    if len(parts) != 2 or parts[0].lower() != "bearer":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid Authorization header format. Expected: Bearer <api_key>",
        )
    api_key = parts[1]
    # Hash the key; the upstream validator only ever sees the digest.
    key_hash = hashlib.sha256(api_key.encode()).hexdigest()
    # Validate via Next.js internal API.
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(
                f"{settings.traceroot_ui_url}/api/internal/validate-api-key",
                json={"keyHash": key_hash},
                headers={"X-Internal-Secret": settings.internal_api_secret},
            )
    except httpx.RequestError as e:
        # Lazy %-args: no string formatting unless the record is emitted.
        logger.error("Failed to validate API key: %s", e)
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Authentication service unavailable",
        ) from e
    if response.status_code == 401:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Authentication failed",
        )
    if response.status_code != 200:
        logger.error("Unexpected response from auth service: %s", response.status_code)
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Authentication service error",
        )
    # A 200 with a non-JSON body is an upstream fault, not a client error:
    # surface it as 503 rather than letting the parse error become a 500.
    try:
        data = response.json()
    except ValueError as e:
        logger.error("Auth service returned invalid JSON: %s", e)
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Authentication service error",
        ) from e
    if not data.get("valid"):
        error_message = data.get("error", "Invalid API key")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=error_message,
        )
    project_id = data.get("projectId")
    if not project_id:
        # "valid" without a projectId violates the upstream contract; previously
        # this KeyError'd into an opaque 500.
        logger.error("Auth service response missing projectId")
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Authentication service error",
        )
    return project_id
# FastAPI dependency alias: resolves to the authenticated project's id,
# raising 401/503 via authenticate_api_key when authentication fails.
ProjectId = Annotated[str, Depends(authenticate_api_key)]
def decode_otlp_protobuf(data: bytes) -> dict[str, Any]:
    """Parse raw OTLP protobuf bytes into a JSON-compatible dict.

    The resulting dict uses camelCase field names (the standard OTLP JSON
    encoding), with bytes rendered as base64 and enums as names, exactly as
    produced by protobuf's MessageToDict.

    Args:
        data: Serialized ExportTraceServiceRequest protobuf payload.

    Returns:
        Dict representation of the OTLP trace data.
    """
    export_request = ExportTraceServiceRequest()
    export_request.ParseFromString(data)
    return MessageToDict(export_request)
class IngestResponse(BaseModel):
    """Response for trace ingestion."""
    # Ingestion outcome; the endpoint returns "ok" on success.
    status: str
    # S3 object key where the decoded OTEL JSON payload was stored.
    file_key: str
@router.post("", response_model=IngestResponse)
async def ingest_traces(
    request: Request,
    project_id: ProjectId,
):
    """Ingest OTLP trace data.

    Accepts OTLP protobuf format only (optionally gzip compressed).
    Protobuf is converted to camelCase JSON before storage in S3.

    S3 path: events/otel/{project_id}/{yyyy}/{mm}/{dd}/{hh}/{uuid}.json

    Headers:
        Authorization: Bearer <api_key>
        Content-Encoding: gzip (optional)
        Content-Type: application/x-protobuf

    Body:
        OTLP trace data in protobuf format

    Raises:
        HTTPException: 400 for an empty, undecodable, or unparseable body;
            500 when the S3 upload fails. A Celery enqueue failure is logged
            but does NOT fail the request — S3 already holds the payload.
    """
    # 1. Read body
    body = await request.body()
    if not body:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Empty request body",
        )
    # 2. Decompress if the client declared gzip encoding
    content_encoding = request.headers.get("content-encoding", "")
    if "gzip" in content_encoding.lower():
        try:
            body = gzip.decompress(body)
        except Exception as e:
            logger.warning("Failed to decompress gzip: %s", e)
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Failed to decompress gzip: {e}",
            ) from e
    # 3. Decode protobuf to camelCase JSON (OTLP standard format)
    try:
        trace_data = decode_otlp_protobuf(body)
        logger.debug("Decoded OTLP protobuf to JSON")
    except Exception as e:
        logger.warning("Failed to parse OTLP protobuf: %s", e)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Failed to parse OTLP protobuf: {e}",
        ) from e
    # 4. Generate S3 key, partitioned by ingest hour (same layout as before,
    # expressed as one strftime-style format)
    now = datetime.now(UTC)
    s3_key = f"events/otel/{project_id}/{now:%Y/%m/%d/%H}/{uuid.uuid4()}.json"
    # 5. Upload JSON to S3 (the durable buffer for async processing)
    try:
        s3_service = get_s3_service()
        s3_service.ensure_bucket_exists()
        s3_service.upload_json(s3_key, trace_data)
        logger.info("Stored OTEL JSON to %s for project %s", s3_key, project_id)
    except Exception as e:
        logger.error("Failed to upload OTEL JSON to S3: %s", e)
        # Keep the client-facing detail generic: the raw exception can leak
        # internal storage details (endpoints, bucket names) to callers.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Storage error",
        ) from e
    # 6. Enqueue Celery task for async processing (S3 reference only, not full payload)
    try:
        process_s3_traces.delay(s3_key=s3_key, project_id=project_id)
        logger.info("Enqueued Celery task for %s", s3_key)
    except Exception as e:
        # Log but don't fail the request - S3 has the data, can retry later
        logger.error("Failed to enqueue Celery task for %s: %s", s3_key, e)
    # 7. Return success (async processing happens in background)
    return IngestResponse(status="ok", file_key=s3_key)