Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 79 additions & 1 deletion packages/factory-sdk/src/cli/fleet.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ describe('fleet CLI runtime', () => {
try {
const configPath = await writeConfig(root)
const listeners = new Map<string, () => void>()
const calls: string[] = []
const processLike = {
once(signal: string, listener: () => void) {
listeners.set(signal, listener)
Expand All @@ -456,7 +457,9 @@ describe('fleet CLI runtime', () => {
}
const factory = {
start: vi.fn(async () => {}),
stop: vi.fn(async () => {}),
stop: vi.fn(async () => {
calls.push('stop')
}),
runLoop: vi.fn(async () => []),
runOnce: vi.fn(),
status: vi.fn(),
Expand All @@ -466,6 +469,7 @@ describe('fleet CLI runtime', () => {
dispose: vi.fn(),
} as unknown as Factory
const createFactory = vi.fn(() => factory)
const daemonExits: number[] = []

const run = runFleetCli([
'factory',
Expand All @@ -479,6 +483,13 @@ describe('fleet CLI runtime', () => {
mount: new FakeMountClient(),
createFactory,
stopSignalProcessLike: processLike as unknown as Pick<NodeJS.Process, 'once' | 'off'>,
flushDaemonOutput: async () => {
calls.push('flush')
},
daemonExit: (code) => {
calls.push('exit')
daemonExits.push(code)
},
stdout: buffer(),
stderr: buffer(),
})
Expand All @@ -489,12 +500,79 @@ describe('fleet CLI runtime', () => {

await expect(run).resolves.toBe(0)
expect(factory.stop).toHaveBeenCalledTimes(1)
expect(calls).toEqual(['stop', 'flush', 'exit'])
expect(daemonExits).toEqual([0])
expect(listeners.size).toBe(0)
} finally {
await rm(root, { recursive: true, force: true })
}
})

it('does not force process exit for one-shot factory commands', async () => {
const root = await mkdtemp(join(tmpdir(), 'fleet-cli-one-shot-no-force-exit-'))
try {
const configPath = await writeConfig(root)
const daemonExits: number[] = []
const daemonFlushes: string[] = []
const runOnceFactory = {
start: vi.fn(),
stop: vi.fn(),
runLoop: vi.fn(async () => []),
runOnce: vi.fn(async () => ({ pulled: [], triaged: [], dispatched: [], skipped: [], dryRun: true })),
status: vi.fn(),
triageIssue: vi.fn(),
dispatch: vi.fn(),
on: vi.fn(),
dispose: vi.fn(),
} as unknown as Factory

const runOnceCode = await runFleetCli([
'--dry-run',
'factory',
'run-once',
'--config',
configPath,
], {
fleet: new FakeFleetClient(),
mount: new FakeMountClient(),
createFactory: () => runOnceFactory,
daemonExit: (code) => {
daemonExits.push(code)
},
flushDaemonOutput: async () => {
daemonFlushes.push('flush')
},
stdout: buffer(),
stderr: buffer(),
})

const reapCode = await runFleetCli([
'factory',
'reap-orphans',
'--config',
configPath,
], {
fleet: new FakeFleetClient(),
mount: new FakeMountClient(),
daemonExit: (code) => {
daemonExits.push(code)
},
flushDaemonOutput: async () => {
daemonFlushes.push('flush')
},
stdout: buffer(),
stderr: buffer(),
})

expect(runOnceCode).toBe(0)
expect(reapCode).toBe(0)
expect(daemonExits).toEqual([])
expect(daemonFlushes).toEqual([])
} finally {
await rm(root, { recursive: true, force: true })
}
})

it('factory kill-loop sends SIGTERM to the heartbeat pid', async () => {
const root = await mkdtemp(join(tmpdir(), 'fleet-cli-kill-'))
const originalKill = process.kill
Expand Down
30 changes: 29 additions & 1 deletion packages/factory-sdk/src/cli/fleet.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ interface FleetCliDeps {
probeCloser?: ProbeCloser
now?: () => number
stopSignalProcessLike?: Pick<NodeJS.Process, 'once' | 'off'>
daemonExit?: (code: number) => void
flushDaemonOutput?: () => Promise<void>
}

interface GlobalOptions {
Expand Down Expand Up @@ -215,10 +217,19 @@ async function runFactoryCommand(
if (command.action === 'start') {
const waiter = createStopSignalWaiter()
let stoppedBySignal = false
const flushAndExit = async (code: number): Promise<void> => {
try {
await (deps.flushDaemonOutput ?? flushProcessOutput)()
} finally {
const daemonExit = deps.daemonExit ?? ((exitCode: number) => process.exit(exitCode))
daemonExit(code)
waiter.resolve(code)
}
}
Comment on lines +220 to +228

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

If deps.flushDaemonOutput or flushProcessOutput rejects, the error will propagate out of flushAndExit because there is no catch block. Since flushAndExit is called as a floating promise (void flushAndExit(code)), this will trigger an unhandled promise rejection. This is especially problematic in tests where daemonExit does not terminate the process.

Adding a catch block ensures that any flush errors are safely ignored and do not cause unhandled rejections.

      const flushAndExit = async (code: number): Promise<void> => {
        try {
          await (deps.flushDaemonOutput ?? flushProcessOutput)()
        } catch {
          // Ignore flush errors to ensure we still exit cleanly
        } finally {
          const daemonExit = deps.daemonExit ?? ((exitCode: number) => process.exit(exitCode))
          daemonExit(code)
          waiter.resolve(code)
        }
      }

const removeSignalHandlers = installFactoryStopSignalHandlers(factory, {
exit: (code) => {
stoppedBySignal = true
waiter.resolve(code)
void flushAndExit(code)
},
processLike: deps.stopSignalProcessLike,
})
Expand Down Expand Up @@ -467,6 +478,23 @@ function writeJson(out: Pick<NodeJS.WriteStream, 'write'>, value: unknown): void
out.write(`${JSON.stringify(value, null, 2)}\n`)
}

async function flushProcessOutput(): Promise<void> {
await Promise.all([
flushWritable(process.stdout),
flushWritable(process.stderr),
])
}

function flushWritable(stream: NodeJS.WriteStream): Promise<void> {
if (stream.destroyed || stream.writableEnded || stream.writable === false) {
return Promise.resolve()
}

return new Promise((resolve) => {
stream.write('', () => resolve())
})
}
Comment on lines +481 to +496

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

If the output streams (process.stdout or process.stderr) are piped to another process that has terminated or stopped reading, writing to them can block indefinitely or the callback might never be invoked. This would cause the daemon to hang indefinitely during shutdown, defeating the purpose of a fast and graceful exit.

To prevent this, we should:

  1. Introduce a timeout (e.g., 200ms) to the flush operation so that we always proceed to exit even if the streams are clogged.
  2. Safely catch any synchronous errors during stream.write to avoid unhandled promise rejections.
async function flushProcessOutput(): Promise<void> {
  let timeoutId: NodeJS.Timeout | undefined
  const flushPromise = Promise.all([
    flushWritable(process.stdout),
    flushWritable(process.stderr),
  ])
  const timeoutPromise = new Promise<void>((resolve) => {
    timeoutId = setTimeout(resolve, 200)
  })
  await Promise.race([flushPromise, timeoutPromise])
  if (timeoutId) {
    clearTimeout(timeoutId)
  }
}

function flushWritable(stream: NodeJS.WriteStream): Promise<void> {
  if (stream.destroyed || stream.writableEnded || stream.writable === false) {
    return Promise.resolve()
  }

  return new Promise<void>((resolve) => {
    try {
      stream.write('', () => resolve())
    } catch {
      resolve()
    }
  })
}


function isCapability(value: string | undefined): value is Capability {
return value === 'spawn:codex' || value === 'spawn:claude'
}
Expand Down
Loading