diff --git a/packages/core/src/__tests__/workflow-reliability-contract.test.ts b/packages/core/src/__tests__/workflow-reliability-contract.test.ts index f341f22..56406e5 100644 --- a/packages/core/src/__tests__/workflow-reliability-contract.test.ts +++ b/packages/core/src/__tests__/workflow-reliability-contract.test.ts @@ -529,6 +529,49 @@ describe('workflow reliability contract', () => { } }); + it('retries transient agent-step network failures without consuming the step retry budget', async () => { + const executeAgentStep = vi + .fn() + .mockRejectedValueOnce(new Error('fetch failed')) + .mockResolvedValueOnce('artifact complete\nRICKY_MASTER_CHILD_RUN_VERIFIED'); + const events: WorkflowEvent[] = []; + const runner = new WorkflowRunner({ + db: makeDb(), + workspaceId: 'ws-test', + cwd: process.cwd(), + executor: { executeAgentStep }, + }); + runner.on((event) => events.push(event)); + + const run = await runner.execute( + baseConfig({ + errorHandling: { strategy: 'fail-fast' }, + workflows: [ + { + name: 'default', + steps: [ + { + name: 'write-artifact', + agent: 'fixer', + task: 'Write a structured workflow artifact.', + retries: 0, + verification: { + type: 'output_contains', + value: 'RICKY_MASTER_CHILD_RUN_VERIFIED', + }, + }, + ], + }, + ], + }), + 'default' + ); + + expect(run.status, run.error).toBe('completed'); + expect(executeAgentStep).toHaveBeenCalledTimes(2); + expect(events.some((event) => event.type === 'step:retrying')).toBe(false); + }); + it('repairs malformed agent artifacts before retrying the agent step', async () => { const executeAgentStep = vi.fn(async (step) => { if (step.name.includes('-repair-')) return 'patched artifact instructions'; diff --git a/packages/core/src/__tests__/workflow-runner.test.ts b/packages/core/src/__tests__/workflow-runner.test.ts index 0fedd94..f1b33ac 100644 --- a/packages/core/src/__tests__/workflow-runner.test.ts +++ b/packages/core/src/__tests__/workflow-runner.test.ts @@ -68,6 +68,7 @@ vi.mock('@relaycast/sdk', () => ({ let waitForExitFn: (ms?: number) => Promise<'exited' | 'timeout' | 'released'>; let waitForIdleFn: (ms?: number) => Promise<'idle' | 'timeout' | 'exited'>; let mockSpawnOutputs: string[] = []; +const mockHarnessDriverSpawn = vi.fn(async () => mockRelayInstance); // Spawned-agent handle shaped like harness-driver's SpawnedAgentHandle, but // driven by the test's waitForExitFn/waitForIdleFn. @@ -146,6 +147,7 @@ const mockRelayInstance = { eventListeners.add(cb); return () => eventListeners.delete(cb); }), + onBrokerExit: vi.fn(() => () => {}), connectEvents: vi.fn(), getStatus: vi.fn().mockResolvedValue({ status: 'running' }), listAgents: vi.fn().mockResolvedValue([]), @@ -161,7 +163,7 @@ vi.mock('@agent-relay/harness-driver', async (importOriginal) => { ...actual, HarnessDriverClient: { connect: vi.fn(() => mockRelayInstance), - spawn: vi.fn(async () => mockRelayInstance), + spawn: mockHarnessDriverSpawn, }, }; }); @@ -293,6 +295,7 @@ describe('WorkflowRunner', () => { waitForExitFn = vi.fn().mockResolvedValue('exited'); waitForIdleFn = vi.fn().mockImplementation(() => never()); mockSpawnOutputs = []; + mockHarnessDriverSpawn.mockImplementation(async () => mockRelayInstance); mockRelayInstance.spawnPty.mockImplementation(defaultSpawnPtyImplementation); eventListeners.clear(); db = makeDb(); @@ -575,6 +578,178 @@ agents: expect(startedSteps).toHaveLength(2); }); + it('restarts the broker when a PTY spawn fails with a transient transport error', async () => { + const tmpDir = mkdtempSync(path.join(os.tmpdir(), 'relayflows-broker-recovery-')); + const failingRelay = { + ...mockRelayInstance, + spawnPty: vi.fn(async () => { + const error = new Error('fetch failed'); + (error as Error & { retryable?: boolean }).retryable = true; + throw error; + }), + shutdown: vi.fn().mockResolvedValue(undefined), + }; + const recoveredRelay = { + ...mockRelayInstance, + spawnPty: vi.fn().mockImplementation(defaultSpawnPtyImplementation), + shutdown: vi.fn().mockResolvedValue(undefined), + }; + mockHarnessDriverSpawn + .mockImplementationOnce(async () => failingRelay as any) + .mockImplementationOnce(async () => recoveredRelay as any); + + try { + const isolatedRunner = new WorkflowRunner({ + db, + workspaceId: 'ws-test', + cwd: tmpDir, + relay: { + env: { + AGENT_RELAY_WORKFLOW_DISABLE_RELAYCAST: '1', + AGENT_RELAY_STATE_DIR: path.join(tmpDir, '.agentworkforce', 'relay'), + }, + }, + }); + + const run = await isolatedRunner.execute( + makeConfig({ + workflows: [{ name: 'default', steps: [{ name: 'step-1', agent: 'agent-a', task: 'Do step 1' }] }], + }), + 'default' + ); + + expect(run.status, run.error).toBe('completed'); + expect(mockHarnessDriverSpawn).toHaveBeenCalledTimes(2); + expect(failingRelay.shutdown).toHaveBeenCalledTimes(1); + expect(recoveredRelay.spawnPty).toHaveBeenCalledTimes(1); + } finally { + rmSync(tmpDir, { recursive: true, force: true }); + } + }); + + it('recovers when a broker exit clears the relay client before the next spawn', async () => { + const tmpDir = mkdtempSync(path.join(os.tmpdir(), 'relayflows-broker-exit-')); + const exitingRelay = { + ...mockRelayInstance, + brokerPid: 41, + onBrokerExit: vi.fn((cb: (info: { pid?: number; code?: number; signal?: string }) => void) => { + queueMicrotask(() => cb({ pid: 41, code: 1, signal: 'SIGTERM' })); + return () => {}; + }), + spawnPty: vi.fn().mockImplementation(defaultSpawnPtyImplementation), + shutdown: vi.fn().mockResolvedValue(undefined), + }; + const recoveredRelay = { + ...mockRelayInstance, + spawnPty: vi.fn().mockImplementation(defaultSpawnPtyImplementation), + shutdown: vi.fn().mockResolvedValue(undefined), + }; + mockHarnessDriverSpawn + .mockImplementationOnce(async () => exitingRelay as any) + .mockImplementationOnce(async () => recoveredRelay as any); + + try { + const isolatedRunner = new WorkflowRunner({ + db, + workspaceId: 'ws-test', + cwd: tmpDir, + relay: { + env: { + AGENT_RELAY_WORKFLOW_DISABLE_RELAYCAST: '1', + AGENT_RELAY_STATE_DIR: path.join(tmpDir, '.agentworkforce', 'relay'), + }, + }, + }); + + const run = await isolatedRunner.execute( + makeConfig({ + workflows: [{ name: 'default', steps: [{ name: 'step-1', agent: 'agent-a', task: 'Do step 1' }] }], + }), + 'default' + ); + + expect(run.status, run.error).toBe('completed'); + expect(mockHarnessDriverSpawn).toHaveBeenCalledTimes(2); + expect(exitingRelay.onBrokerExit).toHaveBeenCalledTimes(1); + expect(recoveredRelay.spawnPty).toHaveBeenCalledTimes(1); + } finally { + rmSync(tmpDir, { recursive: true, force: true }); + } + }); + + it('refuses broker recovery while workflow agents are still active', async () => { + const candidate = new WorkflowRunner({ db, workspaceId: 'ws-test' }); + (candidate as any).currentBrokerContext = { + runId: 'run-active', + brokerName: 'relayflows-run-active', + channel: 'wf-active', + relaycastDisabled: true, + }; + (candidate as any).activeAgentHandles.set('active-agent', { name: 'active-agent' }); + + await expect((candidate as any).recoverBroker('spawn failed')).rejects.toThrow( + 'Broker recovery is unsafe while 1 agent is still active: active-agent' + ); + expect(mockHarnessDriverSpawn).not.toHaveBeenCalled(); + }); + + it('releases the live PTY before replaying a transient same-attempt network failure', async () => { + const tmpDir = mkdtempSync(path.join(os.tmpdir(), 'relayflows-transient-replay-')); + const firstHandle = { + ...makeMockHandle('step-1-transient'), + release: vi.fn().mockResolvedValue({ name: 'step-1-transient' }), + }; + const secondHandle = { + ...makeMockHandle('step-1-transient'), + release: vi.fn().mockResolvedValue({ name: 'step-1-transient' }), + }; + let spawnCount = 0; + const replayRelay = { + ...mockRelayInstance, + spawnPty: vi.fn(async ({ name }: { name: string }) => { + spawnCount += 1; + const handle = spawnCount === 1 ? firstHandle : secondHandle; + if (spawnCount === 2) { + queueMicrotask(() => { + emitMockEvent('workerOutput', { name, chunk: 'STEP_COMPLETE:step-1\n' }); + }); + } + return handle as any; + }), + }; + mockHarnessDriverSpawn.mockImplementation(async () => replayRelay as any); + + try { + const isolatedRunner = new WorkflowRunner({ + db, + workspaceId: 'ws-test', + cwd: tmpDir, + relay: { + env: { + AGENT_RELAY_WORKFLOW_DISABLE_RELAYCAST: '1', + AGENT_RELAY_STATE_DIR: path.join(tmpDir, '.agentworkforce', 'relay'), + }, + }, + }); + vi.spyOn(isolatedRunner as any, 'waitForExitWithIdleNudging') + .mockRejectedValueOnce(Object.assign(new Error('fetch failed'), { retryable: true })) + .mockResolvedValueOnce('exited'); + + const run = await isolatedRunner.execute( + makeConfig({ + workflows: [{ name: 'default', steps: [{ name: 'step-1', agent: 'agent-a', task: 'Do step 1' }] }], + }), + 'default' + ); + + expect(run.status, run.error).toBe('completed'); + expect(replayRelay.spawnPty).toHaveBeenCalledTimes(2); + expect(firstHandle.release).toHaveBeenCalledTimes(1); + } finally { + rmSync(tmpDir, { recursive: true, force: true }); + } + }); + it('should emit owner assignment and review completion events for interactive steps', async () => { const events: Array<{ type: string; stepName?: string }> = []; runner.on((event) => diff --git a/packages/core/src/runner.ts b/packages/core/src/runner.ts index 9129519..bb31091 100644 --- a/packages/core/src/runner.ts +++ b/packages/core/src/runner.ts @@ -494,6 +494,10 @@ type DiagnosticVerificationCheck = VerificationCheck & { const DEFAULT_WORKFLOW_MAX_RETRIES = 2; const DEFAULT_WORKFLOW_REPAIR_RETRIES = 2; const DEFAULT_WORKFLOW_RETRY_DELAY_MS = 1000; +const BROKER_OPERATION_MAX_ATTEMPTS = 3; +const BROKER_OPERATION_RETRY_DELAY_MS = 1_000; +const AGENT_TRANSIENT_NETWORK_MAX_ATTEMPTS = 3; +const AGENT_TRANSIENT_NETWORK_RETRY_DELAY_MS = 1_000; interface ChannelEvidenceOptions { stepName?: string; @@ -504,6 +508,13 @@ interface ChannelEvidenceOptions { origin?: CompletionEvidenceChannelOrigin; } +interface BrokerRunContext { + runId: string; + brokerName: string; + channel: string; + relaycastDisabled: boolean; +} + // ── CLI resolution ─────────────────────────────────────────────────────────── /** @@ -548,6 +559,8 @@ export class WorkflowRunner { /** @internal exposed for CLI signal-handler shutdown only */ relay?: HarnessDriverClient; + private currentBrokerContext?: BrokerRunContext; + private brokerRecoveryPromise?: Promise; private relaycast?: RelayCast; private relaycastAgent?: AgentClient; private relayApiKey?: string; @@ -1694,6 +1707,248 @@ export class WorkflowRunner { console.log(`${chalk.dim.cyan('[workflow')} ${chalk.dim.cyan(ts)}${chalk.dim.cyan(']')} ${msg}`); } + private buildBrokerName(runId: string): string { + const brokerBaseName = path.basename(this.cwd) || 'workflow'; + return `${brokerBaseName}-${runId.slice(0, 8)}`; + } + + private createBrokerEventHandler(runId: string): (event: BrokerEvent) => void { + return (event: BrokerEvent) => { + // Re-emit every broker event except the high-volume PTY stream. + if (event.kind !== 'worker_stream') { + this.emit({ type: 'broker:event', runId, event }); + } + + switch (event.kind) { + case 'worker_stream': { + const { name, chunk } = event; + const listener = this.ptyListeners.get(name); + if (listener) listener(chunk); + + // Parse PTY output for high-signal activity + const stripped = WorkflowRunner.stripAnsi(chunk); + const shortName = name.replace(/-[a-f0-9]{6,}$/, ''); + let activity: string | undefined; + if (/Read\(/.test(stripped)) { + const m = stripped.match(/Read\(\s*~?([^\s)"']{8,})/); + if (m) { + const base = path.basename(m[1]); + activity = base.length >= 3 ? `Reading ${base}` : 'Reading file...'; + } else { + activity = 'Reading file...'; + } + } else if (/Edit\(/.test(stripped)) { + const m = stripped.match(/Edit\(\s*~?([^\s)"']{8,})/); + if (m) { + const base = path.basename(m[1]); + activity = base.length >= 3 ? `Editing ${base}` : 'Editing file...'; + } else { + activity = 'Editing file...'; + } + } else if (/Bash\(/.test(stripped)) { + const m = stripped.match(/Bash\(\s*(.{1,40})/); + activity = m ? `Running: ${m[1].trim()}...` : 'Running command...'; + } else if (/Explore\(/.test(stripped)) { + const m = stripped.match(/Explore\(\s*(.{1,50})/); + activity = m ? `Exploring: ${m[1].replace(/\).*/, '').trim()}` : 'Exploring codebase...'; + } else if (/Task\(/.test(stripped)) { + activity = 'Running sub-agent...'; + } else if (/Sublimating|Thinking|Coalescing|Cultivating/.test(stripped)) { + const m = stripped.match(/(\d+)s/); + activity = m ? `Thinking... (${m[1]}s)` : 'Thinking...'; + } + if (activity && this.lastActivity.get(name) !== activity) { + this.lastActivity.set(name, activity); + this.log(`[${shortName}] ${activity}`); + } + break; + } + + case 'relay_inbound': { + const from = event.from; + const to = event.target; + const text = event.body; + const body = text.length > 120 ? text.slice(0, 117) + '...' : text; + const fromShort = from.replace(/-[a-f0-9]{6,}$/, ''); + const toShort = to.replace(/-[a-f0-9]{6,}$/, ''); + this.log(`[msg] ${fromShort} → ${toShort}: ${body}`); + + if (this.channel && (to === this.channel || to === `#${this.channel}`)) { + const runtimeAgent = this.runtimeStepAgents.get(from); + this.recordChannelEvidence(text, { + sender: runtimeAgent?.logicalName ?? from, + actor: from, + role: runtimeAgent?.role, + target: to, + origin: 'relay_message', + stepName: runtimeAgent?.stepName, + }); + } + + const supervision = this.supervisedRuntimeAgents.get(from); + if (supervision?.role === 'owner') { + this.recordStepToolSideEffect(supervision.stepName, { + type: 'owner_monitoring', + detail: `Owner messaged ${to}: ${text.slice(0, 120)}`, + raw: { to, text }, + }); + void this.trajectory?.ownerMonitoringEvent( + supervision.stepName, + supervision.logicalName, + `Messaged ${to}: ${text.slice(0, 120)}`, + { to, text } + ); + } + break; + } + + case 'agent_spawned': { + if (!this.activeAgentHandles.has(event.name)) { + this.log(`[spawned] ${event.name} (${event.runtime})`); + } + break; + } + + case 'agent_exited': { + this.lastActivity.delete(event.name); + this.lastIdleLog.delete(event.name); + if (!this.activeAgentHandles.has(event.name)) { + this.log(`[exited] ${event.name} (code: ${event.code ?? '?'})`); + } + break; + } + + case 'agent_idle': { + const { name, idle_secs } = event; + const bucket = Math.floor(idle_secs / 30) * 30; + if (bucket >= 30 && this.lastIdleLog.get(name) !== bucket) { + this.lastIdleLog.set(name, bucket); + const shortName = name.replace(/-[a-f0-9]{6,}$/, ''); + this.log(`[idle] ${shortName} silent for ${bucket}s`); + } + break; + } + + default: + break; + } + }; + } + + private clearRelayListeners(): void { + for (const off of this.unsubRelayListeners) { + try { + off(); + } catch { + /* ignore */ + } + } + this.unsubRelayListeners = []; + } + + private wireRelayClient(runId: string): void { + if (!this.relay) return; + + this.clearRelayListeners(); + this.unsubRelayListeners.push(this.relay.onEvent(this.createBrokerEventHandler(runId))); + const unsubBrokerExit = this.relay.onBrokerExit?.((info) => { + if (this.relay?.brokerPid === info.pid) { + this.relay = undefined; + } + this.log( + `Broker exited (pid: ${info.pid ?? '?'}, code: ${info.code ?? '?'}, signal: ${info.signal ?? '?'})` + ); + }); + if (unsubBrokerExit) { + this.unsubRelayListeners.push(unsubBrokerExit); + } + this.relay.connectEvents(); + } + + private async startBroker(context: BrokerRunContext): Promise { + await this.startOrReuseSharedBroker(context.runId, context.channel, context.relaycastDisabled); + if (!this.relay) { + throw new Error('Broker client was not initialized'); + } + this.wireRelayClient(context.runId); + } + + private isRetryableProtocolError(error: unknown): boolean { + const candidate = error as { retryable?: unknown; status?: unknown; message?: unknown } | undefined; + if (candidate?.retryable === true) return true; + if (typeof candidate?.status === 'number' && candidate.status >= 500) return true; + const message = typeof candidate?.message === 'string' ? candidate.message : ''; + return /\b(fetch failed|econn|enotfound|eai_again|socket hang up|network|service unavailable|timed out)\b/i.test( + message + ); + } + + private isTransientAgentNetworkError(error: unknown): boolean { + const candidate = error as { retryable?: unknown; status?: unknown; message?: unknown } | undefined; + if (candidate?.retryable === true) return true; + if (typeof candidate?.status === 'number' && candidate.status >= 500) return true; + const message = error instanceof Error ? error.message : String(error); + return /\b(fetch failed|econn|enotfound|eai_again|socket hang up|network error|connection reset|connection refused|service unavailable)\b/i.test(message); + } + + private async recoverBroker(reason: string): Promise { + if (!this.currentBrokerContext) { + throw new Error(`Broker unavailable and no recovery context exists (${reason})`); + } + if (this.brokerRecoveryPromise) { + await this.brokerRecoveryPromise; + return; + } + if (this.activeAgentHandles.size > 0) { + const activeAgents = [...this.activeAgentHandles.keys()]; + throw new Error( + `Broker recovery is unsafe while ${activeAgents.length} agent${activeAgents.length === 1 ? ' is' : 's are'} still active: ${activeAgents.slice(0, 3).join(', ')}` + ); + } + + this.brokerRecoveryPromise = (async () => { + this.log(`Broker unavailable (${reason}); restarting...`); + this.clearRelayListeners(); + await this.shutdownRelay().catch(() => undefined); + await this.startBroker(this.currentBrokerContext!); + this.log('Broker restarted'); + })(); + + try { + await this.brokerRecoveryPromise; + } finally { + this.brokerRecoveryPromise = undefined; + } + } + + private async withBrokerRecovery(operation: string, work: (relay: HarnessDriverClient) => Promise): Promise { + let lastError: unknown; + for (let attempt = 1; attempt <= BROKER_OPERATION_MAX_ATTEMPTS; attempt++) { + const relay = this.relay; + if (!relay) { + lastError = new Error(`Broker unavailable while ${operation}`); + } else { + try { + return await work(relay); + } catch (error) { + lastError = error; + if (!this.isRetryableProtocolError(error)) { + throw error; + } + } + } + + if (attempt >= BROKER_OPERATION_MAX_ATTEMPTS) { + break; + } + await this.recoverBroker(`${operation} failed`); + await this.delay(BROKER_OPERATION_RETRY_DELAY_MS * attempt); + } + + const message = lastError instanceof Error ? lastError.message : String(lastError); + throw new Error(`Broker operation failed during ${operation}: ${message}`); + } + // ── Relaycast auto-provisioning ──────────────────────────────────────── /** @@ -3461,145 +3716,13 @@ export class WorkflowRunner { } } - await this.startOrReuseSharedBroker(runId, channel, relaycastDisabled); - const relay = this.relay; - if (!relay) { - throw new Error('Broker client was not initialized'); - } - - // Single dispatcher over the broker event stream. The named event bus - // (`addListener('workerOutput'|...)`) is not fed for direct clients, so - // broker events must be consumed via `onEvent` (the BrokerEvent stream). - this.unsubRelayListeners.push( - relay.onEvent((event: BrokerEvent) => { - // Re-emit every broker event except the high-volume PTY stream. - if (event.kind !== 'worker_stream') { - this.emit({ type: 'broker:event', runId, event }); - } - - switch (event.kind) { - case 'worker_stream': { - const { name, chunk } = event; - const listener = this.ptyListeners.get(name); - if (listener) listener(chunk); - - // Parse PTY output for high-signal activity - const stripped = WorkflowRunner.stripAnsi(chunk); - const shortName = name.replace(/-[a-f0-9]{6,}$/, ''); - let activity: string | undefined; - if (/Read\(/.test(stripped)) { - // Extract filename — path may be truncated at chunk boundary so require - // at least a dir separator or 8+ chars to trust the basename. - const m = stripped.match(/Read\(\s*~?([^\s)"']{8,})/); - if (m) { - const base = path.basename(m[1]); - activity = base.length >= 3 ? `Reading ${base}` : 'Reading file...'; - } else { - activity = 'Reading file...'; - } - } else if (/Edit\(/.test(stripped)) { - const m = stripped.match(/Edit\(\s*~?([^\s)"']{8,})/); - if (m) { - const base = path.basename(m[1]); - activity = base.length >= 3 ? `Editing ${base}` : 'Editing file...'; - } else { - activity = 'Editing file...'; - } - } else if (/Bash\(/.test(stripped)) { - // Extract a short preview of the command - const m = stripped.match(/Bash\(\s*(.{1,40})/); - activity = m ? `Running: ${m[1].trim()}...` : 'Running command...'; - } else if (/Explore\(/.test(stripped)) { - const m = stripped.match(/Explore\(\s*(.{1,50})/); - activity = m ? `Exploring: ${m[1].replace(/\).*/, '').trim()}` : 'Exploring codebase...'; - } else if (/Task\(/.test(stripped)) { - activity = 'Running sub-agent...'; - } else if (/Sublimating|Thinking|Coalescing|Cultivating/.test(stripped)) { - const m = stripped.match(/(\d+)s/); - activity = m ? `Thinking... (${m[1]}s)` : 'Thinking...'; - } - if (activity && this.lastActivity.get(name) !== activity) { - this.lastActivity.set(name, activity); - this.log(`[${shortName}] ${activity}`); - } - break; - } - - case 'relay_inbound': { - const from = event.from; - const to = event.target; - const text = event.body; - const body = text.length > 120 ? text.slice(0, 117) + '...' : text; - const fromShort = from.replace(/-[a-f0-9]{6,}$/, ''); - const toShort = to.replace(/-[a-f0-9]{6,}$/, ''); - this.log(`[msg] ${fromShort} → ${toShort}: ${body}`); - - if (this.channel && (to === this.channel || to === `#${this.channel}`)) { - const runtimeAgent = this.runtimeStepAgents.get(from); - this.recordChannelEvidence(text, { - sender: runtimeAgent?.logicalName ?? from, - actor: from, - role: runtimeAgent?.role, - target: to, - origin: 'relay_message', - stepName: runtimeAgent?.stepName, - }); - } - - const supervision = this.supervisedRuntimeAgents.get(from); - if (supervision?.role === 'owner') { - this.recordStepToolSideEffect(supervision.stepName, { - type: 'owner_monitoring', - detail: `Owner messaged ${to}: ${text.slice(0, 120)}`, - raw: { to, text }, - }); - void this.trajectory?.ownerMonitoringEvent( - supervision.stepName, - supervision.logicalName, - `Messaged ${to}: ${text.slice(0, 120)}`, - { to, text } - ); - } - break; - } - - case 'agent_spawned': { - // Skip agents already managed by step execution - if (!this.activeAgentHandles.has(event.name)) { - this.log(`[spawned] ${event.name} (${event.runtime})`); - } - break; - } - - case 'agent_exited': { - this.lastActivity.delete(event.name); - this.lastIdleLog.delete(event.name); - if (!this.activeAgentHandles.has(event.name)) { - this.log(`[exited] ${event.name} (code: ${event.code ?? '?'})`); - } - break; - } - - case 'agent_idle': { - const { name, idle_secs } = event; - // Only log at 30s multiples to avoid watchdog spam - const bucket = Math.floor(idle_secs / 30) * 30; - if (bucket >= 30 && this.lastIdleLog.get(name) !== bucket) { - this.lastIdleLog.set(name, bucket); - const shortName = name.replace(/-[a-f0-9]{6,}$/, ''); - this.log(`[idle] ${shortName} silent for ${bucket}s`); - } - break; - } - - default: - break; - } - }) - ); - - // Open the broker event stream so the dispatcher above receives events. - relay.connectEvents(); + this.currentBrokerContext = { + runId, + brokerName: this.buildBrokerName(runId), + channel, + relaycastDisabled, + }; + await this.startBroker(this.currentBrokerContext); this.relaycast = undefined; this.relaycastAgent = undefined; @@ -3741,15 +3864,7 @@ export class WorkflowRunner { this.unsubBrokerStderr?.(); this.unsubBrokerStderr = undefined; - // Unsubscribe relay event listeners to prevent leaks - for (const off of this.unsubRelayListeners) { - try { - off(); - } catch { - /* ignore */ - } - } - this.unsubRelayListeners = []; + this.clearRelayListeners(); this.lastIdleLog.clear(); this.lastActivity.clear(); this.supervisedRuntimeAgents.clear(); @@ -3758,6 +3873,8 @@ export class WorkflowRunner { this.log('Shutting down broker...'); await this.shutdownRelay(); + this.currentBrokerContext = undefined; + this.brokerRecoveryPromise = undefined; this.runStartTime = undefined; this.relaycast = undefined; this.relaycastAgent = undefined; @@ -4825,8 +4942,15 @@ export class WorkflowRunner { // - retries remaining => throw back into the loop and retry // - maxRetries = 0 => fail immediately after the first retry request // - retry budget exhausted => fail with retry_requested_by_owner, never "completed" + let repeatSameAttempt = false; + let transientNetworkRetries = 0; for (let attempt = 0; attempt <= maxRetries; attempt++) { this.checkAborted(); + const isSameAttemptReplay = repeatSameAttempt; + repeatSameAttempt = false; + if (!isSameAttemptReplay) { + transientNetworkRetries = 0; + } // Reset per-attempt exit info so stale values don't leak across retries lastExitCode = undefined; @@ -4837,7 +4961,7 @@ export class WorkflowRunner { lastAttemptReportCaptured = false; let stepOutputForDiagnostic = ''; - if (attempt > 0) { + if (attempt > 0 && !isSameAttemptReplay) { this.emit({ type: 'step:retrying', runId, stepName: step.name, attempt }); this.postToChannel(`**[${step.name}]** Retrying (attempt ${attempt + 1}/${maxRetries + 1})`); this.recordStepToolSideEffect(step.name, { @@ -4883,25 +5007,31 @@ export class WorkflowRunner { startedAt: state.row.startedAt, updatedAt: new Date().toISOString(), }); - this.emit({ type: 'step:started', runId, stepName: step.name }); - this.log(`[${step.name}] Started (owner: ${ownerDef.name}, specialist: ${specialistDef.name})`); + if (!isSameAttemptReplay) { + this.emit({ type: 'step:started', runId, stepName: step.name }); + } + this.log( + `[${step.name}] ${isSameAttemptReplay ? 'Retrying transient network failure' : 'Started'} (owner: ${ownerDef.name}, specialist: ${specialistDef.name})` + ); this.initializeStepSignalParticipants(step.name, ownerDef.name, specialistDef.name); - await this.trajectory?.stepStarted(step, ownerDef.name, { - role: usesDedicatedOwner ? 'owner' : 'specialist', - owner: ownerDef.name, - specialist: specialistDef.name, - reviewer: reviewDef?.name, - }); - if (usesDedicatedOwner) { - await this.trajectory?.stepSupervisionAssigned(step, supervised); + if (!isSameAttemptReplay) { + await this.trajectory?.stepStarted(step, ownerDef.name, { + role: usesDedicatedOwner ? 'owner' : 'specialist', + owner: ownerDef.name, + specialist: specialistDef.name, + reviewer: reviewDef?.name, + }); + if (usesDedicatedOwner) { + await this.trajectory?.stepSupervisionAssigned(step, supervised); + } + this.emit({ + type: 'step:owner-assigned', + runId, + stepName: step.name, + ownerName: ownerDef.name, + specialistName: specialistDef.name, + }); } - this.emit({ - type: 'step:owner-assigned', - runId, - stepName: step.name, - ownerName: ownerDef.name, - specialistName: specialistDef.name, - }); // Resolve step-output variables (e.g. {{steps.plan.output}}) at execution time const stepOutputContext = this.buildStepOutputContext(stepStates, runId); @@ -5206,6 +5336,23 @@ export class WorkflowRunner { await this.trajectory?.stepCompleted(step, combinedOutput, attempt + 1); return; } catch (err) { + if ( + this.isTransientAgentNetworkError(err) && + transientNetworkRetries < AGENT_TRANSIENT_NETWORK_MAX_ATTEMPTS - 1 + ) { + transientNetworkRetries += 1; + const message = err instanceof Error ? err.message : String(err); + this.log( + `[${step.name}] Transient network failure during agent step; replaying attempt ${attempt + 1}/${maxRetries + 1} (${transientNetworkRetries}/${AGENT_TRANSIENT_NETWORK_MAX_ATTEMPTS - 1}): ${message}` + ); + this.postToChannel( + `**[${step.name}]** Transient network issue; retrying current attempt (${transientNetworkRetries}/${AGENT_TRANSIENT_NETWORK_MAX_ATTEMPTS - 1})` + ); + repeatSameAttempt = true; + attempt -= 1; + await this.delay(AGENT_TRANSIENT_NETWORK_RETRY_DELAY_MS * transientNetworkRetries); + continue; + } if (process.env.RF_DEBUG_STACK) console.error('RF_DEBUG_STACK', (err as Error)?.stack); lastError = err instanceof Error ? err.message : String(err); lastCompletionReason = err instanceof WorkflowCompletionError ? err.completionReason : undefined; @@ -5509,9 +5656,11 @@ export class WorkflowRunner { return; } - const staleAgents = (await this.relay.listAgents()).filter( - (agent) => agent.name === baseRequestedName || agent.name.startsWith(`${baseRequestedName}-r`) - ); + const staleAgents = ( + await this.withBrokerRecovery(`listing stale retry agents for step "${stepName}"`, (relay) => + relay.listAgents() + ) + ).filter((agent) => agent.name === baseRequestedName || agent.name.startsWith(`${baseRequestedName}-r`)); if (staleAgents.length === 0) { return; } @@ -5520,12 +5669,18 @@ export class WorkflowRunner { this.log(`[${stepName}] Releasing stale retry agent(s): ${staleNames.join(', ')}`); for (const name of staleNames) { - await this.relay.release(name, `workflow retry cleanup for step "${stepName}"`); + await this.withBrokerRecovery(`releasing stale retry agent "${name}"`, (relay) => + relay.release(name, `workflow retry cleanup for step "${stepName}"`) + ); } const deadline = Date.now() + 5_000; while (Date.now() < deadline) { - const remaining = (await this.relay.listAgents()) + const remaining = ( + await this.withBrokerRecovery(`confirming retry cleanup for step "${stepName}"`, (relay) => + relay.listAgents() + ) + ) .map((agent) => agent.name) .filter((name) => staleNames.includes(name)); if (remaining.length === 0) { @@ -6966,10 +7121,6 @@ export class WorkflowRunner { return this.execNonInteractive(agentDef, step, timeoutMs); } - if (!this.relay) { - throw new Error('AgentRelay not initialized'); - } - const evidenceStepName = options.evidenceStepName ?? step.name; const baseRequestedName = this.buildWorkflowRuntimeAgentBaseName(step.name, options); @@ -7042,6 +7193,8 @@ export class WorkflowRunner { let exitResult: string = 'unknown'; let stopHeartbeat: (() => void) | undefined; let ptyChunks: string[] = []; + let agentReleased = false; + let completedWithoutSpawnError = false; try { const agentCwd = this.resolveExecutionCwd(step, agentDef); @@ -7070,10 +7223,12 @@ export class WorkflowRunner { }; this.log(`[${step.name}] Spawning ${agentDef.cli} (pty)`); agent = new WorkflowAgentHandle( - await this.relay.spawnPty({ - ...(spawnOptions as Record), - cli: agentDef.cli, - } as SpawnPtyInput) + await this.withBrokerRecovery(`spawning agent for step "${step.name}"`, (relay) => + relay.spawnPty({ + ...(spawnOptions as Record), + cli: agentDef.cli, + } as SpawnPtyInput) + ) ); // Re-key PTY maps if broker assigned a different name than requested @@ -7138,7 +7293,9 @@ export class WorkflowRunner { // Register in workers.json so `agents:kill` can find this agent let workerPid: number | undefined; try { - const rawAgents = await this.relay!.listAgents(); + const rawAgents = await this.withBrokerRecovery(`listing spawned agents for step "${step.name}"`, (relay) => + relay.listAgents() + ); workerPid = rawAgents.find((a) => a.name === agentName)?.pid ?? undefined; } catch { // Best-effort PID lookup @@ -7209,20 +7366,27 @@ export class WorkflowRunner { `**[${step.name}]** Agent idle after completing work — verification passed, releasing` ); await agent.release().catch(() => undefined); + agentReleased = true; timeoutRecovered = true; } } if (!timeoutRecovered) { await agent.release().catch(() => undefined); + agentReleased = true; throw new Error(`Step "${step.name}" timed out after ${timeoutMs ?? 'unknown'}ms`); } } if (exitResult === 'force-released') { + agentReleased = true; throw new Error( `Step "${step.name}" failed — agent was force-released after exhausting idle nudges without completing` ); } + if (exitResult === 'released') { + agentReleased = true; + } + completedWithoutSpawnError = true; } finally { // Snapshot PTY chunks before cleanup — we need them for output reading below ptyChunks = this.ptyOutputBuffers.get(agentName) ?? []; @@ -7246,6 +7410,10 @@ export class WorkflowRunner { ); } + if (!completedWithoutSpawnError && agent && !agentReleased) { + await agent.release().catch(() => undefined); + } + // Always clean up PTY resources — prevents fd leaks if spawnPty or waitForExit throws stopHeartbeat?.(); this.activeAgentHandles.delete(agentName); @@ -7521,11 +7689,13 @@ export class WorkflowRunner { if (hubAgent) { // Hub-mediated: tell the hub to check on the idle agent (sent as the hub). try { - await this.relay.sendMessage({ - from: hubAgent.name, - to: agent.name, - text: `Agent ${agent.name} appears idle on step "${step.name}". Check on them and remind them to /exit when done.`, - }); + await this.withBrokerRecovery(`nudging idle agent "${agent.name}" via hub`, (relay) => + relay.sendMessage({ + from: hubAgent.name, + to: agent.name, + text: `Agent ${agent.name} appears idle on step "${step.name}". Check on them and remind them to /exit when done.`, + }) + ); return; // Hub nudge succeeded } catch { // Fall through to direct nudge @@ -7533,12 +7703,13 @@ export class WorkflowRunner { } // Direct system injection from the workflow runner. - await this.relay - .sendMessage({ + await this.withBrokerRecovery(`nudging idle agent "${agent.name}" directly`, (relay) => + relay.sendMessage({ from: 'workflow-runner', to: agent.name, text: "You appear idle. If you've completed your task, output /exit. If still working, continue.", }) + ) .catch(() => { // Non-critical — don't break workflow });