Skip to content

Commit 0bf6513

Browse files
refactor: refactor Observer to serve new Tracing APIs (openchoreo#2326)
* refactor: Refactor Observer to serve new Tracing APIs Signed-off-by: Nilushan Costa <nilushan@wso2.com> * refactor: fix lint failures and address PR review comments Signed-off-by: Nilushan Costa <nilushan@wso2.com> * refactor: add tests Signed-off-by: Nilushan Costa <nilushan@wso2.com> * refactor: address PR review comments Signed-off-by: Nilushan Costa <nilushan@wso2.com> --------- Signed-off-by: Nilushan Costa <nilushan@wso2.com>
1 parent 2db0390 commit 0bf6513

File tree

18 files changed

+2234
-13
lines changed

18 files changed

+2234
-13
lines changed

cmd/observer/main.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,22 @@ func main() {
9393
logger.Info("Using OpenSearch for component logs")
9494
}
9595

96+
// Initialize traces backend (optional - based on experimental flag)
97+
var tracesBackend observability.TracesBackend
98+
if cfg.Experimental.UseTracesBackend {
99+
logger.Info("Experimental feature active: Using traces backend",
100+
"backend_url", cfg.Experimental.TracesBackendURL)
101+
102+
backendConfig := service.TracesBackendConfig{
103+
BaseURL: cfg.Experimental.TracesBackendURL,
104+
Timeout: cfg.Experimental.TracesBackendTimeout,
105+
}
106+
tracesBackend = service.NewTracesBackend(backendConfig)
107+
logger.Info("Traces backend initialized")
108+
} else {
109+
logger.Info("Using OpenSearch for traces")
110+
}
111+
96112
// Initialize legacy logging service (for legacy API endpoints)
97113
legacyLoggingService := legacyservice.NewLoggingService(osClient, promService, k8sClient, cfg, logger, logsBackend)
98114

@@ -132,6 +148,16 @@ func main() {
132148
os.Exit(1)
133149
}
134150

151+
// Initialize traces service
152+
tracesService, tracesServiceErr := service.NewTracesService(
153+
tracesBackend, uidResolver, cfg, logger.With("component", "traces-service"),
154+
)
155+
if tracesServiceErr != nil {
156+
logger.Error("Failed to initialize traces service", "error", tracesServiceErr)
157+
os.Exit(1)
158+
}
159+
logger.Info("Traces service initialized")
160+
135161
// Initialize health service
136162
healthService, healthServiceErr := service.NewHealthService(logger.With("component", "health-service"))
137163
if healthServiceErr != nil {
@@ -154,6 +180,7 @@ func main() {
154180
logsService,
155181
metricsService,
156182
alertService,
183+
tracesService,
157184
logger.With("component", "api-handler"),
158185
authzClient,
159186
)
@@ -220,6 +247,11 @@ func main() {
220247
api.HandleFunc("POST /api/v1/logs/query", newAPIHandler.QueryLogs)
221248
api.HandleFunc("POST /api/v1/metrics/query", newAPIHandler.QueryMetrics)
222249

250+
// ===== New API Routes (v1alpha1) - Traces =====
251+
api.HandleFunc("POST /api/v1alpha1/traces/query", newAPIHandler.QueryTraces)
252+
api.HandleFunc("POST /api/v1alpha1/traces/{traceId}/spans/query", newAPIHandler.QuerySpansForTrace)
253+
api.HandleFunc("GET /api/v1alpha1/traces/{traceId}/spans/{spanId}", newAPIHandler.GetSpanDetailsForTrace)
254+
223255
// MCP endpoint with chained middleware (logger -> recovery -> auth401 -> jwt -> handler)
224256
mcpMiddleware := initMCPMiddleware(logger)
225257
mcpRoutes := routes.Group(mcpMiddleware, jwtAuth)
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
// Copyright 2026 The OpenChoreo Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package adaptor
5+
6+
import (
7+
"context"
8+
"errors"
9+
"fmt"
10+
"log/slog"
11+
"sort"
12+
"time"
13+
14+
"github.com/openchoreo/openchoreo/internal/observer/config"
15+
"github.com/openchoreo/openchoreo/internal/observer/opensearch"
16+
"github.com/openchoreo/openchoreo/pkg/observability"
17+
)
18+
19+
var (
20+
ErrSpanNotFound = errors.New("span not found")
21+
)
22+
23+
type DefaultTracesAdaptor struct {
24+
osClient *opensearch.Client
25+
queryBuilder *opensearch.QueryBuilder
26+
logger *slog.Logger
27+
}
28+
29+
func NewDefaultTracesAdaptor(cfg *config.OpenSearchConfig, logger *slog.Logger) (*DefaultTracesAdaptor, error) {
30+
osClient, err := opensearch.NewClient(cfg, logger)
31+
if err != nil {
32+
return nil, fmt.Errorf("failed to create OpenSearch client: %w", err)
33+
}
34+
35+
return &DefaultTracesAdaptor{
36+
osClient: osClient,
37+
queryBuilder: opensearch.NewQueryBuilder(cfg.IndexPrefix),
38+
logger: logger,
39+
}, nil
40+
}
41+
42+
func (a *DefaultTracesAdaptor) GetTraces(ctx context.Context, params observability.TracesQueryParams) (*observability.TracesQueryResult, error) {
43+
a.logger.Debug("Getting traces from OpenSearch")
44+
45+
// Build OpenSearch query params from observability params
46+
osParams := opensearch.TracesRequestParams{
47+
StartTime: params.StartTime.Format(time.RFC3339),
48+
EndTime: params.EndTime.Format(time.RFC3339),
49+
ProjectUID: params.ProjectID,
50+
EnvironmentUID: params.EnvironmentID,
51+
TraceID: params.TraceID,
52+
Limit: params.Limit,
53+
SortOrder: params.SortOrder,
54+
}
55+
// Only add ComponentUID if it's not empty
56+
if params.ComponentID != "" {
57+
osParams.ComponentUIDs = []string{params.ComponentID}
58+
}
59+
60+
// Build and execute query
61+
query := a.queryBuilder.BuildTracesQuery(osParams)
62+
response, err := a.osClient.Search(ctx, []string{"otel-traces-*"}, query)
63+
if err != nil {
64+
return nil, fmt.Errorf("failed to execute traces search: %w", err)
65+
}
66+
67+
// Parse and group spans by trace ID
68+
traceMap := make(map[string][]opensearch.Span)
69+
for _, hit := range response.Hits.Hits {
70+
span := opensearch.ParseSpanEntry(hit)
71+
traceID := opensearch.GetTraceID(hit)
72+
if traceID != "" {
73+
traceMap[traceID] = append(traceMap[traceID], span)
74+
}
75+
}
76+
77+
// Convert to observability.Trace format
78+
traces := make([]observability.Trace, 0, len(traceMap))
79+
traceIDs := make([]string, 0, len(traceMap))
80+
for traceID := range traceMap {
81+
traceIDs = append(traceIDs, traceID)
82+
}
83+
sort.Strings(traceIDs)
84+
85+
for _, traceID := range traceIDs {
86+
spans := traceMap[traceID]
87+
trace := convertToObservabilityTrace(traceID, spans)
88+
traces = append(traces, trace)
89+
}
90+
91+
return &observability.TracesQueryResult{
92+
Traces: traces,
93+
TotalCount: len(traces),
94+
Took: response.Took,
95+
}, nil
96+
}
97+
98+
// GetSpanDetails retrieves details for a specific span
99+
func (a *DefaultTracesAdaptor) GetSpanDetails(ctx context.Context, traceID string, spanID string) (*opensearch.Span, error) {
100+
a.logger.Debug("Getting span details from OpenSearch", "traceId", traceID, "spanId", spanID)
101+
102+
// Build query to get the specific span
103+
query := a.queryBuilder.BuildSpanDetailsQuery(traceID, spanID)
104+
response, err := a.osClient.Search(ctx, []string{"otel-traces-*"}, query)
105+
if err != nil {
106+
return nil, fmt.Errorf("failed to execute span search: %w", err)
107+
}
108+
109+
// Check if span was found
110+
if len(response.Hits.Hits) == 0 {
111+
return nil, ErrSpanNotFound
112+
}
113+
114+
// Parse and return the span
115+
span := opensearch.ParseSpanEntry(response.Hits.Hits[0])
116+
return &span, nil
117+
}
118+
119+
func convertToObservabilityTrace(traceID string, spans []opensearch.Span) observability.Trace {
120+
// Find root span and calculate trace metadata
121+
var rootSpan *opensearch.Span
122+
var minStartTime, maxEndTime time.Time
123+
124+
for i := range spans {
125+
span := &spans[i]
126+
if rootSpan == nil || span.ParentSpanID == "" {
127+
rootSpan = span
128+
}
129+
if minStartTime.IsZero() || span.StartTime.Before(minStartTime) {
130+
minStartTime = span.StartTime
131+
}
132+
if maxEndTime.IsZero() || span.EndTime.After(maxEndTime) {
133+
maxEndTime = span.EndTime
134+
}
135+
}
136+
137+
// Convert opensearch spans to observability spans
138+
traceSpans := make([]observability.TraceSpan, len(spans))
139+
for i, span := range spans {
140+
traceSpans[i] = observability.TraceSpan{
141+
SpanID: span.SpanID,
142+
Name: span.Name,
143+
ParentSpanID: span.ParentSpanID,
144+
StartTime: span.StartTime,
145+
EndTime: span.EndTime,
146+
DurationNs: span.DurationNanoseconds,
147+
Attributes: span.Attributes,
148+
ResourceAttributes: span.ResourceAttributes,
149+
}
150+
}
151+
152+
trace := observability.Trace{
153+
TraceID: traceID,
154+
SpanCount: len(spans),
155+
StartTime: minStartTime,
156+
EndTime: maxEndTime,
157+
DurationNs: maxEndTime.Sub(minStartTime).Nanoseconds(),
158+
Spans: traceSpans,
159+
}
160+
161+
if rootSpan != nil {
162+
trace.RootSpanID = rootSpan.SpanID
163+
trace.RootSpanName = rootSpan.Name
164+
trace.TraceName = rootSpan.Name
165+
}
166+
167+
return trace
168+
}

0 commit comments

Comments
 (0)