From 45f245b7c07f98724c9e41ab42a9bbefbb25bb1d Mon Sep 17 00:00:00 2001 From: Will Washburn Date: Wed, 10 Jun 2026 10:14:52 -0400 Subject: [PATCH 1/3] Validate broker events at ingress with zod schemas Add a zod discriminated union (keyed on `kind`, matching the SDK's `BrokerEvent` type) for every broker event the app forwards, and validate events ONCE where they enter `BrokerManager` via `HarnessDriverClient.onEvent`. - src/shared/schemas/broker-events.ts: discriminated union of all known event kinds. Every payload uses `.passthrough()` so SDK-minor additions and the dedupe logic's dynamic field reads (`seq`, `event_id`, `id`, `chunk`) survive. Enum-ish fields (runtime/provider/mode) stay loose `z.string()` so a new enum value can't drop an otherwise-valid event. `classifyBrokerEvent` returns valid / unknown / malformed. - broker.ts ingress: known kind that fails the schema is dropped with a per-kind throttled warning; unknown kind is forwarded unchanged with a once-per-kind warning (forward-compat). Valid worker_stream chunks keep their cheap inline-typed fast path, so we never run a discriminated-union parse per keystroke. Removes the `as unknown as BrokerEventRecordPayload` cast at the publish boundary in favor of the typed parsed payload. - agent-store.ts: replace `event.name!` / `event.parent!` non-null assertions with const-narrowed locals (main is the validation point; renderer change is minimal). - broker.test.ts: malformed known event dropped + logged without killing the stream; warning throttled per kind; unknown kind forwarded + logged once; valid events of each major shape flow; classifyBrokerEvent unit tests. Co-Authored-By: Claude Opus 4.8 --- src/main/broker.test.ts | 202 ++++++++++++ src/main/broker.ts | 60 +++- src/renderer/src/stores/agent-store.ts | 72 +++-- src/shared/schemas/broker-events.ts | 432 +++++++++++++++++++++++++ 4 files changed, 730 insertions(+), 36 deletions(-) create mode 100644 src/shared/schemas/broker-events.ts diff --git a/src/main/broker.test.ts b/src/main/broker.test.ts index ed3bcd03..e4858a0a 100644 --- a/src/main/broker.test.ts +++ b/src/main/broker.test.ts @@ -196,6 +196,10 @@ import { resolveAgentRelayMcpCommand, resolveBundledBrokerBinary } from './broker' +import { + classifyBrokerEvent, + KNOWN_BROKER_EVENT_KINDS +} from '../shared/schemas/broker-events' const PROJECT_ID = 'project-1' const originalMcpCommand = process.env.AGENT_RELAY_MCP_COMMAND @@ -1884,6 +1888,204 @@ describe('BrokerManager local + cloud coexistence', () => { }) }) +function brokerEventSends(win: BrowserWindow): unknown[] { + return (win.webContents.send as ReturnType).mock.calls + .filter(([channel]) => channel === 'broker:event') + .map(([, payload]) => payload) +} + +describe('BrokerManager broker event ingress validation', () => { + it('drops a malformed known event, logs once, and keeps the stream alive', async () => { + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => undefined) + const manager = new BrokerManager() + const win = createMockWindow() + const local = await startLocalWithWindow(manager, win) + const listener = local.onEvent.mock.calls.at(-1)?.[0] + expect(listener).toBeTypeOf('function') + + // relay_inbound is a known kind, but `body` is required by the schema. + listener?.({ + kind: 'relay_inbound', + from: 'codex-2', + target: '#general', + event_id: 'evt-malformed' + }) + + // The malformed event must not be forwarded to the renderer. + expect( + brokerEventSends(win).some( + (payload) => (payload as { event_id?: string }).event_id === 'evt-malformed' + ) + ).toBe(false) + expect(warnSpy).toHaveBeenCalledWith( + '[broker] Dropped malformed broker event:', + expect.objectContaining({ kind: 'relay_inbound' }) + ) + + // A subsequent valid event still flows — the stream is not torn down. + listener?.({ + kind: 'relay_inbound', + from: 'codex-2', + target: '#general', + body: 'still alive', + event_id: 'evt-valid' + }) + expect(win.webContents.send).toHaveBeenCalledWith( + 'broker:event', + expect.objectContaining({ kind: 'relay_inbound', body: 'still alive' }) + ) + + warnSpy.mockRestore() + await manager.shutdown() + }) + + it('throttles repeated malformed warnings for the same kind', async () => { + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => undefined) + const manager = new BrokerManager() + const win = createMockWindow() + const local = await startLocalWithWindow(manager, win) + const listener = local.onEvent.mock.calls.at(-1)?.[0] + + for (let i = 0; i < 3; i += 1) { + listener?.({ kind: 'agent_idle', name: 'claude-1' }) // missing idle_secs + } + + const idleWarnings = warnSpy.mock.calls.filter( + ([message, detail]) => + message === '[broker] Dropped malformed broker event:' && + (detail as { kind?: string }).kind === 'agent_idle' + ) + expect(idleWarnings).toHaveLength(1) + + warnSpy.mockRestore() + await manager.shutdown() + }) + + it('forwards unknown event kinds unchanged and logs once per kind', async () => { + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => undefined) + const manager = new BrokerManager() + const win = createMockWindow() + const local = await startLocalWithWindow(manager, win) + const listener = local.onEvent.mock.calls.at(-1)?.[0] + + listener?.({ kind: 'future_event_kind', name: 'claude-1', detail: 'one' }) + listener?.({ kind: 'future_event_kind', name: 'claude-1', detail: 'two' }) + + // Both unknown events are forwarded (not dropped) — forward-compat. + const forwarded = brokerEventSends(win).filter( + (payload) => (payload as { kind?: string }).kind === 'future_event_kind' + ) + expect(forwarded).toHaveLength(2) + expect(forwarded[0]).toMatchObject({ detail: 'one' }) + + // But the unknown-kind telemetry warning fires only once for the kind. + const unknownWarnings = warnSpy.mock.calls.filter( + ([message, detail]) => + message === '[broker] Forwarding unrecognized broker event kind (forward-compat):' && + (detail as { kind?: string }).kind === 'future_event_kind' + ) + expect(unknownWarnings).toHaveLength(1) + + warnSpy.mockRestore() + await manager.shutdown() + }) + + it('parses and forwards valid events of each major shape', async () => { + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => undefined) + const manager = new BrokerManager() + const win = createMockWindow() + const local = await startLocalWithWindow(manager, win) + const listener = local.onEvent.mock.calls.at(-1)?.[0] + + listener?.({ kind: 'agent_spawned', name: 'claude-2', runtime: 'pty', cli: 'claude' }) + listener?.({ + kind: 'relay_inbound', + from: 'codex-2', + target: '#general', + body: 'hello', + event_id: 'evt-shape' + }) + listener?.({ kind: 'agent_idle', name: 'claude-2', idle_secs: 5 }) + listener?.({ kind: 'delivery_queued', name: 'claude-2', delivery_id: 'd1', event_id: 'e1' }) + // worker_stream is delivered out-of-band on broker:pty-chunk, not broker:event. + listener?.({ kind: 'worker_stream', name: 'claude-2', stream: 'stdout', chunk: 'tick\n' }) + + const kinds = brokerEventSends(win).map((payload) => (payload as { kind?: string }).kind) + expect(kinds).toEqual( + expect.arrayContaining(['agent_spawned', 'relay_inbound', 'agent_idle', 'delivery_queued']) + ) + const ptyCalls = (win.webContents.send as ReturnType).mock.calls + .filter(([channel]) => channel === 'broker:pty-chunk') + expect(ptyCalls).toEqual([['broker:pty-chunk', PROJECT_ID, 'claude-2', 'tick\n']]) + + // None of the valid shapes should have warned. + expect(warnSpy).not.toHaveBeenCalledWith( + '[broker] Dropped malformed broker event:', + expect.anything() + ) + + warnSpy.mockRestore() + await manager.shutdown() + }) +}) + +describe('classifyBrokerEvent', () => { + it('classifies a valid known event and preserves passthrough fields', () => { + const result = classifyBrokerEvent({ + kind: 'worker_stream', + name: 'claude-1', + stream: 'stdout', + chunk: 'hi', + // Extra fields the SDK may add / dedupe reads dynamically must survive. + seq: 42, + event_id: 'evt-1' + }) + expect(result.status).toBe('valid') + if (result.status === 'valid') { + expect(result.kind).toBe('worker_stream') + expect(result.event).toMatchObject({ chunk: 'hi', seq: 42, event_id: 'evt-1' }) + } + }) + + it('flags a known event with a wrong field type as malformed', () => { + const result = classifyBrokerEvent({ kind: 'worker_stream', name: 'claude-1', stream: 'stdout', chunk: 123 }) + expect(result.status).toBe('malformed') + if (result.status === 'malformed') { + expect(result.kind).toBe('worker_stream') + expect(result.reason).toContain('chunk') + } + }) + + it('treats an unknown kind as forwardable, not malformed', () => { + const result = classifyBrokerEvent({ kind: 'brand_new_kind', name: 'x' }) + expect(result.status).toBe('unknown') + if (result.status === 'unknown') { + expect(result.event).toMatchObject({ kind: 'brand_new_kind', name: 'x' }) + } + }) + + it('treats a payload with no usable kind as malformed', () => { + expect(classifyBrokerEvent({ name: 'x' }).status).toBe('malformed') + expect(classifyBrokerEvent(null).status).toBe('malformed') + expect(classifyBrokerEvent({ kind: 42 }).status).toBe('malformed') + }) + + it('recognizes the broker event kinds the app consumes', () => { + for (const kind of [ + 'agent_spawned', + 'agent_exited', + 'relay_inbound', + 'worker_stream', + 'delivery_queued', + 'channel_subscribed', + 'agent_idle', + 'agent_blocked_on_send' + ]) { + expect(KNOWN_BROKER_EVENT_KINDS.has(kind)).toBe(true) + } + }) +}) + describe('BrokerManager spawnAgent CLI preflight', () => { let tempDir: string | null = null diff --git a/src/main/broker.ts b/src/main/broker.ts index 1d34bfc8..3fbdd32a 100644 --- a/src/main/broker.ts +++ b/src/main/broker.ts @@ -28,6 +28,7 @@ import { type GeneratedCommitDraft } from './schemas' import { compactBrokerEvent, normalizeEventTimestamp } from '../shared/lib/broker-events' +import { classifyBrokerEvent } from '../shared/schemas/broker-events' import type { BrokerEventStreamDiagnostic, BrokerReconciledChatMessage, @@ -544,6 +545,9 @@ const EVENT_STREAM_REBIND_COOLDOWN_MS = 5_000 const PTY_CHUNK_IDENTITY_DEDUPE_TTL_MS = 60_000 const PTY_CHUNK_CONTENT_DEDUPE_TTL_MS = 1_000 const MAX_PTY_CHUNK_DEDUPE_ENTRIES = 2_000 +// Throttle malformed-event warnings per `kind` so a misbehaving broker stream +// can't flood the terminal (AGENTS.md low-noise telemetry doctrine). +const MALFORMED_BROKER_EVENT_WARN_THROTTLE_MS = 60_000 // After this many consecutive failures to open a PTY input stream, give up on // the WS fast path for that agent briefly and send over HTTP while it cools down. const MAX_INPUT_STREAM_OPEN_FAILURES = 3 @@ -1475,6 +1479,10 @@ export class BrokerManager { private eventHistory: BrokerEventRecord[] = [] private recentPtyChunks = new Map() private eventSerial = 0 + // Ingress-validation telemetry: last warn time per malformed `kind` + // (throttled) and the set of unknown `kind`s already logged (once each). + private malformedEventWarnedAt = new Map() + private warnedUnknownEventKinds = new Set() get cwd(): string | null { return this.sessions.values().next().value?.cwd || null @@ -2386,7 +2394,10 @@ export class BrokerManager { // dedicated channel so typing latency doesn't pay for compactBrokerEvent, // the broker:event metadata spread, or pushing into eventHistory per // character. Activity bookkeeping (rememberAgentSession + cloud sandbox - // observers) still runs. + // observers) still runs. The inline string checks below are the cheap, + // per-keystroke validation for worker_stream chunks — a chunk that fails + // them falls through to full zod validation at the general boundary + // below, so we never pay a discriminated-union parse per character. if ( event.kind === 'worker_stream' && 'name' in event && typeof event.name === 'string' && @@ -2408,7 +2419,14 @@ export class BrokerManager { return } - this.publishBrokerEvent(sessionKey, projectId, win, event as unknown as BrokerEventRecordPayload) + // General ingress: validate ONCE with zod. Malformed known events are + // dropped (the stream keeps running); unknown kinds are forwarded + // unchanged for forward-compat. `forwarded` is the typed payload we + // publish, which removes the `as unknown as` cast at the publish call. + const forwarded = this.validateIngressBrokerEvent(event) + if (!forwarded) return + + this.publishBrokerEvent(sessionKey, projectId, win, forwarded) if (event.kind === 'agent_spawned' && event.name) { this.rememberAgentSession(event.name, sessionKey) @@ -2506,6 +2524,44 @@ export class BrokerManager { return false } + /** + * Validate a raw broker event at ingress. Returns the typed payload to + * forward, or `null` when the event is malformed and should be dropped. + * + * - Malformed known events: dropped, with a per-`kind` throttled warning. + * - Unknown `kind`s: forwarded unchanged, logged once per `kind` (forward + * compat — a future SDK minor may add kinds this build doesn't model). + * - Valid events: forwarded as the parsed (passthrough) payload. + */ + private validateIngressBrokerEvent(event: BrokerEvent): BrokerEventRecordPayload | null { + const result = classifyBrokerEvent(event) + if (result.status === 'malformed') { + this.warnMalformedBrokerEvent(result.kind, result.reason) + return null + } + if (result.status === 'unknown') { + this.noteUnknownBrokerEventKind(result.kind) + } + return result.event + } + + private warnMalformedBrokerEvent(kind: string | undefined, reason: string): void { + const key = kind || '' + const now = Date.now() + const lastWarnedAt = this.malformedEventWarnedAt.get(key) + if (lastWarnedAt !== undefined && now - lastWarnedAt < MALFORMED_BROKER_EVENT_WARN_THROTTLE_MS) { + return + } + this.malformedEventWarnedAt.set(key, now) + console.warn('[broker] Dropped malformed broker event:', { kind: key, reason }) + } + + private noteUnknownBrokerEventKind(kind: string): void { + if (this.warnedUnknownEventKinds.has(kind)) return + this.warnedUnknownEventKinds.add(kind) + console.warn('[broker] Forwarding unrecognized broker event kind (forward-compat):', { kind }) + } + private publishBrokerEvent( sessionKey: string, projectId: string, diff --git a/src/renderer/src/stores/agent-store.ts b/src/renderer/src/stores/agent-store.ts index 19c27856..0a4e7efa 100644 --- a/src/renderer/src/stores/agent-store.ts +++ b/src/renderer/src/stores/agent-store.ts @@ -815,33 +815,37 @@ export const useAgentStore = create()(subscribeWithSelector((set, ge }, handleBrokerEvent: (event) => { - const { kind } = event + // Destructure the discriminator and the two fields the guards key off. + // Because `name`/`parent` are `const`, a `&& name` guard narrows them to + // `string` for the whole branch — including the `set(...)` closures — so we + // no longer need non-null assertions on event fields inside those closures. + const { kind, name, parent } = event - if (kind === 'agent_spawned' && event.name) { + if (kind === 'agent_spawned' && name) { set((state) => { - const parentAgent = event.parent - ? state.agents.find((agent) => matchesAgent(agent, event.projectId, event.parent!)) + const parentAgent = parent + ? state.agents.find((agent) => matchesAgent(agent, event.projectId, parent)) : undefined const { brokerProjectId, activeProjectId } = useProjectStore.getState() const projectId = event.projectId || parentAgent?.projectId || activeProjectId || brokerProjectId || undefined const rootId = parentAgent?.rootId const rootPath = parentAgent?.rootPath - const agentKey = getAgentKey(projectId, event.name!) + const agentKey = getAgentKey(projectId, name) const currentState: AgentCurrentState = 'idle' const channels = normalizeChannelList(event.channels) - const existingAgent = state.agents.find((a) => matchesAgent(a, projectId, event.name!)) + const existingAgent = state.agents.find((a) => matchesAgent(a, projectId, name)) const existingChannels = existingAgent?.channels || [] const notices = channels ? channels .filter((channel) => !existingChannels.includes(channel)) - .map((channel) => createChannelJoinNotice(projectId, channel, event.name!)) + .map((channel) => createChannelJoinNotice(projectId, channel, name)) .filter((notice): notice is ChatMessage => notice !== null) : [] return { - agents: state.agents.some((a) => matchesAgent(a, projectId, event.name!)) + agents: state.agents.some((a) => matchesAgent(a, projectId, name)) ? state.agents.map((a) => - matchesAgent(a, projectId, event.name!) + matchesAgent(a, projectId, name) ? { ...a, cli: event.cli || a.cli, @@ -861,7 +865,7 @@ export const useAgentStore = create()(subscribeWithSelector((set, ge : [ ...state.agents, { - name: event.name!, + name, cli: event.cli || 'unknown', model: event.model, status: 'running', @@ -880,7 +884,7 @@ export const useAgentStore = create()(subscribeWithSelector((set, ge messages: notices.length > 0 ? appendJoinNotices(state.messages, notices) : state.messages } }) - } else if ((kind === 'channel_subscribed' || kind === 'channel_unsubscribed') && event.name) { + } else if ((kind === 'channel_subscribed' || kind === 'channel_unsubscribed') && name) { const channels = normalizeChannelList(event.channels) if (!channels || channels.length === 0) return @@ -889,7 +893,7 @@ export const useAgentStore = create()(subscribeWithSelector((set, ge let matchedAgent = false const agents = state.agents.map((agent) => { - if (!matchesAgent(agent, event.projectId, event.name!)) return agent + if (!matchesAgent(agent, event.projectId, name)) return agent matchedAgent = true const projectChannels = useProjectStore.getState().projects @@ -904,7 +908,7 @@ export const useAgentStore = create()(subscribeWithSelector((set, ge : currentChannels.filter((channel) => !channels.includes(channel)) for (const channel of joinedChannels) { - const notice = createChannelJoinNotice(event.projectId || agent.projectId, channel, event.name!) + const notice = createChannelJoinNotice(event.projectId || agent.projectId, channel, name) if (notice) notices.push(notice) } @@ -913,7 +917,7 @@ export const useAgentStore = create()(subscribeWithSelector((set, ge if (!matchedAgent && kind === 'channel_subscribed') { for (const channel of channels) { - const notice = createChannelJoinNotice(event.projectId, channel, event.name!) + const notice = createChannelJoinNotice(event.projectId, channel, name) if (notice) notices.push(notice) } } @@ -923,12 +927,12 @@ export const useAgentStore = create()(subscribeWithSelector((set, ge messages: notices.length > 0 ? appendJoinNotices(state.messages, notices) : state.messages } }) - } else if ((kind === 'agent_exited' || kind === 'agent_released') && event.name) { - const removedKeyForExpiry = getAgentKey(event.projectId, event.name!) + } else if ((kind === 'agent_exited' || kind === 'agent_released') && name) { + const removedKeyForExpiry = getAgentKey(event.projectId, name) set((state) => { - const removed = state.agents.find((a) => matchesAgent(a, event.projectId, event.name!)) + const removed = state.agents.find((a) => matchesAgent(a, event.projectId, name)) const removedKey = removed ? getAgentKeyForAgent(removed) : removedKeyForExpiry - const remaining = state.agents.filter((a) => !matchesAgent(a, event.projectId, event.name!)) + const remaining = state.agents.filter((a) => !matchesAgent(a, event.projectId, name)) const needNewActive = state.activeAgentKey === removedKey const nextActiveAgent = remaining.find((a) => a.status === 'running') || remaining[0] useTypingStore.getState().clear(removedKey) @@ -945,10 +949,10 @@ export const useAgentStore = create()(subscribeWithSelector((set, ge } else if (kind === 'worker_stream') { // worker_stream is delivered out-of-band via broker:pty-chunk for typing // latency reasons; nothing to do here. - } else if (kind === 'delivery_queued' && event.name) { + } else if (kind === 'delivery_queued' && name) { set((state) => ({ agents: state.agents.map((a) => - matchesAgent(a, event.projectId, event.name!) + matchesAgent(a, event.projectId, name) ? { ...addPendingDelivery(a, event.event_id), activity: a.terminalMode === 'drive' ? a.activity : 'active', @@ -957,11 +961,11 @@ export const useAgentStore = create()(subscribeWithSelector((set, ge : a ) })) - } else if (kind === 'delivery_active' && event.name) { - useTypingStore.getState().noteActivity(getAgentKey(event.projectId, event.name)) + } else if (kind === 'delivery_active' && name) { + useTypingStore.getState().noteActivity(getAgentKey(event.projectId, name)) set((state) => ({ agents: state.agents.map((a) => { - if (!matchesAgent(a, event.projectId, event.name!)) return a + if (!matchesAgent(a, event.projectId, name)) return a return { ...addPendingDelivery(a, event.event_id), activity: 'active', @@ -971,15 +975,15 @@ export const useAgentStore = create()(subscribeWithSelector((set, ge })) } else if ( ['delivery_injected', 'delivery_verified', 'delivery_ack', 'delivery_failed', 'message_delivery_confirmed', 'message_delivery_failed'].includes(kind) && - event.name + name ) { const startsActivity = ['delivery_injected', 'delivery_verified', 'message_delivery_confirmed'].includes(kind) if (startsActivity) { - useTypingStore.getState().noteActivity(getAgentKey(event.projectId, event.name)) + useTypingStore.getState().noteActivity(getAgentKey(event.projectId, name)) } set((state) => ({ agents: state.agents.map((a) => { - if (!matchesAgent(a, event.projectId, event.name!)) return a + if (!matchesAgent(a, event.projectId, name)) return a const nextAgent = clearPendingDeliveries(a, event.event_id) if (!startsActivity) { return nextAgent @@ -987,16 +991,16 @@ export const useAgentStore = create()(subscribeWithSelector((set, ge return { ...nextAgent, activity: 'active', currentState: 'working' } }) })) - } else if (kind === 'agent_pending_drained' && event.name) { + } else if (kind === 'agent_pending_drained' && name) { set((state) => ({ agents: state.agents.map((a) => - matchesAgent(a, event.projectId, event.name!) ? clearPendingDeliveries(a) : a + matchesAgent(a, event.projectId, name) ? clearPendingDeliveries(a) : a ) })) - } else if (kind === 'agent_inbound_delivery_mode_changed' && event.name) { + } else if (kind === 'agent_inbound_delivery_mode_changed' && name) { set((state) => ({ agents: state.agents.map((a) => { - if (!matchesAgent(a, event.projectId, event.name!)) return a + if (!matchesAgent(a, event.projectId, name)) return a if (event.mode === 'manual_flush') { return { ...a, terminalMode: 'drive' } } @@ -1059,18 +1063,18 @@ export const useAgentStore = create()(subscribeWithSelector((set, ge relayMessages: capByCount([...state.relayMessages, relay], MAX_RELAY_MESSAGES) } }) - } else if (kind === 'agent_blocked_on_send' && event.name) { + } else if (kind === 'agent_blocked_on_send' && name) { set((state) => ({ agents: state.agents.map((a) => { - if (!matchesAgent(a, event.projectId, event.name!)) return a + if (!matchesAgent(a, event.projectId, name)) return a useTypingStore.getState().clear(getAgentKeyForAgent(a)) return { ...a, activity: 'active', currentState: 'blocked_on_send' } }) })) - } else if (kind === 'agent_idle' && event.name) { + } else if (kind === 'agent_idle' && name) { set((state) => ({ agents: state.agents.map((a) => { - if (!matchesAgent(a, event.projectId, event.name!)) return a + if (!matchesAgent(a, event.projectId, name)) return a useTypingStore.getState().clear(getAgentKeyForAgent(a)) return { ...clearPendingDeliveries(a), activity: 'idle', currentState: 'idle' } }) diff --git a/src/shared/schemas/broker-events.ts b/src/shared/schemas/broker-events.ts new file mode 100644 index 00000000..95629bee --- /dev/null +++ b/src/shared/schemas/broker-events.ts @@ -0,0 +1,432 @@ +import { z } from 'zod' + +/** + * Zod schemas for the broker events the app ingests from + * `@agent-relay/harness-driver` (`HarnessDriverClient.onEvent`). These mirror + * the SDK's `BrokerEvent` discriminated union (keyed on `kind`) and are used to + * validate events ONCE at main-process ingress (see `BrokerManager`). + * + * Design notes: + * + * - **`kind` is the discriminator.** The SDK union — and every consumer in + * `broker.ts` / `agent-store.ts` — switches on `kind`, not `name`. + * + * - **Every payload uses `.passthrough()`.** The SDK adds fields between minor + * versions, and the duplicate-suppression logic in `broker.ts` reads some + * fields dynamically (`seq`, `event_id`, `id`, `chunk`). Stripping unknown + * keys would silently break dedupe and drop forward-compatible data, so we + * keep everything and only validate the fields the SDK declares. + * + * - **Enum-ish fields stay loose `z.string()`** (runtime, provider, delivery + * mode). A new enum value shipped in an SDK minor must not cause an otherwise + * valid event to be dropped. + * + * - **Required fields mirror the SDK contract.** A known event missing a field + * the SDK guarantees is genuinely malformed; ingress drops it (throttled + * warning). Unknown `kind`s are forwarded unchanged for forward-compat — see + * `classifyBrokerEvent`. + */ + +const agentSpawned = z + .object({ + kind: z.literal('agent_spawned'), + name: z.string(), + runtime: z.string(), + provider: z.string().optional(), + cli: z.string().optional(), + model: z.string().optional(), + sessionId: z.string().optional(), + parent: z.string().optional(), + pid: z.number().optional(), + source: z.string().optional() + }) + .passthrough() + +const agentReleased = z + .object({ + kind: z.literal('agent_released'), + name: z.string() + }) + .passthrough() + +const agentExit = z + .object({ + kind: z.literal('agent_exit'), + name: z.string(), + reason: z.string() + }) + .passthrough() + +const agentExited = z + .object({ + kind: z.literal('agent_exited'), + name: z.string(), + code: z.number().optional(), + signal: z.string().optional(), + reason: z.string().optional() + }) + .passthrough() + +const agentContextLow = z + .object({ + kind: z.literal('agent_context_low'), + name: z.string(), + pct: z.number() + }) + .passthrough() + +const relayInbound = z + .object({ + kind: z.literal('relay_inbound'), + event_id: z.string(), + from: z.string(), + target: z.string(), + body: z.string(), + thread_id: z.string().optional(), + mode: z.string().optional(), + injection_mode: z.string().optional() + }) + .passthrough() + +const workerStream = z + .object({ + kind: z.literal('worker_stream'), + name: z.string(), + stream: z.string(), + chunk: z.string() + }) + .passthrough() + +const deliveryRetry = z + .object({ + kind: z.literal('delivery_retry'), + name: z.string(), + delivery_id: z.string(), + event_id: z.string(), + attempts: z.number() + }) + .passthrough() + +const deliveryDropped = z + .object({ + kind: z.literal('delivery_dropped'), + name: z.string(), + count: z.number(), + reason: z.string() + }) + .passthrough() + +const deliveryQueued = z + .object({ + kind: z.literal('delivery_queued'), + name: z.string(), + delivery_id: z.string(), + event_id: z.string(), + timestamp: z.unknown().optional() + }) + .passthrough() + +const agentPendingDrained = z + .object({ + kind: z.literal('agent_pending_drained'), + name: z.string(), + count: z.number(), + reason: z.string().optional() + }) + .passthrough() + +const agentInboundDeliveryModeChanged = z + .object({ + kind: z.literal('agent_inbound_delivery_mode_changed'), + name: z.string(), + previous_mode: z.string(), + mode: z.string() + }) + .passthrough() + +const deliveryInjected = z + .object({ + kind: z.literal('delivery_injected'), + name: z.string(), + delivery_id: z.string(), + event_id: z.string(), + timestamp: z.unknown().optional() + }) + .passthrough() + +const deliveryVerified = z + .object({ + kind: z.literal('delivery_verified'), + name: z.string(), + delivery_id: z.string(), + event_id: z.string() + }) + .passthrough() + +const deliveryFailed = z + .object({ + kind: z.literal('delivery_failed'), + name: z.string(), + delivery_id: z.string(), + event_id: z.string(), + reason: z.string() + }) + .passthrough() + +const messageDeliveryConfirmed = z + .object({ + kind: z.literal('message_delivery_confirmed'), + name: z.string(), + delivery_id: z.string(), + event_id: z.string(), + from: z.string(), + to: z.string() + }) + .passthrough() + +const messageDeliveryFailed = z + .object({ + kind: z.literal('message_delivery_failed'), + name: z.string(), + delivery_id: z.string().optional(), + event_id: z.string().optional(), + from: z.string(), + to: z.string(), + attempts: z.number(), + lastError: z.string() + }) + .passthrough() + +const deliveryActive = z + .object({ + kind: z.literal('delivery_active'), + name: z.string(), + delivery_id: z.string(), + event_id: z.string() + }) + .passthrough() + +const deliveryAck = z + .object({ + kind: z.literal('delivery_ack'), + name: z.string(), + delivery_id: z.string(), + event_id: z.string() + }) + .passthrough() + +const channelSubscribed = z + .object({ + kind: z.literal('channel_subscribed'), + name: z.string(), + channels: z.array(z.string()) + }) + .passthrough() + +const channelUnsubscribed = z + .object({ + kind: z.literal('channel_unsubscribed'), + name: z.string(), + channels: z.array(z.string()) + }) + .passthrough() + +const workerReady = z + .object({ + kind: z.literal('worker_ready'), + name: z.string(), + runtime: z.string(), + provider: z.string().optional(), + cli: z.string().optional(), + model: z.string().optional(), + sessionId: z.string().optional(), + pid: z.number().optional() + }) + .passthrough() + +const workerError = z + .object({ + kind: z.literal('worker_error'), + name: z.string(), + code: z.string(), + message: z.string() + }) + .passthrough() + +const relaycastPublished = z + .object({ + kind: z.literal('relaycast_published'), + event_id: z.string(), + to: z.string(), + target_type: z.string() + }) + .passthrough() + +const relaycastPublishFailed = z + .object({ + kind: z.literal('relaycast_publish_failed'), + event_id: z.string(), + to: z.string(), + reason: z.string() + }) + .passthrough() + +const aclDenied = z + .object({ + kind: z.literal('acl_denied'), + name: z.string(), + sender: z.string(), + owner_chain: z.array(z.string()) + }) + .passthrough() + +const agentIdle = z + .object({ + kind: z.literal('agent_idle'), + name: z.string(), + idle_secs: z.number(), + since: z.string().optional() + }) + .passthrough() + +const agentResult = z + .object({ + kind: z.literal('agent_result'), + name: z.string(), + result_id: z.string(), + data: z.unknown(), + final: z.boolean(), + metadata: z.unknown().optional() + }) + .passthrough() + +const agentBlockedOnSend = z + .object({ + kind: z.literal('agent_blocked_on_send'), + name: z.string(), + blocked_secs: z.number(), + pending_delivery_count: z.number() + }) + .passthrough() + +const agentRestarting = z + .object({ + kind: z.literal('agent_restarting'), + name: z.string(), + code: z.number().optional(), + signal: z.string().optional(), + restart_count: z.number(), + delay_ms: z.number() + }) + .passthrough() + +const agentRestarted = z + .object({ + kind: z.literal('agent_restarted'), + name: z.string(), + restart_count: z.number() + }) + .passthrough() + +const agentPermanentlyDead = z + .object({ + kind: z.literal('agent_permanently_dead'), + name: z.string(), + reason: z.string() + }) + .passthrough() + +/** + * Discriminated union of every broker event `kind` the SDK emits and the app + * forwards. Membership defines the "known" set — see `classifyBrokerEvent`. + */ +export const BrokerEventSchema = z.discriminatedUnion('kind', [ + agentSpawned, + agentReleased, + agentExit, + agentExited, + agentContextLow, + relayInbound, + workerStream, + deliveryRetry, + deliveryDropped, + deliveryQueued, + agentPendingDrained, + agentInboundDeliveryModeChanged, + deliveryInjected, + deliveryVerified, + deliveryFailed, + messageDeliveryConfirmed, + messageDeliveryFailed, + deliveryActive, + deliveryAck, + channelSubscribed, + channelUnsubscribed, + workerReady, + workerError, + relaycastPublished, + relaycastPublishFailed, + aclDenied, + agentIdle, + agentResult, + agentBlockedOnSend, + agentRestarting, + agentRestarted, + agentPermanentlyDead +]) + +export type ValidatedBrokerEvent = z.infer + +/** + * The shape every forwarded broker event satisfies: an object carrying a + * string `kind`. Both validated and (kind-only-validated) unknown events are + * assignable to this, so the main-process ingress can forward them without any + * `as unknown as` casts. + */ +export type BrokerEventPayload = Record & { kind: string } + +/** The set of event `kind`s the schema recognizes. */ +export const KNOWN_BROKER_EVENT_KINDS: ReadonlySet = new Set( + BrokerEventSchema.options.map((option) => option.shape.kind.value) +) + +export type BrokerEventClassification = + /** A recognized `kind` whose payload matched the schema. */ + | { status: 'valid'; kind: string; event: ValidatedBrokerEvent } + /** A `kind` the schema doesn't know — forward unchanged (forward-compat). */ + | { status: 'unknown'; kind: string; event: BrokerEventPayload } + /** A recognized `kind` (or no usable `kind`) whose payload failed validation. */ + | { status: 'malformed'; kind?: string; reason: string } + +function isRecord(value: unknown): value is Record { + return !!value && typeof value === 'object' && !Array.isArray(value) +} + +function describeIssue(error: z.ZodError): string { + const issue = error.issues[0] + if (!issue) return 'invalid event shape' + const path = issue.path.join('.') + return path ? `${path}: ${issue.message}` : issue.message +} + +/** + * Classify a raw broker event at ingress: + * - no usable `kind` → `malformed` + * - unknown `kind` → `unknown` (caller forwards it unchanged) + * - known `kind`, payload OK → `valid` + * - known `kind`, payload bad → `malformed` (caller drops it) + */ +export function classifyBrokerEvent(raw: unknown): BrokerEventClassification { + if (!isRecord(raw) || typeof raw.kind !== 'string' || !raw.kind) { + return { status: 'malformed', reason: 'missing event kind' } + } + const kind = raw.kind + if (!KNOWN_BROKER_EVENT_KINDS.has(kind)) { + return { status: 'unknown', kind, event: raw as BrokerEventPayload } + } + const parsed = BrokerEventSchema.safeParse(raw) + if (parsed.success) { + return { status: 'valid', kind, event: parsed.data } + } + return { status: 'malformed', kind, reason: describeIssue(parsed.error) } +} From f3dbdf3aa7940bfcee0d12b64e9af1ed0682e1ff Mon Sep 17 00:00:00 2001 From: Will Washburn Date: Wed, 10 Jun 2026 10:21:36 -0400 Subject: [PATCH 2/3] Report all zod issues when logging a dropped event Gemini review: describeIssue only surfaced the first issue, hiding multi-field payload problems from the malformed-event telemetry. Co-Authored-By: Claude Fable 5 --- src/shared/schemas/broker-events.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/shared/schemas/broker-events.ts b/src/shared/schemas/broker-events.ts index 95629bee..d59ad4d3 100644 --- a/src/shared/schemas/broker-events.ts +++ b/src/shared/schemas/broker-events.ts @@ -403,10 +403,10 @@ function isRecord(value: unknown): value is Record { } function describeIssue(error: z.ZodError): string { - const issue = error.issues[0] - if (!issue) return 'invalid event shape' - const path = issue.path.join('.') - return path ? `${path}: ${issue.message}` : issue.message + if (error.issues.length === 0) return 'invalid event shape' + return error.issues + .map((issue) => (issue.path.length ? `${issue.path.join('.')}: ${issue.message}` : issue.message)) + .join('; ') } /** From bb6b8ca7b2caf4d2f5c4cd71d86581f243d103a4 Mon Sep 17 00:00:00 2001 From: Will Washburn Date: Wed, 10 Jun 2026 10:28:24 -0400 Subject: [PATCH 3/3] Make delivery_id optional on delivery events per Codex review The app's delivery logic (isDeliveryEventForMessage) keys on event_id + name only; requiring delivery_id was stricter than any consumer and would silently drop confirmations from brokers that omit it, leaving pending-delivery UI state stuck. Regression test covers all delivery kinds without delivery_id. Co-Authored-By: Claude Fable 5 --- src/main/broker.test.ts | 17 +++++++++++++++++ src/shared/schemas/broker-events.ts | 12 ++++++------ 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/src/main/broker.test.ts b/src/main/broker.test.ts index e4858a0a..1a6c7e1b 100644 --- a/src/main/broker.test.ts +++ b/src/main/broker.test.ts @@ -2056,6 +2056,23 @@ describe('classifyBrokerEvent', () => { } }) + it('accepts delivery events without delivery_id', () => { + // The app's delivery logic keys on event_id + name only + // (isDeliveryEventForMessage); requiring delivery_id would silently drop + // confirmations from brokers that omit it and leave pending-delivery UI + // state stuck. + for (const kind of ['delivery_injected', 'delivery_verified', 'delivery_ack', 'delivery_active']) { + const result = classifyBrokerEvent({ kind, name: 'claude-1', event_id: 'evt-9' }) + expect(result.status).toBe('valid') + } + expect(classifyBrokerEvent({ + kind: 'delivery_failed', name: 'claude-1', event_id: 'evt-9', reason: 'timeout' + }).status).toBe('valid') + expect(classifyBrokerEvent({ + kind: 'message_delivery_confirmed', name: 'claude-1', event_id: 'evt-9', from: 'a', to: 'b' + }).status).toBe('valid') + }) + it('treats an unknown kind as forwardable, not malformed', () => { const result = classifyBrokerEvent({ kind: 'brand_new_kind', name: 'x' }) expect(result.status).toBe('unknown') diff --git a/src/shared/schemas/broker-events.ts b/src/shared/schemas/broker-events.ts index d59ad4d3..7968c042 100644 --- a/src/shared/schemas/broker-events.ts +++ b/src/shared/schemas/broker-events.ts @@ -148,7 +148,7 @@ const deliveryInjected = z .object({ kind: z.literal('delivery_injected'), name: z.string(), - delivery_id: z.string(), + delivery_id: z.string().optional(), event_id: z.string(), timestamp: z.unknown().optional() }) @@ -158,7 +158,7 @@ const deliveryVerified = z .object({ kind: z.literal('delivery_verified'), name: z.string(), - delivery_id: z.string(), + delivery_id: z.string().optional(), event_id: z.string() }) .passthrough() @@ -167,7 +167,7 @@ const deliveryFailed = z .object({ kind: z.literal('delivery_failed'), name: z.string(), - delivery_id: z.string(), + delivery_id: z.string().optional(), event_id: z.string(), reason: z.string() }) @@ -177,7 +177,7 @@ const messageDeliveryConfirmed = z .object({ kind: z.literal('message_delivery_confirmed'), name: z.string(), - delivery_id: z.string(), + delivery_id: z.string().optional(), event_id: z.string(), from: z.string(), to: z.string() @@ -201,7 +201,7 @@ const deliveryActive = z .object({ kind: z.literal('delivery_active'), name: z.string(), - delivery_id: z.string(), + delivery_id: z.string().optional(), event_id: z.string() }) .passthrough() @@ -210,7 +210,7 @@ const deliveryAck = z .object({ kind: z.literal('delivery_ack'), name: z.string(), - delivery_id: z.string(), + delivery_id: z.string().optional(), event_id: z.string() }) .passthrough()