Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 68 additions & 7 deletions go/core/internal/a2a/a2a_registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,16 @@ import (
type A2ARegistrar struct {
cache crcache.Cache
handlerMux A2AHandlerMux
clientRegistry *AgentClientRegistry
a2aBaseURL string
sandboxA2AURL string
authenticator auth.AuthProvider
a2aBaseOptions []a2aclient.Option
agentObserver AgentObserver
}

type AgentObserver interface {
NotifyAgentsChanged(ctx context.Context)
}

var _ manager.Runnable = (*A2ARegistrar)(nil)
Expand All @@ -45,11 +51,12 @@ func NewA2ARegistrar(
streamingTimeout time.Duration,
) *A2ARegistrar {
reg := &A2ARegistrar{
cache: cache,
handlerMux: mux,
a2aBaseURL: a2aBaseUrl,
sandboxA2AURL: sandboxA2ABaseURL,
authenticator: authenticator,
cache: cache,
handlerMux: mux,
clientRegistry: NewAgentClientRegistry(),
a2aBaseURL: a2aBaseUrl,
sandboxA2AURL: sandboxA2ABaseURL,
authenticator: authenticator,
a2aBaseOptions: []a2aclient.Option{
a2aclient.WithTimeout(streamingTimeout),
a2aclient.WithBuffer(streamingInitialBuf, streamingMaxBuf),
Expand Down Expand Up @@ -82,6 +89,18 @@ func (a *A2ARegistrar) Start(ctx context.Context) error {
return nil
}

// ClientRegistry returns the registry of A2A clients for direct agent
// invocation, populated as agents are registered and deregistered.
func (a *A2ARegistrar) ClientRegistry() *AgentClientRegistry {
return a.clientRegistry
}

// SetAgentObserver registers an observer notified whenever an agent is added,
// updated, or removed.
func (a *A2ARegistrar) SetAgentObserver(o AgentObserver) {
a.agentObserver = o
}

func (a *A2ARegistrar) registerAgentInformer(ctx context.Context, prototype v1alpha2.AgentObject, log logr.Logger) error {
informer, err := a.cache.GetInformer(ctx, prototype)
if err != nil {
Expand All @@ -96,19 +115,29 @@ func (a *A2ARegistrar) registerAgentInformer(ctx context.Context, prototype v1al
}
if err := a.upsertAgentHandler(ctx, agent, log); err != nil {
log.Error(err, "failed to upsert A2A handler", "agent", common.GetObjectRef(agent))
return
}
a.notifyAgentChange(ctx)
},
UpdateFunc: func(oldObj, newObj any) {
oldAgent, ok1 := informerAgentObject(oldObj)
newAgent, ok2 := informerAgentObject(newObj)
if !ok1 || !ok2 {
return
}
if oldAgent.GetGeneration() != newAgent.GetGeneration() || !sameAgentSpec(oldAgent, newAgent) {
specChanged := oldAgent.GetGeneration() != newAgent.GetGeneration() || !sameAgentSpec(oldAgent, newAgent)
if specChanged {
if err := a.upsertAgentHandler(ctx, newAgent, log); err != nil {
log.Error(err, "failed to upsert A2A handler", "agent", common.GetObjectRef(newAgent))
return
}
}
// Also notify when readiness conditions change so subscribers don't
// hold stale agent lists (the resource filter uses Accepted +
// DeploymentReady, which are status conditions, not spec fields).
if specChanged || agentReadinessChanged(oldAgent, newAgent) {
a.notifyAgentChange(ctx)
}
},
DeleteFunc: func(obj any) {
agent, ok := deletedInformerAgentObject(obj)
Expand All @@ -117,7 +146,9 @@ func (a *A2ARegistrar) registerAgentInformer(ctx context.Context, prototype v1al
}
ref := a2aRouteKey(agent)
a.handlerMux.RemoveAgentHandler(ref)
a.clientRegistry.delete(ref)
log.V(1).Info("removed A2A handler", "agent", ref)
a.notifyAgentChange(ctx)
},
}); err != nil {
return fmt.Errorf("failed to add informer event handler for %T: %w", prototype, err)
Expand All @@ -126,6 +157,33 @@ func (a *A2ARegistrar) registerAgentInformer(ctx context.Context, prototype v1al
return nil
}

func (a *A2ARegistrar) notifyAgentChange(ctx context.Context) {
if a.agentObserver != nil {
a.agentObserver.NotifyAgentsChanged(ctx)
}
}

func agentReadinessChanged(oldAgent, newAgent v1alpha2.AgentObject) bool {
return isAgentReady(oldAgent) != isAgentReady(newAgent)
}

func isAgentReady(agent v1alpha2.AgentObject) bool {
status := agent.GetAgentStatus()
if status == nil {
return false
}
deploymentReady, accepted := false, false
for _, c := range status.Conditions {
if c.Type == "Ready" && c.Reason == "DeploymentReady" && string(c.Status) == "True" {
deploymentReady = true
}
if c.Type == "Accepted" && string(c.Status) == "True" {
accepted = true
}
}
return deploymentReady && accepted
}

func sameAgentSpec(oldAgent, newAgent v1alpha2.AgentObject) bool {
oldSpec := oldAgent.GetAgentSpec()
newSpec := newAgent.GetAgentSpec()
Expand Down Expand Up @@ -182,10 +240,13 @@ func (a *A2ARegistrar) upsertAgentHandler(ctx context.Context, agent v1alpha2.Ag
cardCopy := *card
cardCopy.URL = a.a2aRouteURL(agent)

if err := a.handlerMux.SetAgentHandler(a2aRouteKey(agent), client, cardCopy, newA2ATracingMiddleware(agentRef, provider)); err != nil {
routeRef := a2aRouteKey(agent)
if err := a.handlerMux.SetAgentHandler(routeRef, client, cardCopy, newA2ATracingMiddleware(agentRef, provider)); err != nil {
return fmt.Errorf("set handler for %s: %w", agentRef, err)
}

a.clientRegistry.set(routeRef, client)

log.V(1).Info("registered/updated A2A handler", "agent", agentRef)
return nil
}
Expand Down
58 changes: 58 additions & 0 deletions go/core/internal/a2a/agent_client_registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package a2a

import (
"context"
"fmt"
"sync"

a2aclient "trpc.group/trpc-go/trpc-a2a-go/client"
"trpc.group/trpc-go/trpc-a2a-go/protocol"
)

// AgentClientRegistry maps agent route keys to their A2A clients.
// The A2ARegistrar populates it; the MCP handler reads from it to invoke
// agents without an HTTP round trip through the controller's own A2A listener.
type AgentClientRegistry struct {
mu sync.RWMutex
clients map[string]*a2aclient.A2AClient
}

func NewAgentClientRegistry() *AgentClientRegistry {
return &AgentClientRegistry{clients: make(map[string]*a2aclient.A2AClient)}
}

// set stores the client under the agent's route key (e.g. "namespace/name" or
// "sandboxes/namespace/name").
func (r *AgentClientRegistry) set(agentRef string, c *a2aclient.A2AClient) {
r.mu.Lock()
defer r.mu.Unlock()
r.clients[agentRef] = c
}

// delete removes the client for the given agent route key.
func (r *AgentClientRegistry) delete(agentRef string) {
r.mu.Lock()
defer r.mu.Unlock()
delete(r.clients, agentRef)
}

// Register adds or replaces the A2A client for the given agent. It is the
// exported counterpart of set, intended for use in tests and explicit
// registrations outside the A2ARegistrar lifecycle.
func (r *AgentClientRegistry) Register(namespace, name string, c *a2aclient.A2AClient) {
r.set(namespace+"/"+name, c)
}

// SendMessage invokes an agent directly via its cached A2A client.
// namespace and name must identify a non-sandbox agent; sandbox agents use a
// different route key and are not yet reachable via this method.
func (r *AgentClientRegistry) SendMessage(ctx context.Context, namespace, name string, params protocol.SendMessageParams) (*protocol.MessageResult, error) {
key := namespace + "/" + name
r.mu.RLock()
c, ok := r.clients[key]
r.mu.RUnlock()
if !ok {
return nil, fmt.Errorf("agent %s/%s not registered", namespace, name)
}
return c.SendMessage(ctx, params)
}
Loading
Loading