From b4fde785362ed1d73d2e15e7c292a4da86d2c733 Mon Sep 17 00:00:00 2001 From: Brian Fox <878612+onematchfox@users.noreply.github.com> Date: Tue, 12 May 2026 16:50:31 +0200 Subject: [PATCH 1/3] refactor(controller): invoke agents directly in MCP handler MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the HTTP round-trip through the controller's own A2A listener with direct invocation via a new `AgentClientRegistry`. The registry is owned by `A2ARegistrar`, which already maintains an `A2AClient` per agent for its HTTP mux — the registry gives the MCP handler access to those same clients without an extra network hop. The old approach routed through the controller's public A2A endpoint, meaning requests could traverse the external network (and any ingress or load-balancer in front of it) unnecessarily. The new path stays in-process. The old handler also cached its own `A2AClient` per agent in a `sync.Map` with no eviction, so clients for deleted agents would remain indefinitely. The registry is kept consistent by the registrar's add/update/delete lifecycle, eliminating that staleness. `A2ARegistrar.upsertAgentHandler` writes to both the HTTP mux (for inbound /api/a2a/// routing) and the registry (for direct invocation). The registry is exposed via `ClientRegistry()` and passed to `NewMCPHandler` in app.go. Co-Authored-By: Claude Sonnet 4.6 Signed-off-by: Brian Fox <878612+onematchfox@users.noreply.github.com> --- go/core/internal/a2a/a2a_registrar.go | 24 ++- go/core/internal/a2a/agent_client_registry.go | 58 +++++++ go/core/internal/mcp/mcp_handler.go | 77 ++------- go/core/internal/mcp/mcp_handler_test.go | 159 ++++++++++++++++++ go/core/pkg/app/app.go | 12 +- 5 files changed, 253 insertions(+), 77 deletions(-) create mode 100644 go/core/internal/a2a/agent_client_registry.go create mode 100644 go/core/internal/mcp/mcp_handler_test.go diff --git a/go/core/internal/a2a/a2a_registrar.go b/go/core/internal/a2a/a2a_registrar.go index d21dbec04..f3d96e9fd 100644 --- a/go/core/internal/a2a/a2a_registrar.go +++ b/go/core/internal/a2a/a2a_registrar.go @@ -26,6 +26,7 @@ import ( type A2ARegistrar struct { cache crcache.Cache handlerMux A2AHandlerMux + clientRegistry *AgentClientRegistry a2aBaseURL string sandboxA2AURL string authenticator auth.AuthProvider @@ -45,11 +46,12 @@ func NewA2ARegistrar( streamingTimeout time.Duration, ) *A2ARegistrar { reg := &A2ARegistrar{ - cache: cache, - handlerMux: mux, - a2aBaseURL: a2aBaseUrl, - sandboxA2AURL: sandboxA2ABaseURL, - authenticator: authenticator, + cache: cache, + handlerMux: mux, + clientRegistry: NewAgentClientRegistry(), + a2aBaseURL: a2aBaseUrl, + sandboxA2AURL: sandboxA2ABaseURL, + authenticator: authenticator, a2aBaseOptions: []a2aclient.Option{ a2aclient.WithTimeout(streamingTimeout), a2aclient.WithBuffer(streamingInitialBuf, streamingMaxBuf), @@ -60,6 +62,12 @@ func NewA2ARegistrar( return reg } +// ClientRegistry returns the registry of A2A clients for direct agent +// invocation, populated as agents are registered and deregistered. +func (a *A2ARegistrar) ClientRegistry() *AgentClientRegistry { + return a.clientRegistry +} + func (a *A2ARegistrar) NeedLeaderElection() bool { return false } @@ -117,6 +125,7 @@ func (a *A2ARegistrar) registerAgentInformer(ctx context.Context, prototype v1al } ref := a2aRouteKey(agent) a.handlerMux.RemoveAgentHandler(ref) + a.clientRegistry.delete(ref) log.V(1).Info("removed A2A handler", "agent", ref) }, }); err != nil { @@ -182,10 +191,13 @@ func (a *A2ARegistrar) upsertAgentHandler(ctx context.Context, agent v1alpha2.Ag cardCopy := *card cardCopy.URL = a.a2aRouteURL(agent) - if err := a.handlerMux.SetAgentHandler(a2aRouteKey(agent), client, cardCopy, newA2ATracingMiddleware(agentRef, provider)); err != nil { + routeRef := a2aRouteKey(agent) + if err := a.handlerMux.SetAgentHandler(routeRef, client, cardCopy, newA2ATracingMiddleware(agentRef, provider)); err != nil { return fmt.Errorf("set handler for %s: %w", agentRef, err) } + a.clientRegistry.set(routeRef, client) + log.V(1).Info("registered/updated A2A handler", "agent", agentRef) return nil } diff --git a/go/core/internal/a2a/agent_client_registry.go b/go/core/internal/a2a/agent_client_registry.go new file mode 100644 index 000000000..1e5b7b6bc --- /dev/null +++ b/go/core/internal/a2a/agent_client_registry.go @@ -0,0 +1,58 @@ +package a2a + +import ( + "context" + "fmt" + "sync" + + a2aclient "trpc.group/trpc-go/trpc-a2a-go/client" + "trpc.group/trpc-go/trpc-a2a-go/protocol" +) + +// AgentClientRegistry maps agent route keys to their A2A clients. +// The A2ARegistrar populates it; the MCP handler reads from it to invoke +// agents without an HTTP round trip through the controller's own A2A listener. +type AgentClientRegistry struct { + mu sync.RWMutex + clients map[string]*a2aclient.A2AClient +} + +func NewAgentClientRegistry() *AgentClientRegistry { + return &AgentClientRegistry{clients: make(map[string]*a2aclient.A2AClient)} +} + +// set stores the client under the agent's route key (e.g. "namespace/name" or +// "sandboxes/namespace/name"). +func (r *AgentClientRegistry) set(agentRef string, c *a2aclient.A2AClient) { + r.mu.Lock() + defer r.mu.Unlock() + r.clients[agentRef] = c +} + +// delete removes the client for the given agent route key. +func (r *AgentClientRegistry) delete(agentRef string) { + r.mu.Lock() + defer r.mu.Unlock() + delete(r.clients, agentRef) +} + +// Register adds or replaces the A2A client for the given agent. It is the +// exported counterpart of set, intended for use in tests and explicit +// registrations outside the A2ARegistrar lifecycle. +func (r *AgentClientRegistry) Register(namespace, name string, c *a2aclient.A2AClient) { + r.set(namespace+"/"+name, c) +} + +// SendMessage invokes an agent directly via its cached A2A client. +// namespace and name must identify a non-sandbox agent; sandbox agents use a +// different route key and are not yet reachable via this method. +func (r *AgentClientRegistry) SendMessage(ctx context.Context, namespace, name string, params protocol.SendMessageParams) (*protocol.MessageResult, error) { + key := namespace + "/" + name + r.mu.RLock() + c, ok := r.clients[key] + r.mu.RUnlock() + if !ok { + return nil, fmt.Errorf("agent %s/%s not found or not ready", namespace, name) + } + return c.SendMessage(ctx, params) +} diff --git a/go/core/internal/mcp/mcp_handler.go b/go/core/internal/mcp/mcp_handler.go index 8182df6fb..94619065b 100644 --- a/go/core/internal/mcp/mcp_handler.go +++ b/go/core/internal/mcp/mcp_handler.go @@ -5,31 +5,24 @@ import ( "fmt" "net/http" "strings" - "sync" - "time" "github.com/kagent-dev/kagent/go/api/v1alpha2" "github.com/kagent-dev/kagent/go/core/internal/a2a" - authimpl "github.com/kagent-dev/kagent/go/core/internal/httpserver/auth" "github.com/kagent-dev/kagent/go/core/internal/version" "github.com/kagent-dev/kagent/go/core/pkg/auth" mcpsdk "github.com/modelcontextprotocol/go-sdk/mcp" - "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" ctrllog "sigs.k8s.io/controller-runtime/pkg/log" - a2aclient "trpc.group/trpc-go/trpc-a2a-go/client" "trpc.group/trpc-go/trpc-a2a-go/protocol" ) // MCPHandler handles MCP requests and bridges them to A2A endpoints type MCPHandler struct { kubeClient client.Client - a2aBaseURL string - a2aTimeout time.Duration + agentClients *a2a.AgentClientRegistry authenticator auth.AuthProvider httpHandler *mcpsdk.StreamableHTTPHandler server *mcpsdk.Server - a2aClients sync.Map } // Input types for MCP tools @@ -56,20 +49,12 @@ type InvokeAgentOutput struct { ContextID string `json:"context_id,omitempty"` } -// defaultA2ATimeout is the fallback timeout for A2A client calls and should match -// the configured default streaming timeout. -const defaultA2ATimeout = 10 * time.Minute - -// NewMCPHandler creates a new MCP handler -// Wraps the StreamableHTTPHandler and adds A2A bridging and context management. -func NewMCPHandler(kubeClient client.Client, a2aBaseURL string, authenticator auth.AuthProvider, a2aTimeout time.Duration) (*MCPHandler, error) { - if a2aTimeout <= 0 { - a2aTimeout = defaultA2ATimeout - } +// NewMCPHandler creates a new MCP handler that bridges MCP tool calls directly +// to agent A2A clients, bypassing the controller's own HTTP A2A listener. +func NewMCPHandler(kubeClient client.Client, agentClients *a2a.AgentClientRegistry, authenticator auth.AuthProvider) (*MCPHandler, error) { handler := &MCPHandler{ kubeClient: kubeClient, - a2aBaseURL: a2aBaseURL, - a2aTimeout: a2aTimeout, + agentClients: agentClients, authenticator: authenticator, } @@ -183,9 +168,9 @@ func (h *MCPHandler) handleListAgents(ctx context.Context, req *mcpsdk.CallToolR func (h *MCPHandler) handleInvokeAgent(ctx context.Context, req *mcpsdk.CallToolRequest, input InvokeAgentInput) (*mcpsdk.CallToolResult, InvokeAgentOutput, error) { log := ctrllog.FromContext(ctx).WithName("mcp-handler").WithValues("tool", "invoke_agent") - // Parse agent reference (namespace/name or just name) - agentNS, agentName, ok := strings.Cut(input.Agent, "/") - if !ok { + // Parse agent reference — must be exactly "namespace/name". + parts := strings.SplitN(input.Agent, "/", 3) + if len(parts) != 2 || parts[0] == "" || parts[1] == "" { return &mcpsdk.CallToolResult{ Content: []mcpsdk.Content{ &mcpsdk.TextContent{Text: "agent must be in format 'namespace/name'"}, @@ -193,8 +178,8 @@ func (h *MCPHandler) handleInvokeAgent(ctx context.Context, req *mcpsdk.CallTool IsError: true, }, InvokeAgentOutput{}, nil } + agentNS, agentName := parts[0], parts[1] agentRef := agentNS + "/" + agentName - agentNns := types.NamespacedName{Namespace: agentNS, Name: agentName} // Get context ID from client request (stateless mode) // If not provided, contextIDPtr will be nil and a new conversation will start @@ -204,47 +189,9 @@ func (h *MCPHandler) handleInvokeAgent(ctx context.Context, req *mcpsdk.CallTool log.V(1).Info("Using context_id from client request", "context_id", input.ContextID) } - // Get or create cached A2A client for this agent - a2aURL := fmt.Sprintf("%s/%s/", h.a2aBaseURL, agentRef) - var a2aClient *a2aclient.A2AClient - - if cached, ok := h.a2aClients.Load(agentRef); ok { - if client, ok := cached.(*a2aclient.A2AClient); ok { - a2aClient = client - } - } - - // Create new client if not cached - if a2aClient == nil { - // Build A2A client options with authentication propagation - a2aOpts := []a2aclient.Option{ - a2aclient.WithTimeout(h.a2aTimeout), - a2aclient.WithHTTPReqHandler( - authimpl.A2ARequestHandler( - h.authenticator, - agentNns, - ), - ), - } - - newClient, err := a2aclient.NewA2AClient(a2aURL, a2aOpts...) - if err != nil { - log.Error(err, "Failed to create A2A client", "agent", agentRef) - return &mcpsdk.CallToolResult{ - Content: []mcpsdk.Content{ - &mcpsdk.TextContent{Text: fmt.Sprintf("Failed to create A2A client: %v", err)}, - }, - IsError: true, - }, InvokeAgentOutput{}, nil - } - - // Cache the client - h.a2aClients.Store(agentRef, newClient) - a2aClient = newClient - } - - // Send message via A2A - result, err := a2aClient.SendMessage(ctx, protocol.SendMessageParams{ + // Send message directly via the agent's A2A client, bypassing the + // controller's own HTTP A2A listener. + result, err := h.agentClients.SendMessage(ctx, agentNS, agentName, protocol.SendMessageParams{ Message: protocol.Message{ Kind: protocol.KindMessage, Role: protocol.MessageRoleUser, diff --git a/go/core/internal/mcp/mcp_handler_test.go b/go/core/internal/mcp/mcp_handler_test.go new file mode 100644 index 000000000..d343df520 --- /dev/null +++ b/go/core/internal/mcp/mcp_handler_test.go @@ -0,0 +1,159 @@ +package mcp + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "sync" + "testing" + + "github.com/kagent-dev/kagent/go/core/internal/a2a" + mcpsdk "github.com/modelcontextprotocol/go-sdk/mcp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + a2aclient "trpc.group/trpc-go/trpc-a2a-go/client" +) + +// a2aBackend is a fake A2A server that records whether it was called. +type a2aBackend struct { + server *httptest.Server + mu sync.Mutex + called bool +} + +func (b *a2aBackend) wasCalled() bool { + b.mu.Lock() + defer b.mu.Unlock() + return b.called +} + +func newA2ABackend(t *testing.T) *a2aBackend { + t.Helper() + b := &a2aBackend{} + b.server = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + b.mu.Lock() + b.called = true + b.mu.Unlock() + resp := map[string]any{ + "jsonrpc": "2.0", + "id": "", + "result": map[string]any{ + "kind": "message", + "messageId": "test-msg", + "role": "agent", + "parts": []any{map[string]any{"kind": "text", "text": "hello from agent"}}, + }, + } + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(resp); err != nil { + t.Errorf("failed to encode fake A2A response: %v", err) + } + })) + t.Cleanup(b.server.Close) + return b +} + +// newTestRegistry builds an AgentClientRegistry with a single agent pre-registered. +func newTestRegistry(t *testing.T, namespace, name, backendURL string) *a2a.AgentClientRegistry { + t.Helper() + c, err := a2aclient.NewA2AClient(backendURL + "/" + namespace + "/" + name + "/") + require.NoError(t, err) + registry := a2a.NewAgentClientRegistry() + registry.Register(namespace, name, c) + return registry +} + +// TestInvokeAgent_InvalidAgentRef verifies that invoke_agent returns a tool +// error for agent references that are not exactly "namespace/name". +func TestInvokeAgent_InvalidAgentRef(t *testing.T) { + for _, ref := range []string{"no-slash", "ns/name/extra", "/name", "ns/"} { + t.Run(ref, func(t *testing.T) { + registry := a2a.NewAgentClientRegistry() + mcpHandler, err := NewMCPHandler(nil, registry, nil) + require.NoError(t, err) + + mcpServer := httptest.NewServer(mcpHandler) + t.Cleanup(mcpServer.Close) + + transport := &mcpsdk.StreamableClientTransport{ + Endpoint: mcpServer.URL, + DisableStandaloneSSE: true, + } + + ctx := context.Background() + cs, err := mcpsdk.NewClient(&mcpsdk.Implementation{Name: "test", Version: "1.0"}, nil). + Connect(ctx, transport, nil) + require.NoError(t, err) + t.Cleanup(func() { cs.Close() }) + + result, err := cs.CallTool(ctx, &mcpsdk.CallToolParams{ + Name: "invoke_agent", + Arguments: map[string]any{"agent": ref, "task": "say hello"}, + }) + require.NoError(t, err) + assert.True(t, result.IsError, "expected a tool error for invalid agent ref %q", ref) + }) + } +} + +// TestInvokeAgent_UnregisteredAgent verifies that invoke_agent returns a tool +// error when the requested agent is not present in the AgentClientRegistry. +func TestInvokeAgent_UnregisteredAgent(t *testing.T) { + registry := a2a.NewAgentClientRegistry() // empty — no agents registered + mcpHandler, err := NewMCPHandler(nil, registry, nil) + require.NoError(t, err) + + mcpServer := httptest.NewServer(mcpHandler) + t.Cleanup(mcpServer.Close) + + transport := &mcpsdk.StreamableClientTransport{ + Endpoint: mcpServer.URL, + DisableStandaloneSSE: true, + } + + ctx := context.Background() + cs, err := mcpsdk.NewClient(&mcpsdk.Implementation{Name: "test", Version: "1.0"}, nil). + Connect(ctx, transport, nil) + require.NoError(t, err) + t.Cleanup(func() { cs.Close() }) + + result, err := cs.CallTool(ctx, &mcpsdk.CallToolParams{ + Name: "invoke_agent", + Arguments: map[string]any{"agent": "default/unknown-agent", "task": "say hello"}, + }) + require.NoError(t, err) + assert.True(t, result.IsError, "expected a tool error for an unregistered agent") +} + +// TestInvokeAgent_RoutesViaRegistry verifies that invoke_agent retrieves the +// pre-registered A2A client from AgentClientRegistry and forwards the call. +func TestInvokeAgent_RoutesViaRegistry(t *testing.T) { + backend := newA2ABackend(t) + + registry := newTestRegistry(t, "default", "test-agent", backend.server.URL) + mcpHandler, err := NewMCPHandler(nil, registry, nil) + require.NoError(t, err) + + mcpServer := httptest.NewServer(mcpHandler) + t.Cleanup(mcpServer.Close) + + transport := &mcpsdk.StreamableClientTransport{ + Endpoint: mcpServer.URL, + DisableStandaloneSSE: true, + } + + ctx := context.Background() + cs, err := mcpsdk.NewClient(&mcpsdk.Implementation{Name: "test", Version: "1.0"}, nil). + Connect(ctx, transport, nil) + require.NoError(t, err) + t.Cleanup(func() { cs.Close() }) + + result, err := cs.CallTool(ctx, &mcpsdk.CallToolParams{ + Name: "invoke_agent", + Arguments: map[string]any{"agent": "default/test-agent", "task": "say hello"}, + }) + require.NoError(t, err) + assert.False(t, result.IsError, "unexpected tool error: %v", result.Content) + assert.True(t, backend.wasCalled(), "A2A backend should have received the forwarded request") +} diff --git a/go/core/pkg/app/app.go b/go/core/pkg/app/app.go index d47ab55ad..f14476cf1 100644 --- a/go/core/pkg/app/app.go +++ b/go/core/pkg/app/app.go @@ -613,8 +613,7 @@ func Start(getExtensionConfig GetExtensionConfig, migrationRunner MigrationRunne // Register A2A handlers on all replicas a2aHandler := a2a.NewA2AHttpMux(httpserver.APIPathA2A, httpserver.APIPathA2ASandboxes, extensionCfg.Authenticator) - - if err := mgr.Add(a2a.NewA2ARegistrar( + a2aRegistrar := a2a.NewA2ARegistrar( mgr.GetCache(), a2aHandler, cfg.A2ABaseUrl+httpserver.APIPathA2A, @@ -623,17 +622,18 @@ func Start(getExtensionConfig GetExtensionConfig, migrationRunner MigrationRunne int(cfg.Streaming.MaxBufSize.Value()), int(cfg.Streaming.InitialBufSize.Value()), cfg.Streaming.Timeout, - )); err != nil { + ) + if err := mgr.Add(a2aRegistrar); err != nil { setupLog.Error(err, "unable to set up a2a registrar") os.Exit(1) } - // Create MCP handler that bridges to A2A + // Create MCP handler that invokes agents directly via their A2A clients, + // bypassing the controller's own HTTP A2A listener. mcpHandler, err := mcp.NewMCPHandler( mgr.GetClient(), - cfg.A2ABaseUrl+httpserver.APIPathA2A, + a2aRegistrar.ClientRegistry(), extensionCfg.Authenticator, - cfg.Streaming.Timeout, ) if err != nil { setupLog.Error(err, "unable to create MCP handler") From aacdefa842abac79a6d423f334a86ef700705f6d Mon Sep 17 00:00:00 2001 From: Brian Fox <878612+onematchfox@users.noreply.github.com> Date: Tue, 12 May 2026 14:19:38 +0200 Subject: [PATCH 2/3] feat(controller): expose agents as MCP resource with change notifications Adds a `kagent://agents` [MCP resource](https://modelcontextprotocol.io/specification/2025-06-18/server/resources) that lists agents as JSON. Clients can read the resource directly or subscribe to receive notifications when the agent list changes. I've left the existing `list_agents` tool in place for now to not break existing consumers although this can potentially be removed prior to the next breaking version? - Extract listReadyAgents helper shared by tool and resource handlers - Register kagent://agents resource with subscribe/unsubscribe support - Add NotifyAgentsChanged on MCPHandler, called via a new callback on A2ARegistrar whenever agents are added, updated, or removed - Wire the callback in app.go Co-Authored-By: Claude Sonnet 4.6 Signed-off-by: Brian Fox <878612+onematchfox@users.noreply.github.com> --- go/core/internal/a2a/a2a_registrar.go | 59 ++++++++++-- go/core/internal/a2a/agent_client_registry.go | 2 +- go/core/internal/mcp/mcp_handler.go | 92 ++++++++++++++----- go/core/pkg/app/app.go | 1 + 4 files changed, 125 insertions(+), 29 deletions(-) diff --git a/go/core/internal/a2a/a2a_registrar.go b/go/core/internal/a2a/a2a_registrar.go index f3d96e9fd..da3c8b1a5 100644 --- a/go/core/internal/a2a/a2a_registrar.go +++ b/go/core/internal/a2a/a2a_registrar.go @@ -31,6 +31,7 @@ type A2ARegistrar struct { sandboxA2AURL string authenticator auth.AuthProvider a2aBaseOptions []a2aclient.Option + onAgentChange func(ctx context.Context) } var _ manager.Runnable = (*A2ARegistrar)(nil) @@ -62,12 +63,6 @@ func NewA2ARegistrar( return reg } -// ClientRegistry returns the registry of A2A clients for direct agent -// invocation, populated as agents are registered and deregistered. -func (a *A2ARegistrar) ClientRegistry() *AgentClientRegistry { - return a.clientRegistry -} - func (a *A2ARegistrar) NeedLeaderElection() bool { return false } @@ -90,6 +85,18 @@ func (a *A2ARegistrar) Start(ctx context.Context) error { return nil } +// ClientRegistry returns the registry of A2A clients for direct agent +// invocation, populated as agents are registered and deregistered. +func (a *A2ARegistrar) ClientRegistry() *AgentClientRegistry { + return a.clientRegistry +} + +// SetAgentObserver registers an observer notified whenever an agent is added, +// updated, or removed. +func (a *A2ARegistrar) SetAgentObserver(o AgentObserver) { + a.agentObserver = o +} + func (a *A2ARegistrar) registerAgentInformer(ctx context.Context, prototype v1alpha2.AgentObject, log logr.Logger) error { informer, err := a.cache.GetInformer(ctx, prototype) if err != nil { @@ -104,7 +111,9 @@ func (a *A2ARegistrar) registerAgentInformer(ctx context.Context, prototype v1al } if err := a.upsertAgentHandler(ctx, agent, log); err != nil { log.Error(err, "failed to upsert A2A handler", "agent", common.GetObjectRef(agent)) + return } + a.notifyAgentChange(ctx) }, UpdateFunc: func(oldObj, newObj any) { oldAgent, ok1 := informerAgentObject(oldObj) @@ -112,11 +121,19 @@ func (a *A2ARegistrar) registerAgentInformer(ctx context.Context, prototype v1al if !ok1 || !ok2 { return } - if oldAgent.GetGeneration() != newAgent.GetGeneration() || !sameAgentSpec(oldAgent, newAgent) { + specChanged := oldAgent.GetGeneration() != newAgent.GetGeneration() || !sameAgentSpec(oldAgent, newAgent) + if specChanged { if err := a.upsertAgentHandler(ctx, newAgent, log); err != nil { log.Error(err, "failed to upsert A2A handler", "agent", common.GetObjectRef(newAgent)) + return } } + // Also notify when readiness conditions change so subscribers don't + // hold stale agent lists (the resource filter uses Accepted + + // DeploymentReady, which are status conditions, not spec fields). + if specChanged || agentReadinessChanged(oldAgent, newAgent) { + a.notifyAgentChange(ctx) + } }, DeleteFunc: func(obj any) { agent, ok := deletedInformerAgentObject(obj) @@ -127,6 +144,7 @@ func (a *A2ARegistrar) registerAgentInformer(ctx context.Context, prototype v1al a.handlerMux.RemoveAgentHandler(ref) a.clientRegistry.delete(ref) log.V(1).Info("removed A2A handler", "agent", ref) + a.notifyAgentChange(ctx) }, }); err != nil { return fmt.Errorf("failed to add informer event handler for %T: %w", prototype, err) @@ -135,6 +153,33 @@ func (a *A2ARegistrar) registerAgentInformer(ctx context.Context, prototype v1al return nil } +func (a *A2ARegistrar) notifyAgentChange(ctx context.Context) { + if a.onAgentChange != nil { + a.onAgentChange(ctx) + } +} + +func agentReadinessChanged(oldAgent, newAgent v1alpha2.AgentObject) bool { + return isAgentReady(oldAgent) != isAgentReady(newAgent) +} + +func isAgentReady(agent v1alpha2.AgentObject) bool { + status := agent.GetAgentStatus() + if status == nil { + return false + } + deploymentReady, accepted := false, false + for _, c := range status.Conditions { + if c.Type == "Ready" && c.Reason == "DeploymentReady" && string(c.Status) == "True" { + deploymentReady = true + } + if c.Type == "Accepted" && string(c.Status) == "True" { + accepted = true + } + } + return deploymentReady && accepted +} + func sameAgentSpec(oldAgent, newAgent v1alpha2.AgentObject) bool { oldSpec := oldAgent.GetAgentSpec() newSpec := newAgent.GetAgentSpec() diff --git a/go/core/internal/a2a/agent_client_registry.go b/go/core/internal/a2a/agent_client_registry.go index 1e5b7b6bc..e6ddfb68c 100644 --- a/go/core/internal/a2a/agent_client_registry.go +++ b/go/core/internal/a2a/agent_client_registry.go @@ -52,7 +52,7 @@ func (r *AgentClientRegistry) SendMessage(ctx context.Context, namespace, name s c, ok := r.clients[key] r.mu.RUnlock() if !ok { - return nil, fmt.Errorf("agent %s/%s not found or not ready", namespace, name) + return nil, fmt.Errorf("agent %s/%s not registered", namespace, name) } return c.SendMessage(ctx, params) } diff --git a/go/core/internal/mcp/mcp_handler.go b/go/core/internal/mcp/mcp_handler.go index 94619065b..3dc289476 100644 --- a/go/core/internal/mcp/mcp_handler.go +++ b/go/core/internal/mcp/mcp_handler.go @@ -2,6 +2,7 @@ package mcp import ( "context" + "encoding/json" "fmt" "net/http" "strings" @@ -38,7 +39,7 @@ type AgentSummary struct { } type InvokeAgentInput struct { - Agent string `json:"agent" jsonschema:"Agent reference in format namespace/name"` + Agent string `json:"agent" jsonschema:"Agent reference in format namespace/name. To find a list of available sources, use the 'agents' resource."` Task string `json:"task" jsonschema:"Task to run"` ContextID string `json:"context_id,omitempty" jsonschema:"Optional A2A context ID to continue a conversation"` } @@ -63,7 +64,12 @@ func NewMCPHandler(kubeClient client.Client, agentClients *a2a.AgentClientRegist Name: "kagent-agents", Version: version.Version, } - server := mcpsdk.NewServer(impl, nil) + server := mcpsdk.NewServer(impl, &mcpsdk.ServerOptions{ + // No-op handlers enable subscription tracking in the SDK; actual + // notifications are sent via NotifyAgentsChanged. + SubscribeHandler: func(context.Context, *mcpsdk.SubscribeRequest) error { return nil }, + UnsubscribeHandler: func(context.Context, *mcpsdk.UnsubscribeRequest) error { return nil }, + }) handler.server = server // Add list_agents tool @@ -86,6 +92,17 @@ func NewMCPHandler(kubeClient client.Client, agentClients *a2a.AgentClientRegist handler.handleInvokeAgent, ) + // Add agents resource for clients that pre-populate context + server.AddResource( + &mcpsdk.Resource{ + URI: "kagent://agents", + Name: "agents", + Description: "List of invokable kagent agents (accepted + deploymentReady)", + MIMEType: "application/json", + }, + handler.readAgentsResource, + ) + // Create HTTP handler handler.httpHandler = mcpsdk.NewStreamableHTTPHandler( func(*http.Request) *mcpsdk.Server { @@ -97,23 +114,14 @@ func NewMCPHandler(kubeClient client.Client, agentClients *a2a.AgentClientRegist return handler, nil } -// handleListAgents handles the list_agents MCP tool -func (h *MCPHandler) handleListAgents(ctx context.Context, req *mcpsdk.CallToolRequest, input ListAgentsInput) (*mcpsdk.CallToolResult, ListAgentsOutput, error) { - log := ctrllog.FromContext(ctx).WithName("mcp-handler").WithValues("tool", "list_agents") - +// listReadyAgents returns agents that are accepted and deployment-ready. +func (h *MCPHandler) listReadyAgents(ctx context.Context) ([]AgentSummary, error) { agentList := &v1alpha2.AgentList{} if err := h.kubeClient.List(ctx, agentList); err != nil { - return &mcpsdk.CallToolResult{ - Content: []mcpsdk.Content{ - &mcpsdk.TextContent{Text: fmt.Sprintf("Failed to list agents: %v", err)}, - }, - IsError: true, - }, ListAgentsOutput{}, nil + return nil, err } - - agents := make([]AgentSummary, 0) + agents := make([]AgentSummary, 0, len(agentList.Items)) for _, agent := range agentList.Items { - // Check if agent is accepted and deployment ready deploymentReady := false accepted := false for _, condition := range agent.Status.Conditions { @@ -124,18 +132,30 @@ func (h *MCPHandler) handleListAgents(ctx context.Context, req *mcpsdk.CallToolR accepted = true } } - if !accepted || !deploymentReady { continue } - - ref := agent.Namespace + "/" + agent.Name - description := agent.Spec.Description agents = append(agents, AgentSummary{ - Ref: ref, - Description: description, + Ref: agent.Namespace + "/" + agent.Name, + Description: agent.Spec.Description, }) } + return agents, nil +} + +// handleListAgents handles the list_agents MCP tool +func (h *MCPHandler) handleListAgents(ctx context.Context, req *mcpsdk.CallToolRequest, input ListAgentsInput) (*mcpsdk.CallToolResult, ListAgentsOutput, error) { + log := ctrllog.FromContext(ctx).WithName("mcp-handler").WithValues("tool", "list_agents") + + agents, err := h.listReadyAgents(ctx) + if err != nil { + return &mcpsdk.CallToolResult{ + Content: []mcpsdk.Content{ + &mcpsdk.TextContent{Text: fmt.Sprintf("Failed to list agents: %v", err)}, + }, + IsError: true, + }, ListAgentsOutput{}, nil + } log.Info("Listed agents", "count", len(agents)) @@ -164,6 +184,36 @@ func (h *MCPHandler) handleListAgents(ctx context.Context, req *mcpsdk.CallToolR }, output, nil } +// readAgentsResource handles reads of the kagent://agents resource. +func (h *MCPHandler) readAgentsResource(ctx context.Context, req *mcpsdk.ReadResourceRequest) (*mcpsdk.ReadResourceResult, error) { + agents, err := h.listReadyAgents(ctx) + if err != nil { + return nil, fmt.Errorf("listing agents: %w", err) + } + data, err := json.Marshal(agents) + if err != nil { + return nil, fmt.Errorf("marshaling agents: %w", err) + } + return &mcpsdk.ReadResourceResult{ + Contents: []*mcpsdk.ResourceContents{{ + URI: "kagent://agents", + MIMEType: "application/json", + Text: string(data), + }}, + }, nil +} + +// NotifyAgentsChanged sends a resources/updated notification for kagent://agents +// to all subscribed clients. Called by A2ARegistrar when agents are added, updated, +// or removed. +func (h *MCPHandler) NotifyAgentsChanged(ctx context.Context) { + if err := h.server.ResourceUpdated(ctx, &mcpsdk.ResourceUpdatedNotificationParams{ + URI: "kagent://agents", + }); err != nil { + ctrllog.FromContext(ctx).WithName("mcp-handler").Error(err, "failed to send resource updated notification") + } +} + // handleInvokeAgent handles the invoke_agent MCP tool func (h *MCPHandler) handleInvokeAgent(ctx context.Context, req *mcpsdk.CallToolRequest, input InvokeAgentInput) (*mcpsdk.CallToolResult, InvokeAgentOutput, error) { log := ctrllog.FromContext(ctx).WithName("mcp-handler").WithValues("tool", "invoke_agent") diff --git a/go/core/pkg/app/app.go b/go/core/pkg/app/app.go index f14476cf1..05cd1b916 100644 --- a/go/core/pkg/app/app.go +++ b/go/core/pkg/app/app.go @@ -639,6 +639,7 @@ func Start(getExtensionConfig GetExtensionConfig, migrationRunner MigrationRunne setupLog.Error(err, "unable to create MCP handler") os.Exit(1) } + a2aRegistrar.SetAgentChangeCallback(mcpHandler.NotifyAgentsChanged) // +kubebuilder:scaffold:builder if metricsCertWatcher != nil { From e3b33c9d45b6833cb684934726d93b4be3dde16b Mon Sep 17 00:00:00 2001 From: Brian Fox <878612+onematchfox@users.noreply.github.com> Date: Mon, 18 May 2026 13:24:39 +0200 Subject: [PATCH 3/3] refactor: switch callback to use an interface Signed-off-by: Brian Fox <878612+onematchfox@users.noreply.github.com> --- go/core/internal/a2a/a2a_registrar.go | 10 +++++++--- go/core/pkg/app/app.go | 2 +- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/go/core/internal/a2a/a2a_registrar.go b/go/core/internal/a2a/a2a_registrar.go index da3c8b1a5..84496260f 100644 --- a/go/core/internal/a2a/a2a_registrar.go +++ b/go/core/internal/a2a/a2a_registrar.go @@ -31,7 +31,11 @@ type A2ARegistrar struct { sandboxA2AURL string authenticator auth.AuthProvider a2aBaseOptions []a2aclient.Option - onAgentChange func(ctx context.Context) + agentObserver AgentObserver +} + +type AgentObserver interface { + NotifyAgentsChanged(ctx context.Context) } var _ manager.Runnable = (*A2ARegistrar)(nil) @@ -154,8 +158,8 @@ func (a *A2ARegistrar) registerAgentInformer(ctx context.Context, prototype v1al } func (a *A2ARegistrar) notifyAgentChange(ctx context.Context) { - if a.onAgentChange != nil { - a.onAgentChange(ctx) + if a.agentObserver != nil { + a.agentObserver.NotifyAgentsChanged(ctx) } } diff --git a/go/core/pkg/app/app.go b/go/core/pkg/app/app.go index 05cd1b916..fc43c5c71 100644 --- a/go/core/pkg/app/app.go +++ b/go/core/pkg/app/app.go @@ -639,7 +639,7 @@ func Start(getExtensionConfig GetExtensionConfig, migrationRunner MigrationRunne setupLog.Error(err, "unable to create MCP handler") os.Exit(1) } - a2aRegistrar.SetAgentChangeCallback(mcpHandler.NotifyAgentsChanged) + a2aRegistrar.SetAgentObserver(mcpHandler) // +kubebuilder:scaffold:builder if metricsCertWatcher != nil {