Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 14 additions & 16 deletions bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,34 +70,27 @@ func NewRequestBridge(ctx context.Context, providers []provider.Provider, rec re
// Create per-provider circuit breaker if configured
cfg := prov.CircuitBreakerConfig()
providerName := prov.Name()
onChange := func(endpoint string, from, to gobreaker.State) {
onChange := func(endpoint, model string, from, to gobreaker.State) {
logger.Info(context.Background(), "circuit breaker state change",
slog.F("provider", providerName),
slog.F("endpoint", endpoint),
slog.F("model", model),
slog.F("from", from.String()),
slog.F("to", to.String()),
)
if m != nil {
m.CircuitBreakerState.WithLabelValues(providerName, endpoint).Set(circuitbreaker.StateToGaugeValue(to))
m.CircuitBreakerState.WithLabelValues(providerName, endpoint, model).Set(circuitbreaker.StateToGaugeValue(to))
if to == gobreaker.StateOpen {
m.CircuitBreakerTrips.WithLabelValues(providerName, endpoint).Inc()
m.CircuitBreakerTrips.WithLabelValues(providerName, endpoint, model).Inc()
}
}
}
cbs := circuitbreaker.NewProviderCircuitBreakers(providerName, cfg, onChange)
cbs := circuitbreaker.NewProviderCircuitBreakers(providerName, cfg, onChange, m)

// Add the known provider-specific routes which are bridged (i.e. intercepted and augmented).
for _, path := range prov.BridgedRoutes() {
// Initialize circuit breaker state metric to closed (0) for known routes
if m != nil && cbs != nil {
endpoint := strings.TrimPrefix(path, "/"+providerName)
m.CircuitBreakerState.WithLabelValues(providerName, endpoint).Set(0)
}

handler := newInterceptionProcessor(prov, rec, mcpProxy, logger, m, tracer)
// Wrap with circuit breaker middleware (nil cbs passes through)
wrapped := circuitbreaker.Middleware(cbs, m, logger)(handler)
mux.Handle(path, wrapped)
handler := newInterceptionProcessor(prov, cbs, rec, mcpProxy, logger, m, tracer)
mux.Handle(path, handler)
}

// Any requests which passthrough to this will be reverse-proxied to the upstream.
Expand Down Expand Up @@ -132,7 +125,8 @@ func NewRequestBridge(ctx context.Context, providers []provider.Provider, rec re

// newInterceptionProcessor returns an [http.HandlerFunc] which is capable of creating a new interceptor and processing a given request
// using [Provider] p, recording all usage events using [Recorder] rec.
func newInterceptionProcessor(p provider.Provider, rec recorder.Recorder, mcpProxy mcp.ServerProxier, logger slog.Logger, m *metrics.Metrics, tracer trace.Tracer) http.HandlerFunc {
// If cbs is non-nil, circuit breaker protection is applied per endpoint/model tuple.
func newInterceptionProcessor(p provider.Provider, cbs *circuitbreaker.ProviderCircuitBreakers, rec recorder.Recorder, mcpProxy mcp.ServerProxier, logger slog.Logger, m *metrics.Metrics, tracer trace.Tracer) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ctx, span := tracer.Start(r.Context(), "Intercept")
defer span.End()
Expand Down Expand Up @@ -202,7 +196,10 @@ func newInterceptionProcessor(p provider.Provider, rec recorder.Recorder, mcpPro
}()
}

if err := interceptor.ProcessRequest(w, r); err != nil {
// Process request with circuit breaker protection if configured
if err := cbs.Execute(route, interceptor.Model(), w, func(rw http.ResponseWriter) error {
return interceptor.ProcessRequest(rw, r)
}); err != nil {
if m != nil {
m.InterceptionCount.WithLabelValues(p.Name(), interceptor.Model(), metrics.InterceptionCountStatusFailed, route, r.Method, actor.ID).Add(1)
}
Expand All @@ -214,6 +211,7 @@ func newInterceptionProcessor(p provider.Provider, rec recorder.Recorder, mcpPro
}
log.Debug(ctx, "interception ended")
}

asyncRecorder.RecordInterceptionEnded(ctx, &recorder.InterceptionRecordEnded{ID: interceptor.ID().String()})

// Ensure all recording have completed before completing request.
Expand Down
Loading