Harden factory loop dedup and teardown#248
Conversation
📝 WalkthroughWalkthroughThis PR extends the factory-sdk with bounded loop execution, heartbeat persistence, dispatch attempt gating with backoff/retry logic, and new CLI actions. It introduces loop types and configuration, implements ChangesFactory Loop Execution and Management
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Warning There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure. 🔧 ESLint
ESLint install failed due to a network error. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request introduces a factory loop runner (runLoop) with heartbeat-backed status monitoring, loop-status, and kill-loop CLI commands, alongside dispatch attempt deduplication, error backoff, and Slack watcher cleanup. The review feedback focuses on improving the graceful shutdown of the factory loop. Key recommendations include implementing a dispose() method that awaits the active loop promise, tracking this promise with a private field, updating signal handlers and CLI commands to call dispose() instead of stop(), and enhancing runLoop to support graceful cancellation and robust error handling.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| if (!stopping) { | ||
| stopping = factory.stop() | ||
| } |
There was a problem hiding this comment.
Call factory.dispose() instead of factory.stop() in the signal handler. This ensures that if a SIGINT or SIGTERM is received while the factory loop is running, the process will wait for the current iteration to finish gracefully before exiting, rather than terminating abruptly mid-execution.
| if (!stopping) { | |
| stopping = factory.stop() | |
| } | |
| if (!stopping) { | |
| stopping = factory.dispose() | |
| } |
| async dispose(): Promise<void> { | ||
| await this.stop() | ||
| } |
There was a problem hiding this comment.
Implement dispose() to await the active loop promise if it exists. This ensures that when the factory is disposed (e.g., during a graceful shutdown or signal handling), the current loop iteration completes cleanly before the process exits.
async dispose(): Promise<void> {
await this.stop()
if (this.#activeLoopPromise) {
try {
await this.#activeLoopPromise
} catch {
// ignore
}
}
}| async runLoop(opts: FactoryLoopRunOptions = {}): Promise<IterationReport[]> { | ||
| const maxIterations = Math.min(5, Math.max(1, Math.trunc(opts.maxIterations ?? this.#config.loop.maxIterations))) | ||
| const heartbeatPath = opts.heartbeatPath ?? this.#config.loop.heartbeatPath | ||
| const reports: IterationReport[] = [] | ||
| let completed = false | ||
| try { | ||
| for (let iteration = 0; iteration < maxIterations; iteration += 1) { | ||
| await this.#writeLoopHeartbeat(heartbeatPath, 'running', iteration, maxIterations) | ||
| reports.push(await this.runOnce({ dryRun: opts.dryRun })) | ||
| await this.#writeLoopHeartbeat(heartbeatPath, 'running', iteration + 1, maxIterations) | ||
| } | ||
| this.#increment('loopIdle') | ||
| await this.#writeLoopHeartbeat(heartbeatPath, 'idle', reports.length, maxIterations) | ||
| completed = true | ||
| return reports | ||
| } finally { | ||
| if (!completed) { | ||
| await this.#writeLoopHeartbeat(heartbeatPath, 'stopping', reports.length, maxIterations) | ||
| } | ||
| await this.stop() | ||
| } | ||
| } |
There was a problem hiding this comment.
Enhance runLoop to support graceful cancellation and robust error handling:
- Set
this.#started = trueat the start of the loop and checkif (!this.#started) breakto allow graceful early exit whenstop()is called. - Wrap
runOncein atry/catchblock so that a single iteration failure (e.g., due to a transient network or dispatch error) does not crash the entire loop runner. - Track the active loop promise in
this.#activeLoopPromiseand clear it in thefinallyblock to avoid deadlocks during shutdown.
async runLoop(opts: FactoryLoopRunOptions = {}): Promise<IterationReport[]> {
this.#started = true
const maxIterations = Math.min(5, Math.max(1, Math.trunc(opts.maxIterations ?? this.#config.loop.maxIterations)))
const heartbeatPath = opts.heartbeatPath ?? this.#config.loop.heartbeatPath
const reports: IterationReport[] = []
let completed = false
const promise = (async () => {
for (let iteration = 0; iteration < maxIterations; iteration += 1) {
if (!this.#started) {
break
}
await this.#writeLoopHeartbeat(heartbeatPath, 'running', iteration, maxIterations)
try {
reports.push(await this.runOnce({ dryRun: opts.dryRun }))
} catch (error) {
this.#logger.error?.(`[factory] loop iteration ${iteration} failed`, error)
}
await this.#writeLoopHeartbeat(heartbeatPath, 'running', iteration + 1, maxIterations)
}
if (this.#started) {
this.#increment('loopIdle')
await this.#writeLoopHeartbeat(heartbeatPath, 'idle', reports.length, maxIterations)
completed = true
}
return reports
})()
this.#activeLoopPromise = promise
try {
return await promise
} finally {
this.#activeLoopPromise = undefined
if (!completed) {
await this.#writeLoopHeartbeat(heartbeatPath, 'stopping', reports.length, maxIterations)
}
await this.stop()
}
}| removeSignalHandlers() | ||
| await factory.stop() |
There was a problem hiding this comment.
| readonly #dispatchAttempts = new Map<string, DispatchAttemptState>() | ||
| #subscription?: Subscription | ||
| #livePollTimer?: ReturnType<typeof setTimeout> | ||
| #livePollInFlight = false |
There was a problem hiding this comment.
Add a private field #activeLoopPromise to track the active loop execution promise so that graceful shutdown can await its completion.
| readonly #dispatchAttempts = new Map<string, DispatchAttemptState>() | |
| #subscription?: Subscription | |
| #livePollTimer?: ReturnType<typeof setTimeout> | |
| #livePollInFlight = false | |
| readonly #dispatchAttempts = new Map<string, DispatchAttemptState>() | |
| #activeLoopPromise?: Promise<unknown> | |
| #subscription?: Subscription | |
| #livePollTimer?: ReturnType<typeof setTimeout> | |
| #livePollInFlight = false |
There was a problem hiding this comment.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
packages/factory-sdk/src/orchestrator/factory.ts (1)
461-500:⚠️ Potential issue | 🟠 Major | 🏗️ Heavy liftRelease partially spawned agents before abandoning the batch record.
If any spawn succeeds and a later step in this
tryfails, Line 498 forgets the issue without releasing the agents already recorded onrecord.agents. That leaks live workers and allows the next retry to dispatch the same issue again while the first wave is still running.Suggested direction
} catch (error) { + const spawnedAgents = [...record.agents.keys()] + await Promise.allSettled( + spawnedAgents.map((name) => this.#fleet.release(name, 'dispatch-failed')), + ) this.#recordDispatchFailure(decision.issue) this.#batch.abandon(decision.issue) this.#error(error, decision.issue) throw error }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/factory-sdk/src/orchestrator/factory.ts` around lines 461 - 500, If any agents are successfully spawned by this.#spawnAgent but a later step in the try block fails, the code currently abandons the batch (this.#batch.abandon) and records failure (this.#recordDispatchFailure) without releasing the live agents recorded on record.agents, leaking workers; fix this by, in the catch path before abandoning the batch and rethrowing, iterating the spawned agents (use the local agents array or record.agents) and call the appropriate release/cleanup for each (e.g., await this.#releaseAgent(...) or the agent.release method), handle any release errors with logging, then proceed to call this.#recordDispatchFailure and this.#batch.abandon and rethrow the original error.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@packages/factory-sdk/src/cli/fleet.ts`:
- Line 111: The loop-status and kill-loop commands should be handled before
constructing runtime dependencies (fleet, mount, factory): detect when command
=== 'loop-status' or 'kill-loop' (and the same logic referenced in the 183-223
region) and short-circuit to read loaded.config.loop.* and return the
heartbeat/kill result immediately instead of calling runFactoryCommand or
initializing fleet/mount/factory; move that branch above the code that builds
factory/fleet/mount so these actions only use loaded.config and globals and do
not trigger runtime bootstrapping.
- Around line 205-211: For the 'kill-loop' branch (command.action ===
'kill-loop'), don't blindly trust readFactoryLoopHeartbeat; call
checkFactoryLoopLiveness(heartbeat, { staleMs: config.loop.heartbeatStaleMs })
and if it reports not alive or returns false/throws, reject by throwing an error
(refuse stale/missing heartbeats). Only after liveness check succeeds, call
process.kill(heartbeat.pid, 'SIGTERM') and writeJson(out, { killed:
heartbeat.pid, signal: 'SIGTERM' }); use the existing symbols
readFactoryLoopHeartbeat, checkFactoryLoopLiveness,
config.loop.heartbeatStaleMs, process.kill, and writeJson to locate and
implement the change.
- Around line 418-425: stopAndExit currently attaches a new .finally() each time
it's called, so multiple signals can trigger multiple exits; change it so the
function is single-fire: as soon as shutdown starts (when setting the stopping
promise from factory.stop()), immediately remove the signal listeners (or call
remove()) and return early on subsequent invocations by checking if stopping is
already set (use the stopping variable as the gate). Ensure you only attach the
.finally() once to the stopping promise so remove() and exit(code) run exactly
once; reference stopAndExit, stopping, factory.stop(), remove(), and exit() when
making this change.
In `@packages/factory-sdk/src/orchestrator/factory.ts`:
- Around line 1205-1233: The heartbeat reading/trust issue: ensure malformed
JSON is rejected by validating the parsed object in readFactoryLoopHeartbeat
(and/or at the start of checkFactoryLoopLiveness) so bad files are treated as
"missing" not "live"; specifically, after parseJsonContent in
readFactoryLoopHeartbeat validate that the returned object has a finite numeric
updatedAtMs, an allowed status string (e.g., 'running'|'stopping' if those are
used), and if present a numeric pid, otherwise return undefined; alternatively
add the same sanity checks at the top of checkFactoryLoopLiveness (verify
Number.isFinite(heartbeat.updatedAtMs) before computing ageMs and reject
non-finite values) so NaN ages and invalid pids cannot cause ok:true or be used
by kill-loop.
---
Outside diff comments:
In `@packages/factory-sdk/src/orchestrator/factory.ts`:
- Around line 461-500: If any agents are successfully spawned by
this.#spawnAgent but a later step in the try block fails, the code currently
abandons the batch (this.#batch.abandon) and records failure
(this.#recordDispatchFailure) without releasing the live agents recorded on
record.agents, leaking workers; fix this by, in the catch path before abandoning
the batch and rethrowing, iterating the spawned agents (use the local agents
array or record.agents) and call the appropriate release/cleanup for each (e.g.,
await this.#releaseAgent(...) or the agent.release method), handle any release
errors with logging, then proceed to call this.#recordDispatchFailure and
this.#batch.abandon and rethrow the original error.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro Plus
Run ID: c608c53b-4eb9-4871-a600-bd832cfc8d59
📒 Files selected for processing (9)
packages/factory-sdk/src/cli/fleet.test.tspackages/factory-sdk/src/cli/fleet.tspackages/factory-sdk/src/config/schema.tspackages/factory-sdk/src/index.tspackages/factory-sdk/src/orchestrator/batch-tracker.tspackages/factory-sdk/src/orchestrator/factory.test.tspackages/factory-sdk/src/orchestrator/factory.tspackages/factory-sdk/src/orchestrator/index.tspackages/factory-sdk/src/types.ts
| const mount = await buildMount(loaded, deps) | ||
| const factory = createFactory(loaded.config, { mount, fleet }) | ||
| return await runFactoryCommand(command, factory, mount, globals, out) | ||
| return await runFactoryCommand(command, factory, mount, loaded.config, globals, out) |
There was a problem hiding this comment.
Decouple loop-status / kill-loop from Fleet/Mount bootstrap.
These two actions only read config.loop.*, but this path still requires a fully initialized fleet, mount, and factory before it can reach the heartbeat logic. That makes the recovery commands fail on unrelated broker or mount startup errors, which is exactly when operators need them to keep working. Short-circuit these actions before runtime dependency construction.
Also applies to: 183-223
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/factory-sdk/src/cli/fleet.ts` at line 111, The loop-status and
kill-loop commands should be handled before constructing runtime dependencies
(fleet, mount, factory): detect when command === 'loop-status' or 'kill-loop'
(and the same logic referenced in the 183-223 region) and short-circuit to read
loaded.config.loop.* and return the heartbeat/kill result immediately instead of
calling runFactoryCommand or initializing fleet/mount/factory; move that branch
above the code that builds factory/fleet/mount so these actions only use
loaded.config and globals and do not trigger runtime bootstrapping.
| if (command.action === 'kill-loop') { | ||
| const heartbeat = await readFactoryLoopHeartbeat(config.loop.heartbeatPath) | ||
| if (!heartbeat?.pid) { | ||
| throw new Error(`No factory loop heartbeat at ${config.loop.heartbeatPath}`) | ||
| } | ||
| process.kill(heartbeat.pid, 'SIGTERM') | ||
| writeJson(out, { killed: heartbeat.pid, signal: 'SIGTERM' }) |
There was a problem hiding this comment.
Reject stale heartbeats before sending SIGTERM.
kill-loop currently trusts any PID found in the heartbeat file. If that file is stale and the OS has already reused the PID, this can terminate an unrelated process. Gate the kill with checkFactoryLoopLiveness(..., { staleMs: config.loop.heartbeatStaleMs }) and refuse stale/missing heartbeats.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/factory-sdk/src/cli/fleet.ts` around lines 205 - 211, For the
'kill-loop' branch (command.action === 'kill-loop'), don't blindly trust
readFactoryLoopHeartbeat; call checkFactoryLoopLiveness(heartbeat, { staleMs:
config.loop.heartbeatStaleMs }) and if it reports not alive or returns
false/throws, reject by throwing an error (refuse stale/missing heartbeats).
Only after liveness check succeeds, call process.kill(heartbeat.pid, 'SIGTERM')
and writeJson(out, { killed: heartbeat.pid, signal: 'SIGTERM' }); use the
existing symbols readFactoryLoopHeartbeat, checkFactoryLoopLiveness,
config.loop.heartbeatStaleMs, process.kill, and writeJson to locate and
implement the change.
| const stopAndExit = (code: number) => { | ||
| if (!stopping) { | ||
| stopping = factory.stop() | ||
| } | ||
| process.once('SIGINT', stop) | ||
| process.once('SIGTERM', stop) | ||
| }) | ||
| void stopping.finally(() => { | ||
| remove() | ||
| exit(code) | ||
| }) |
There was a problem hiding this comment.
Make shutdown handling single-fire across repeated signals.
If a second SIGINT/SIGTERM arrives before factory.stop() resolves, this attaches another .finally() to the same promise, so exit() can run multiple times with conflicting codes when a custom exit callback is injected. Remove the listeners or return early as soon as shutdown starts, not after the stop promise settles.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/factory-sdk/src/cli/fleet.ts` around lines 418 - 425, stopAndExit
currently attaches a new .finally() each time it's called, so multiple signals
can trigger multiple exits; change it so the function is single-fire: as soon as
shutdown starts (when setting the stopping promise from factory.stop()),
immediately remove the signal listeners (or call remove()) and return early on
subsequent invocations by checking if stopping is already set (use the stopping
variable as the gate). Ensure you only attach the .finally() once to the
stopping promise so remove() and exit(code) run exactly once; reference
stopAndExit, stopping, factory.stop(), remove(), and exit() when making this
change.
| export async function readFactoryLoopHeartbeat( | ||
| path = DEFAULT_FACTORY_LOOP_HEARTBEAT_PATH, | ||
| ): Promise<FactoryLoopHeartbeat | undefined> { | ||
| try { | ||
| return parseJsonContent(await readFile(path, 'utf8')) as FactoryLoopHeartbeat | ||
| } catch { | ||
| return undefined | ||
| } | ||
| } | ||
|
|
||
| export function checkFactoryLoopLiveness( | ||
| heartbeat: FactoryLoopHeartbeat | undefined, | ||
| opts: { nowMs?: number; staleMs?: number } = {}, | ||
| ): FactoryLoopLiveness { | ||
| if (!heartbeat) { | ||
| return { ok: false, stale: true, reason: 'heartbeat missing' } | ||
| } | ||
|
|
||
| const nowMs = opts.nowMs ?? Date.now() | ||
| const staleMs = opts.staleMs ?? 60_000 | ||
| const ageMs = Math.max(0, nowMs - heartbeat.updatedAtMs) | ||
| const stale = ageMs > staleMs | ||
| if (stale) { | ||
| return { ok: false, stale: true, ageMs, heartbeat, reason: 'heartbeat stale' } | ||
| } | ||
| if (heartbeat.status === 'stopping') { | ||
| return { ok: false, stale: false, ageMs, heartbeat, reason: 'loop stopping' } | ||
| } | ||
| return { ok: true, stale: false, ageMs, heartbeat } |
There was a problem hiding this comment.
Validate the heartbeat file before using it for liveness or PID-based control.
This helper trusts arbitrary JSON as FactoryLoopHeartbeat. A malformed file can make checkFactoryLoopLiveness() report ok: true because nowMs - heartbeat.updatedAtMs becomes NaN, and the same unchecked pid is what the CLI later uses for kill-loop. Invalid heartbeat content should be rejected as unreadable, not treated as live.
Suggested direction
export async function readFactoryLoopHeartbeat(
path = DEFAULT_FACTORY_LOOP_HEARTBEAT_PATH,
): Promise<FactoryLoopHeartbeat | undefined> {
try {
- return parseJsonContent(await readFile(path, 'utf8')) as FactoryLoopHeartbeat
+ const parsed = parseJsonContent(await readFile(path, 'utf8'))
+ if (
+ parsed &&
+ typeof parsed === 'object' &&
+ typeof (parsed as Record<string, unknown>).pid === 'number' &&
+ typeof (parsed as Record<string, unknown>).updatedAtMs === 'number' &&
+ typeof (parsed as Record<string, unknown>).updatedAt === 'string' &&
+ typeof (parsed as Record<string, unknown>).iteration === 'number' &&
+ typeof (parsed as Record<string, unknown>).maxIterations === 'number' &&
+ ['running', 'idle', 'stopping'].includes(String((parsed as Record<string, unknown>).status))
+ ) {
+ return parsed as FactoryLoopHeartbeat
+ }
+ return undefined
} catch {
return undefined
}
}🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/factory-sdk/src/orchestrator/factory.ts` around lines 1205 - 1233,
The heartbeat reading/trust issue: ensure malformed JSON is rejected by
validating the parsed object in readFactoryLoopHeartbeat (and/or at the start of
checkFactoryLoopLiveness) so bad files are treated as "missing" not "live";
specifically, after parseJsonContent in readFactoryLoopHeartbeat validate that
the returned object has a finite numeric updatedAtMs, an allowed status string
(e.g., 'running'|'stopping' if those are used), and if present a numeric pid,
otherwise return undefined; alternatively add the same sanity checks at the top
of checkFactoryLoopLiveness (verify Number.isFinite(heartbeat.updatedAtMs)
before computing ageMs and reject non-finite values) so NaN ages and invalid
pids cannot cause ok:true or be used by kill-loop.
|
Implemented one scoped fix in the PR:
Addressed comments
Advisory NotesNone. Validation run:
Also checked for merge conflict markers with |
Summary
Implements V2FIX-2 hardening for the factory loop:
ts.dispose(), loopfinallystop, Slack watcher timer cleanup, and CLI SIGINT/SIGTERM stop-before-exit handling.factory loop-statusandfactory kill-loopsingle-command kill path.V0 Evidence
npx vitest run packages/factory-sdk-> 205 passed / 18 files.npx tsc --noEmit -p tsconfig.node.json-> exit 0.V1 Handoff
Ready for factory-verify live checks:
[factory-e2e]issue dispatches once, backs off on error, and does not storm.factory kill-loopstops via SIGTERM.Guardrails
src/mainchanges.