Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 219 additions & 0 deletions src/main/broker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@
}

class HarnessDriverClient {
static spawn = vi.fn(async (_options: unknown) => {

Check warning on line 122 in src/main/broker.test.ts

View workflow job for this annotation

GitHub Actions / checks

'_options' is defined but never used
const client = createMockClient(state.nextLocalAgents.splice(0))
state.spawnedClients.push(client)
return client
Expand Down Expand Up @@ -196,6 +196,10 @@
resolveAgentRelayMcpCommand,
resolveBundledBrokerBinary
} from './broker'
import {
classifyBrokerEvent,
KNOWN_BROKER_EVENT_KINDS
} from '../shared/schemas/broker-events'

const PROJECT_ID = 'project-1'
const originalMcpCommand = process.env.AGENT_RELAY_MCP_COMMAND
Expand Down Expand Up @@ -1884,6 +1888,221 @@
})
})

function brokerEventSends(win: BrowserWindow): unknown[] {
return (win.webContents.send as ReturnType<typeof vi.fn>).mock.calls
.filter(([channel]) => channel === 'broker:event')
.map(([, payload]) => payload)
}

describe('BrokerManager broker event ingress validation', () => {
it('drops a malformed known event, logs once, and keeps the stream alive', async () => {
const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => undefined)
const manager = new BrokerManager()
const win = createMockWindow()
const local = await startLocalWithWindow(manager, win)
const listener = local.onEvent.mock.calls.at(-1)?.[0]
expect(listener).toBeTypeOf('function')

// relay_inbound is a known kind, but `body` is required by the schema.
listener?.({
kind: 'relay_inbound',
from: 'codex-2',
target: '#general',
event_id: 'evt-malformed'
})

// The malformed event must not be forwarded to the renderer.
expect(
brokerEventSends(win).some(
(payload) => (payload as { event_id?: string }).event_id === 'evt-malformed'
)
).toBe(false)
expect(warnSpy).toHaveBeenCalledWith(
'[broker] Dropped malformed broker event:',
expect.objectContaining({ kind: 'relay_inbound' })
)

// A subsequent valid event still flows — the stream is not torn down.
listener?.({
kind: 'relay_inbound',
from: 'codex-2',
target: '#general',
body: 'still alive',
event_id: 'evt-valid'
})
expect(win.webContents.send).toHaveBeenCalledWith(
'broker:event',
expect.objectContaining({ kind: 'relay_inbound', body: 'still alive' })
)

warnSpy.mockRestore()
await manager.shutdown()
})

it('throttles repeated malformed warnings for the same kind', async () => {
const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => undefined)
const manager = new BrokerManager()
const win = createMockWindow()
const local = await startLocalWithWindow(manager, win)
const listener = local.onEvent.mock.calls.at(-1)?.[0]

for (let i = 0; i < 3; i += 1) {
listener?.({ kind: 'agent_idle', name: 'claude-1' }) // missing idle_secs
}

const idleWarnings = warnSpy.mock.calls.filter(
([message, detail]) =>
message === '[broker] Dropped malformed broker event:' &&
(detail as { kind?: string }).kind === 'agent_idle'
)
expect(idleWarnings).toHaveLength(1)

warnSpy.mockRestore()
await manager.shutdown()
})

it('forwards unknown event kinds unchanged and logs once per kind', async () => {
const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => undefined)
const manager = new BrokerManager()
const win = createMockWindow()
const local = await startLocalWithWindow(manager, win)
const listener = local.onEvent.mock.calls.at(-1)?.[0]

listener?.({ kind: 'future_event_kind', name: 'claude-1', detail: 'one' })
listener?.({ kind: 'future_event_kind', name: 'claude-1', detail: 'two' })

// Both unknown events are forwarded (not dropped) — forward-compat.
const forwarded = brokerEventSends(win).filter(
(payload) => (payload as { kind?: string }).kind === 'future_event_kind'
)
expect(forwarded).toHaveLength(2)
expect(forwarded[0]).toMatchObject({ detail: 'one' })

// But the unknown-kind telemetry warning fires only once for the kind.
const unknownWarnings = warnSpy.mock.calls.filter(
([message, detail]) =>
message === '[broker] Forwarding unrecognized broker event kind (forward-compat):' &&
(detail as { kind?: string }).kind === 'future_event_kind'
)
expect(unknownWarnings).toHaveLength(1)

warnSpy.mockRestore()
await manager.shutdown()
})

it('parses and forwards valid events of each major shape', async () => {
const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => undefined)
const manager = new BrokerManager()
const win = createMockWindow()
const local = await startLocalWithWindow(manager, win)
const listener = local.onEvent.mock.calls.at(-1)?.[0]

listener?.({ kind: 'agent_spawned', name: 'claude-2', runtime: 'pty', cli: 'claude' })
listener?.({
kind: 'relay_inbound',
from: 'codex-2',
target: '#general',
body: 'hello',
event_id: 'evt-shape'
})
listener?.({ kind: 'agent_idle', name: 'claude-2', idle_secs: 5 })
listener?.({ kind: 'delivery_queued', name: 'claude-2', delivery_id: 'd1', event_id: 'e1' })
// worker_stream is delivered out-of-band on broker:pty-chunk, not broker:event.
listener?.({ kind: 'worker_stream', name: 'claude-2', stream: 'stdout', chunk: 'tick\n' })

const kinds = brokerEventSends(win).map((payload) => (payload as { kind?: string }).kind)
expect(kinds).toEqual(
expect.arrayContaining(['agent_spawned', 'relay_inbound', 'agent_idle', 'delivery_queued'])
)
const ptyCalls = (win.webContents.send as ReturnType<typeof vi.fn>).mock.calls
.filter(([channel]) => channel === 'broker:pty-chunk')
expect(ptyCalls).toEqual([['broker:pty-chunk', PROJECT_ID, 'claude-2', 'tick\n']])

// None of the valid shapes should have warned.
expect(warnSpy).not.toHaveBeenCalledWith(
'[broker] Dropped malformed broker event:',
expect.anything()
)

warnSpy.mockRestore()
await manager.shutdown()
})
})

describe('classifyBrokerEvent', () => {
it('classifies a valid known event and preserves passthrough fields', () => {
const result = classifyBrokerEvent({
kind: 'worker_stream',
name: 'claude-1',
stream: 'stdout',
chunk: 'hi',
// Extra fields the SDK may add / dedupe reads dynamically must survive.
seq: 42,
event_id: 'evt-1'
})
expect(result.status).toBe('valid')
if (result.status === 'valid') {
expect(result.kind).toBe('worker_stream')
expect(result.event).toMatchObject({ chunk: 'hi', seq: 42, event_id: 'evt-1' })
}
})

it('flags a known event with a wrong field type as malformed', () => {
const result = classifyBrokerEvent({ kind: 'worker_stream', name: 'claude-1', stream: 'stdout', chunk: 123 })
expect(result.status).toBe('malformed')
if (result.status === 'malformed') {
expect(result.kind).toBe('worker_stream')
expect(result.reason).toContain('chunk')
}
})

it('accepts delivery events without delivery_id', () => {
// The app's delivery logic keys on event_id + name only
// (isDeliveryEventForMessage); requiring delivery_id would silently drop
// confirmations from brokers that omit it and leave pending-delivery UI
// state stuck.
for (const kind of ['delivery_injected', 'delivery_verified', 'delivery_ack', 'delivery_active']) {
const result = classifyBrokerEvent({ kind, name: 'claude-1', event_id: 'evt-9' })
expect(result.status).toBe('valid')
}
expect(classifyBrokerEvent({
kind: 'delivery_failed', name: 'claude-1', event_id: 'evt-9', reason: 'timeout'
}).status).toBe('valid')
expect(classifyBrokerEvent({
kind: 'message_delivery_confirmed', name: 'claude-1', event_id: 'evt-9', from: 'a', to: 'b'
}).status).toBe('valid')
})

it('treats an unknown kind as forwardable, not malformed', () => {
const result = classifyBrokerEvent({ kind: 'brand_new_kind', name: 'x' })
expect(result.status).toBe('unknown')
if (result.status === 'unknown') {
expect(result.event).toMatchObject({ kind: 'brand_new_kind', name: 'x' })
}
})

it('treats a payload with no usable kind as malformed', () => {
expect(classifyBrokerEvent({ name: 'x' }).status).toBe('malformed')
expect(classifyBrokerEvent(null).status).toBe('malformed')
expect(classifyBrokerEvent({ kind: 42 }).status).toBe('malformed')
})

it('recognizes the broker event kinds the app consumes', () => {
for (const kind of [
'agent_spawned',
'agent_exited',
'relay_inbound',
'worker_stream',
'delivery_queued',
'channel_subscribed',
'agent_idle',
'agent_blocked_on_send'
]) {
expect(KNOWN_BROKER_EVENT_KINDS.has(kind)).toBe(true)
}
})
})

describe('BrokerManager spawnAgent CLI preflight', () => {
let tempDir: string | null = null

Expand Down
60 changes: 58 additions & 2 deletions src/main/broker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
type GeneratedCommitDraft
} from './schemas'
import { compactBrokerEvent, normalizeEventTimestamp } from '../shared/lib/broker-events'
import { classifyBrokerEvent } from '../shared/schemas/broker-events'
import type {
BrokerEventStreamDiagnostic,
BrokerReconciledChatMessage,
Expand Down Expand Up @@ -544,6 +545,9 @@
const PTY_CHUNK_IDENTITY_DEDUPE_TTL_MS = 60_000
const PTY_CHUNK_CONTENT_DEDUPE_TTL_MS = 1_000
const MAX_PTY_CHUNK_DEDUPE_ENTRIES = 2_000
// Throttle malformed-event warnings per `kind` so a misbehaving broker stream
// can't flood the terminal (AGENTS.md low-noise telemetry doctrine).
const MALFORMED_BROKER_EVENT_WARN_THROTTLE_MS = 60_000
// After this many consecutive failures to open a PTY input stream, give up on
// the WS fast path for that agent briefly and send over HTTP while it cools down.
const MAX_INPUT_STREAM_OPEN_FAILURES = 3
Expand Down Expand Up @@ -995,13 +999,13 @@
return new Promise((resolve, reject) => {
const timer = setTimeout(() => {
reject(new Error(`${label} timed out after ${BROKER_DETAILS_TIMEOUT_MS}ms`))
}, BROKER_DETAILS_TIMEOUT_MS)

Check warning on line 1002 in src/main/broker.ts

View workflow job for this annotation

GitHub Actions / checks

'getBrokerEventName' is defined but never used

promise.then(
(value) => {
clearTimeout(timer)
resolve(value)
},

Check warning on line 1008 in src/main/broker.ts

View workflow job for this annotation

GitHub Actions / checks

'getBrokerEventFrom' is defined but never used
(err) => {
clearTimeout(timer)
reject(err)
Expand Down Expand Up @@ -1115,7 +1119,7 @@
function parsePositiveIntegerEnv(name: string, fallback: number): number {
const value = Number.parseInt(process.env[name] || '', 10)
return Number.isFinite(value) && value > 0 ? value : fallback
}

Check warning on line 1122 in src/main/broker.ts

View workflow job for this annotation

GitHub Actions / checks

'resolveBrokerConnectionPath' is defined but never used

function compactCommitDiff(diff: string): string {
if (diff.length <= COMMIT_DRAFT_MAX_DIFF_CHARS) return diff
Expand Down Expand Up @@ -1475,6 +1479,10 @@
private eventHistory: BrokerEventRecord[] = []
private recentPtyChunks = new Map<string, number>()
private eventSerial = 0
// Ingress-validation telemetry: last warn time per malformed `kind`
// (throttled) and the set of unknown `kind`s already logged (once each).
private malformedEventWarnedAt = new Map<string, number>()
private warnedUnknownEventKinds = new Set<string>()

get cwd(): string | null {
return this.sessions.values().next().value?.cwd || null
Expand Down Expand Up @@ -1737,7 +1745,7 @@
return false
} finally {
if (this.revivePromises.get(projectId) === promise) {
this.revivePromises.delete(projectId)

Check warning on line 1748 in src/main/broker.ts

View workflow job for this annotation

GitHub Actions / checks

'promise' is never reassigned. Use 'const' instead
}
}
}
Expand Down Expand Up @@ -2386,7 +2394,10 @@
// dedicated channel so typing latency doesn't pay for compactBrokerEvent,
// the broker:event metadata spread, or pushing into eventHistory per
// character. Activity bookkeeping (rememberAgentSession + cloud sandbox
// observers) still runs.
// observers) still runs. The inline string checks below are the cheap,
// per-keystroke validation for worker_stream chunks — a chunk that fails
// them falls through to full zod validation at the general boundary
// below, so we never pay a discriminated-union parse per character.
if (
event.kind === 'worker_stream' &&
'name' in event && typeof event.name === 'string' &&
Expand All @@ -2408,7 +2419,14 @@
return
}

this.publishBrokerEvent(sessionKey, projectId, win, event as unknown as BrokerEventRecordPayload)
// General ingress: validate ONCE with zod. Malformed known events are
// dropped (the stream keeps running); unknown kinds are forwarded
// unchanged for forward-compat. `forwarded` is the typed payload we
// publish, which removes the `as unknown as` cast at the publish call.
const forwarded = this.validateIngressBrokerEvent(event)
if (!forwarded) return

this.publishBrokerEvent(sessionKey, projectId, win, forwarded)

if (event.kind === 'agent_spawned' && event.name) {
this.rememberAgentSession(event.name, sessionKey)
Expand Down Expand Up @@ -2506,6 +2524,44 @@
return false
}

/**
* Validate a raw broker event at ingress. Returns the typed payload to
* forward, or `null` when the event is malformed and should be dropped.
*
* - Malformed known events: dropped, with a per-`kind` throttled warning.
* - Unknown `kind`s: forwarded unchanged, logged once per `kind` (forward
* compat — a future SDK minor may add kinds this build doesn't model).
* - Valid events: forwarded as the parsed (passthrough) payload.
*/
private validateIngressBrokerEvent(event: BrokerEvent): BrokerEventRecordPayload | null {
const result = classifyBrokerEvent(event)
if (result.status === 'malformed') {
this.warnMalformedBrokerEvent(result.kind, result.reason)
return null
}
if (result.status === 'unknown') {
this.noteUnknownBrokerEventKind(result.kind)
}
return result.event
}

private warnMalformedBrokerEvent(kind: string | undefined, reason: string): void {
const key = kind || '<no-kind>'
const now = Date.now()
const lastWarnedAt = this.malformedEventWarnedAt.get(key)
if (lastWarnedAt !== undefined && now - lastWarnedAt < MALFORMED_BROKER_EVENT_WARN_THROTTLE_MS) {
return
}
this.malformedEventWarnedAt.set(key, now)
console.warn('[broker] Dropped malformed broker event:', { kind: key, reason })
}

private noteUnknownBrokerEventKind(kind: string): void {
if (this.warnedUnknownEventKinds.has(kind)) return
this.warnedUnknownEventKinds.add(kind)
console.warn('[broker] Forwarding unrecognized broker event kind (forward-compat):', { kind })
}

private publishBrokerEvent(
sessionKey: string,
projectId: string,
Expand Down Expand Up @@ -2723,7 +2779,7 @@
): Promise<{ name: string; runtime: string }> {
// `broker` selects which of the project's sessions the agent spawns on.
// Default: local-first via getSessionForProject (cloud only when no local
// broker is running, preserving the cloud-only flow).

Check warning on line 2782 in src/main/broker.ts

View workflow job for this annotation

GitHub Actions / checks

'promise' is never reassigned. Use 'const' instead
const { broker: requestedBroker, ...input } = spawnInput
const session = requestedBroker
? this.getSessionForBroker(projectId, requestedBroker)
Expand Down
Loading
Loading