-
Notifications
You must be signed in to change notification settings - Fork 77
Expand file tree
/
Copy pathtrace_reader.py
More file actions
293 lines (252 loc) · 9.85 KB
/
trace_reader.py
File metadata and controls
293 lines (252 loc) · 9.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
"""Service for reading traces from ClickHouse."""
from datetime import UTC, datetime
from db.clickhouse import get_clickhouse_client
def _to_utc_naive(dt: datetime) -> datetime:
"""Convert datetime to UTC naive datetime for ClickHouse comparison."""
if dt.tzinfo is not None:
# Convert to UTC then remove timezone info
return dt.astimezone(UTC).replace(tzinfo=None)
return dt
class TraceReaderService:
"""Read traces and spans from ClickHouse."""
def __init__(self):
self._client = get_clickhouse_client()
def list_traces(
self,
project_id: str,
page: int = 0,
limit: int = 50,
name: str | None = None,
user_id: str | None = None,
start_after: datetime | None = None,
end_before: datetime | None = None,
search_query: str | None = None,
) -> dict:
"""List traces with aggregated metrics from spans."""
offset = page * limit
# Build WHERE conditions
conditions = ["t.project_id = {project_id:String}"]
params = {"project_id": project_id, "limit": limit, "offset": offset}
if name:
conditions.append("t.name ILIKE {name:String}")
params["name"] = f"%{name}%"
if user_id:
conditions.append("t.user_id = {user_id:String}")
params["user_id"] = user_id
# Date range filtering (convert to UTC naive datetime for ClickHouse)
if start_after:
conditions.append("t.trace_start_time >= {start_after:DateTime64(3)}")
params["start_after"] = _to_utc_naive(start_after)
if end_before:
conditions.append("t.trace_start_time <= {end_before:DateTime64(3)}")
params["end_before"] = _to_utc_naive(end_before)
# Multi-field keyword search (trace_id, name, session_id, user_id)
if search_query:
conditions.append(
"(t.trace_id ILIKE {search_kw:String} "
"OR t.name ILIKE {search_kw:String} "
"OR t.session_id ILIKE {search_kw:String} "
"OR t.user_id ILIKE {search_kw:String})"
)
params["search_kw"] = f"%{search_query}%"
where_clause = " AND ".join(conditions)
# Query traces with span aggregates
# Use FINAL to deduplicate ReplacingMergeTree rows
query = f"""
SELECT
t.trace_id,
t.project_id,
t.name,
t.trace_start_time,
t.user_id,
t.session_id,
count(s.span_id) as span_count,
if(
min(s.span_start_time) IS NOT NULL AND max(s.span_end_time) IS NOT NULL,
dateDiff('millisecond', min(s.span_start_time), max(s.span_end_time)),
NULL
) as duration_ms,
if(countIf(s.status = 'ERROR') > 0, 'error', 'ok') as status,
t.input,
t.output
FROM traces AS t FINAL
LEFT JOIN spans AS s FINAL ON t.trace_id = s.trace_id AND t.project_id = s.project_id
WHERE {where_clause}
GROUP BY t.trace_id, t.project_id, t.name, t.trace_start_time, t.user_id, t.session_id, t.input, t.output
ORDER BY t.trace_start_time DESC
LIMIT {{limit:UInt32}} OFFSET {{offset:UInt32}}
"""
result = self._client.query(query, parameters=params)
rows = result.result_rows
# Get total count
count_query = f"""
SELECT count(DISTINCT t.trace_id)
FROM traces AS t FINAL
WHERE {where_clause}
"""
count_result = self._client.query(count_query, parameters=params)
total = count_result.result_rows[0][0] if count_result.result_rows else 0
# Convert rows to dicts
data = []
for row in rows:
data.append(
{
"trace_id": row[0],
"project_id": row[1],
"name": row[2],
"trace_start_time": row[3],
"user_id": row[4],
"session_id": row[5],
"span_count": row[6] or 0,
"duration_ms": float(row[7]) if row[7] is not None else None,
"status": row[8],
"input": row[9],
"output": row[10],
}
)
return {
"data": data,
"meta": {"page": page, "limit": limit, "total": total},
}
def get_trace(self, project_id: str, trace_id: str) -> dict | None:
"""Get single trace with all spans."""
# Fetch trace
trace_query = """
SELECT
trace_id, project_id, name, trace_start_time,
user_id, session_id, environment, release, input, output, metadata
FROM traces FINAL
WHERE project_id = {project_id:String} AND trace_id = {trace_id:String}
LIMIT 1
"""
trace_result = self._client.query(
trace_query,
parameters={"project_id": project_id, "trace_id": trace_id},
)
if not trace_result.result_rows:
return None
row = trace_result.result_rows[0]
trace = {
"trace_id": row[0],
"project_id": row[1],
"name": row[2],
"trace_start_time": row[3],
"user_id": row[4],
"session_id": row[5],
"environment": row[6],
"release": row[7],
"input": row[8],
"output": row[9],
"metadata": row[10],
}
# Fetch spans
spans_query = """
SELECT
span_id, trace_id, parent_span_id, name, span_kind,
span_start_time, span_end_time, status, status_message,
model_name, cost, input_tokens, output_tokens, total_tokens,
input, output, metadata
FROM spans FINAL
WHERE project_id = {project_id:String} AND trace_id = {trace_id:String}
ORDER BY span_start_time ASC
"""
spans_result = self._client.query(
spans_query,
parameters={"project_id": project_id, "trace_id": trace_id},
)
spans = []
for row in spans_result.result_rows:
spans.append(
{
"span_id": row[0],
"trace_id": row[1],
"parent_span_id": row[2],
"name": row[3],
"span_kind": row[4],
"span_start_time": row[5],
"span_end_time": row[6],
"status": row[7],
"status_message": row[8],
"model_name": row[9],
"cost": float(row[10]) if row[10] is not None else None,
"input_tokens": int(row[11]) if row[11] is not None else None,
"output_tokens": int(row[12]) if row[12] is not None else None,
"total_tokens": int(row[13]) if row[13] is not None else None,
"input": row[14],
"output": row[15],
"metadata": row[16],
}
)
trace["spans"] = spans
return trace
def list_users(
self,
project_id: str,
page: int = 0,
limit: int = 50,
search_query: str | None = None,
start_after: datetime | None = None,
end_before: datetime | None = None,
) -> dict:
"""List unique users with trace counts."""
offset = page * limit
# Build WHERE conditions
conditions = [
"project_id = {project_id:String}",
"user_id IS NOT NULL",
"user_id != ''",
]
params: dict = {"project_id": project_id, "limit": limit, "offset": offset}
# Search by user_id
if search_query:
conditions.append("user_id ILIKE {search_kw:String}")
params["search_kw"] = f"%{search_query}%"
# Date range filtering
if start_after:
conditions.append("trace_start_time >= {start_after:DateTime64(3)}")
params["start_after"] = _to_utc_naive(start_after)
if end_before:
conditions.append("trace_start_time <= {end_before:DateTime64(3)}")
params["end_before"] = _to_utc_naive(end_before)
where_clause = " AND ".join(conditions)
query = f"""
SELECT
user_id,
count(DISTINCT trace_id) as trace_count,
max(trace_start_time) as last_trace_time
FROM traces FINAL
WHERE {where_clause}
GROUP BY user_id
ORDER BY last_trace_time DESC
LIMIT {{limit:UInt32}} OFFSET {{offset:UInt32}}
"""
result = self._client.query(query, parameters=params)
# Get total count
count_query = f"""
SELECT count(DISTINCT user_id)
FROM traces FINAL
WHERE {where_clause}
"""
count_result = self._client.query(count_query, parameters=params)
total = count_result.result_rows[0][0] if count_result.result_rows else 0
data = []
for row in result.result_rows:
data.append(
{
"user_id": row[0],
"trace_count": row[1],
"last_trace_time": row[2],
}
)
return {
"data": data,
"meta": {"page": page, "limit": limit, "total": total},
}
# Module-wide shared instance, created lazily on first request
_service: TraceReaderService | None = None


def get_trace_reader_service() -> TraceReaderService:
    """Return the shared TraceReaderService, constructing it on first use."""
    global _service
    if _service is not None:
        return _service
    _service = TraceReaderService()
    return _service