Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions packages/core/src/__tests__/workflow-reliability-contract.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,49 @@ describe('workflow reliability contract', () => {
}
});

it('retries transient agent-step network failures without consuming the step retry budget', async () => {
  // The executor rejects once with a network-style error and then resolves
  // with output containing the marker that the step's verification checks for.
  const executeAgentStep = vi.fn();
  executeAgentStep.mockRejectedValueOnce(new Error('fetch failed'));
  executeAgentStep.mockResolvedValueOnce('artifact complete\nRICKY_MASTER_CHILD_RUN_VERIFIED');

  const observedEvents: WorkflowEvent[] = [];
  const workflowRunner = new WorkflowRunner({
    db: makeDb(),
    workspaceId: 'ws-test',
    cwd: process.cwd(),
    executor: { executeAgentStep },
  });
  workflowRunner.on((event) => observedEvents.push(event));

  // The step declares `retries: 0`; the assertions below expect a second
  // executor call to happen anyway, with no 'step:retrying' event emitted.
  const config = baseConfig({
    errorHandling: { strategy: 'fail-fast' },
    workflows: [
      {
        name: 'default',
        steps: [
          {
            name: 'write-artifact',
            agent: 'fixer',
            task: 'Write a structured workflow artifact.',
            retries: 0,
            verification: {
              type: 'output_contains',
              value: 'RICKY_MASTER_CHILD_RUN_VERIFIED',
            },
          },
        ],
      },
    ],
  });
  const run = await workflowRunner.execute(config, 'default');

  expect(run.status, run.error).toBe('completed');
  expect(executeAgentStep).toHaveBeenCalledTimes(2);
  const sawStepRetry = observedEvents.some(({ type }) => type === 'step:retrying');
  expect(sawStepRetry).toBe(false);
});

it('repairs malformed agent artifacts before retrying the agent step', async () => {
const executeAgentStep = vi.fn(async (step) => {
if (step.name.includes('-repair-')) return 'patched artifact instructions';
Expand Down
177 changes: 176 additions & 1 deletion packages/core/src/__tests__/workflow-runner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ vi.mock('@relaycast/sdk', () => ({
// Test-controlled behaviours for the mocked spawned-agent handle. Tests
// (re)assign these to steer how a mocked agent reports exit and idle.
let waitForExitFn: (ms?: number) => Promise<'exited' | 'timeout' | 'released'>;
let waitForIdleFn: (ms?: number) => Promise<'idle' | 'timeout' | 'exited'>;
// Reset to an empty list before each test. Presumably the output chunks that
// mocked spawns emit; verify against the spawn helper that consumes it.
let mockSpawnOutputs: string[] = [];
// Shared stand-in for HarnessDriverClient.spawn, used by the harness-driver
// vi.mock factory. Keeping it in a named binding lets tests queue per-call
// relay instances with mockImplementationOnce and assert on the spawn count.
// NOTE(review): the factory reads this binding directly rather than inside a
// closure, so it must be initialised before the mocked module is first
// imported; confirm the import order in this file.
const mockHarnessDriverSpawn = vi.fn(async () => mockRelayInstance);

// Spawned-agent handle shaped like harness-driver's SpawnedAgentHandle, but
// driven by the test's waitForExitFn/waitForIdleFn.
Expand Down Expand Up @@ -146,6 +147,7 @@ const mockRelayInstance = {
eventListeners.add(cb);
return () => eventListeners.delete(cb);
}),
onBrokerExit: vi.fn(() => () => {}),
connectEvents: vi.fn(),
getStatus: vi.fn().mockResolvedValue({ status: 'running' }),
listAgents: vi.fn().mockResolvedValue([]),
Expand All @@ -161,7 +163,7 @@ vi.mock('@agent-relay/harness-driver', async (importOriginal) => {
...actual,
HarnessDriverClient: {
connect: vi.fn(() => mockRelayInstance),
spawn: vi.fn(async () => mockRelayInstance),
spawn: mockHarnessDriverSpawn,
},
};
});
Expand Down Expand Up @@ -293,6 +295,7 @@ describe('WorkflowRunner', () => {
waitForExitFn = vi.fn().mockResolvedValue('exited');
waitForIdleFn = vi.fn().mockImplementation(() => never());
mockSpawnOutputs = [];
mockHarnessDriverSpawn.mockImplementation(async () => mockRelayInstance);
mockRelayInstance.spawnPty.mockImplementation(defaultSpawnPtyImplementation);
eventListeners.clear();
db = makeDb();
Expand Down Expand Up @@ -575,6 +578,178 @@ agents:
expect(startedSteps).toHaveLength(2);
});

it('restarts the broker when a PTY spawn fails with a transient transport error', async () => {
  const tmpDir = mkdtempSync(path.join(os.tmpdir(), 'relayflows-broker-recovery-'));

  // First broker: every PTY spawn rejects with an error flagged `retryable`.
  const failingRelay = {
    ...mockRelayInstance,
    spawnPty: vi.fn(async () => {
      throw Object.assign(new Error('fetch failed'), { retryable: true });
    }),
    shutdown: vi.fn().mockResolvedValue(undefined),
  };
  // Second broker: spawns behave like the suite's default mock.
  const recoveredRelay = {
    ...mockRelayInstance,
    spawnPty: vi.fn().mockImplementation(defaultSpawnPtyImplementation),
    shutdown: vi.fn().mockResolvedValue(undefined),
  };
  // Hand the brokers out in order: failing first, recovered second.
  mockHarnessDriverSpawn.mockImplementationOnce(async () => failingRelay as any);
  mockHarnessDriverSpawn.mockImplementationOnce(async () => recoveredRelay as any);

  try {
    const relayStateDir = path.join(tmpDir, '.agentworkforce', 'relay');
    const isolatedRunner = new WorkflowRunner({
      db,
      workspaceId: 'ws-test',
      cwd: tmpDir,
      relay: {
        env: {
          AGENT_RELAY_WORKFLOW_DISABLE_RELAYCAST: '1',
          AGENT_RELAY_STATE_DIR: relayStateDir,
        },
      },
    });

    const config = makeConfig({
      workflows: [
        {
          name: 'default',
          steps: [{ name: 'step-1', agent: 'agent-a', task: 'Do step 1' }],
        },
      ],
    });
    const run = await isolatedRunner.execute(config, 'default');

    expect(run.status, run.error).toBe('completed');
    // Two broker spawns: the original plus one replacement.
    expect(mockHarnessDriverSpawn).toHaveBeenCalledTimes(2);
    expect(failingRelay.shutdown).toHaveBeenCalledTimes(1);
    expect(recoveredRelay.spawnPty).toHaveBeenCalledTimes(1);
  } finally {
    // Always remove the scratch directory, even when an assertion throws.
    rmSync(tmpDir, { recursive: true, force: true });
  }
});

it('recovers when a broker exit clears the relay client before the next spawn', async () => {
  type BrokerExitInfo = { pid?: number; code?: number; signal?: string };

  const tmpDir = mkdtempSync(path.join(os.tmpdir(), 'relayflows-broker-exit-'));
  const exitedPid = 41;

  // First broker: as soon as an exit listener is registered, schedule an exit
  // notification for this broker's pid on the microtask queue.
  const exitingRelay = {
    ...mockRelayInstance,
    brokerPid: exitedPid,
    onBrokerExit: vi.fn((cb: (info: BrokerExitInfo) => void) => {
      queueMicrotask(() => {
        cb({ pid: exitedPid, code: 1, signal: 'SIGTERM' });
      });
      return () => {};
    }),
    spawnPty: vi.fn().mockImplementation(defaultSpawnPtyImplementation),
    shutdown: vi.fn().mockResolvedValue(undefined),
  };
  // Second broker: spawns behave like the suite's default mock.
  const recoveredRelay = {
    ...mockRelayInstance,
    spawnPty: vi.fn().mockImplementation(defaultSpawnPtyImplementation),
    shutdown: vi.fn().mockResolvedValue(undefined),
  };
  // Hand the brokers out in order: exiting first, recovered second.
  mockHarnessDriverSpawn.mockImplementationOnce(async () => exitingRelay as any);
  mockHarnessDriverSpawn.mockImplementationOnce(async () => recoveredRelay as any);

  try {
    const relayStateDir = path.join(tmpDir, '.agentworkforce', 'relay');
    const isolatedRunner = new WorkflowRunner({
      db,
      workspaceId: 'ws-test',
      cwd: tmpDir,
      relay: {
        env: {
          AGENT_RELAY_WORKFLOW_DISABLE_RELAYCAST: '1',
          AGENT_RELAY_STATE_DIR: relayStateDir,
        },
      },
    });

    const config = makeConfig({
      workflows: [
        {
          name: 'default',
          steps: [{ name: 'step-1', agent: 'agent-a', task: 'Do step 1' }],
        },
      ],
    });
    const run = await isolatedRunner.execute(config, 'default');

    expect(run.status, run.error).toBe('completed');
    // Two broker spawns: the one that exited plus its replacement.
    expect(mockHarnessDriverSpawn).toHaveBeenCalledTimes(2);
    expect(exitingRelay.onBrokerExit).toHaveBeenCalledTimes(1);
    expect(recoveredRelay.spawnPty).toHaveBeenCalledTimes(1);
  } finally {
    // Always remove the scratch directory, even when an assertion throws.
    rmSync(tmpDir, { recursive: true, force: true });
  }
});

it('refuses broker recovery while workflow agents are still active', async () => {
  const candidate = new WorkflowRunner({ db, workspaceId: 'ws-test' });
  // The test reaches into runner internals, so alias the untyped view once.
  const internals = candidate as any;
  const activeAgentName = 'active-agent';

  // Seed a broker context and register one agent handle as still active.
  internals.currentBrokerContext = {
    runId: 'run-active',
    brokerName: 'relayflows-run-active',
    channel: 'wf-active',
    relaycastDisabled: true,
  };
  internals.activeAgentHandles.set(activeAgentName, { name: activeAgentName });

  const recovery = internals.recoverBroker('spawn failed');
  await expect(recovery).rejects.toThrow(
    'Broker recovery is unsafe while 1 agent is still active: active-agent'
  );
  // The refusal must happen before any replacement broker is spawned.
  expect(mockHarnessDriverSpawn).not.toHaveBeenCalled();
});

it('releases the live PTY before replaying a transient same-attempt network failure', async () => {
  const tmpDir = mkdtempSync(path.join(os.tmpdir(), 'relayflows-transient-replay-'));

  // Builds a mock handle whose `release` is a spy, so the test can assert
  // that the first PTY was released.
  const makeReleasableHandle = () => ({
    ...makeMockHandle('step-1-transient'),
    release: vi.fn().mockResolvedValue({ name: 'step-1-transient' }),
  });
  const firstHandle = makeReleasableHandle();
  const secondHandle = makeReleasableHandle();

  let spawnCount = 0;
  const replayRelay = {
    ...mockRelayInstance,
    spawnPty: vi.fn(async ({ name }: { name: string }) => {
      spawnCount += 1;
      if (spawnCount === 1) {
        return firstHandle as any;
      }
      if (spawnCount === 2) {
        // Only the replayed spawn emits the step-completion output.
        queueMicrotask(() => {
          emitMockEvent('workerOutput', { name, chunk: 'STEP_COMPLETE:step-1\n' });
        });
      }
      return secondHandle as any;
    }),
  };
  mockHarnessDriverSpawn.mockImplementation(async () => replayRelay as any);

  try {
    const relayStateDir = path.join(tmpDir, '.agentworkforce', 'relay');
    const isolatedRunner = new WorkflowRunner({
      db,
      workspaceId: 'ws-test',
      cwd: tmpDir,
      relay: {
        env: {
          AGENT_RELAY_WORKFLOW_DISABLE_RELAYCAST: '1',
          AGENT_RELAY_STATE_DIR: relayStateDir,
        },
      },
    });

    // The first wait rejects with a retryable network error; the second
    // reports a normal exit.
    const transientError = Object.assign(new Error('fetch failed'), { retryable: true });
    const waitSpy = vi.spyOn(isolatedRunner as any, 'waitForExitWithIdleNudging');
    waitSpy.mockRejectedValueOnce(transientError);
    waitSpy.mockResolvedValueOnce('exited');

    const config = makeConfig({
      workflows: [
        {
          name: 'default',
          steps: [{ name: 'step-1', agent: 'agent-a', task: 'Do step 1' }],
        },
      ],
    });
    const run = await isolatedRunner.execute(config, 'default');

    expect(run.status, run.error).toBe('completed');
    expect(replayRelay.spawnPty).toHaveBeenCalledTimes(2);
    expect(firstHandle.release).toHaveBeenCalledTimes(1);
  } finally {
    // Always remove the scratch directory, even when an assertion throws.
    rmSync(tmpDir, { recursive: true, force: true });
  }
});

it('should emit owner assignment and review completion events for interactive steps', async () => {
const events: Array<{ type: string; stepName?: string }> = [];
runner.on((event) =>
Expand Down
Loading