Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions packages/factory-sdk/src/orchestrator/factory.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,90 @@ describe('FactoryLoop', () => {
}
})

it('start live writes and refreshes a running loop heartbeat, then marks stopping on stop', async () => {
const root = await mkdtemp(join(tmpdir(), 'factory-live-heartbeat-'))
const heartbeatPath = join(root, 'heartbeat.json')
const registryPath = join(root, 'registry.json')
try {
const clock = new ManualClock()
const factory = createFactory(config({
loop: { maxIterations: 1, heartbeatPath, registryPath, heartbeatStaleMs: 1_000 },
}), {
mount: new FakeMountClient(),
fleet: new FakeFleetClient(),
triage: new StaticTriage(),
clock,
})

await factory.start({ mode: 'live', liveSubscription: { transport: 'subscribe' } })

const initial = await readFactoryLoopHeartbeat(heartbeatPath)
expect(initial).toMatchObject({
status: 'running',
iteration: 0,
maxIterations: 0,
updatedAtMs: 0,
registryPath,
})
expect(checkFactoryLoopLiveness(initial, { nowMs: 900, staleMs: 1_000 })).toMatchObject({
ok: true,
stale: false,
})

clock.advance(500)
await new Promise((resolve) => setTimeout(resolve, 650))

const refreshed = await readFactoryLoopHeartbeat(heartbeatPath)
expect(refreshed).toMatchObject({
status: 'running',
updatedAtMs: 500,
})

await factory.stop()

const stopped = await readFactoryLoopHeartbeat(heartbeatPath)
expect(stopped).toMatchObject({
status: 'stopping',
updatedAtMs: 500,
})
} finally {
await rm(root, { recursive: true, force: true })
}
})

it('start live marks the heartbeat stopping before releasing in-flight agents', async () => {
const root = await mkdtemp(join(tmpdir(), 'factory-live-heartbeat-stop-order-'))
const heartbeatPath = join(root, 'heartbeat.json')
const registryPath = join(root, 'registry.json')
try {
const mount = new FakeMountClient({ [issuePath(62)]: issueFile(62) })
const heartbeatStatusesAtRelease: string[] = []
class HeartbeatObservingFleetClient extends FakeFleetClient {
override async release(name: string, reason?: string): Promise<void> {
heartbeatStatusesAtRelease.push((await readFactoryLoopHeartbeat(heartbeatPath))?.status ?? 'missing')
await super.release(name, reason)
}
}
const fleet = new HeartbeatObservingFleetClient()
const factory = createFactory(config({
loop: { maxIterations: 1, heartbeatPath, registryPath, heartbeatStaleMs: 10_000 },
}), {
mount,
fleet,
triage: new StaticTriage(),
})

await factory.start({ mode: 'live', liveSubscription: { transport: 'subscribe' } })
await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(62), issueFile(62))))
await factory.stop()

expect(heartbeatStatusesAtRelease).toEqual(['stopping', 'stopping'])
expect((await readFactoryInFlightRegistry(registryPath))?.agents).toEqual([])
} finally {
await rm(root, { recursive: true, force: true })
}
})

it('writes a durable in-flight registry with agent PID identity signatures', async () => {
const root = await mkdtemp(join(tmpdir(), 'factory-loop-registry-'))
const heartbeatPath = join(root, 'heartbeat.json')
Expand Down
72 changes: 72 additions & 0 deletions packages/factory-sdk/src/orchestrator/factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ const STOP_TEARDOWN_TIMEOUT_MS = 2_500
const MERGE_GATE_MAX_ATTEMPTS = 12
const MERGE_GATE_POLL_DELAY_MS = 10_000
const DISPATCH_FAILURE_HANDOFF_UNRESOLVED_TTL_MS = 5 * 60_000
const DEFAULT_LIVE_HEARTBEAT_INTERVAL_MS = 15_000
export const DEFAULT_FACTORY_LOOP_HEARTBEAT_PATH = '/tmp/factory-run/factory-loop-heartbeat.json'
export const DEFAULT_FACTORY_LOOP_REGISTRY_PATH = '/tmp/factory-run/factory-loop-registry.json'

Expand Down Expand Up @@ -145,6 +146,10 @@ export class FactoryLoop implements Factory {
#liveEventHighWatermark?: string
#liveConnectStartedAtMs = 0
#liveReplaySkewMarginMs = 0
#liveHeartbeatTimer?: ReturnType<typeof setTimeout>
#liveHeartbeatActive = false
#liveHeartbeatInFlight = false
#liveHeartbeatRefresh?: Promise<void>
readonly #seenLiveEvents = new Set<string>()
#offAgentExit?: () => void
#offDeliveryFailed?: () => void
Expand Down Expand Up @@ -216,6 +221,7 @@ export class FactoryLoop implements Factory {
return
} catch (error) {
this.#started = false
await this.#stopLiveHeartbeat('stopping')
throw error
}
}
Expand All @@ -230,6 +236,7 @@ export class FactoryLoop implements Factory {
async stop(): Promise<void> {
this.#started = false
this.#stopping = true
await this.#stopLiveHeartbeat('stopping')
await this.#releaseInFlightAgents('factory-stopped')
if (this.#livePollTimer) clearTimeout(this.#livePollTimer)
this.#livePollTimer = undefined
Expand Down Expand Up @@ -280,6 +287,7 @@ export class FactoryLoop implements Factory {

async #startLiveSubscription(overrides: Partial<FactoryLiveSubscriptionOptions> = {}): Promise<void> {
const options = this.#liveOptions(overrides)
await this.#startLiveHeartbeat()
this.#liveConnectStartedAtMs = this.#clock.now()
this.#liveReplaySkewMarginMs = options.replaySkewMarginMs
this.#liveEventHighWatermark = await this.#currentEventHighWatermark()
Expand All @@ -302,6 +310,67 @@ export class FactoryLoop implements Factory {
}
}

async #startLiveHeartbeat(): Promise<void> {
this.#liveHeartbeatActive = true
await this.#writeLiveHeartbeat('running')
this.#scheduleLiveHeartbeatRefresh()
}

async #stopLiveHeartbeat(status: FactoryLoopHeartbeat['status']): Promise<void> {
if (!this.#liveHeartbeatActive && !this.#liveHeartbeatTimer) {
return
}
this.#liveHeartbeatActive = false
if (this.#liveHeartbeatTimer) {
clearTimeout(this.#liveHeartbeatTimer)
this.#liveHeartbeatTimer = undefined
}
await this.#liveHeartbeatRefresh
await this.#writeLiveHeartbeat(status)
}

#scheduleLiveHeartbeatRefresh(): void {
if (!this.#liveHeartbeatActive || this.#liveHeartbeatTimer) {
return
}
// This heartbeat proves daemon process liveness for the external crash reaper.
// MountClient subscriptions do not expose connected/keepalive state here, so
// subscription-wedge detection remains a separate watchdog concern.
this.#liveHeartbeatTimer = setTimeout(() => {
this.#liveHeartbeatTimer = undefined
this.#liveHeartbeatRefresh = this.#refreshLiveHeartbeat()
.finally(() => {
this.#liveHeartbeatRefresh = undefined
})
}, liveHeartbeatIntervalMs(this.#config.loop.heartbeatStaleMs))
this.#liveHeartbeatTimer.unref?.()
}

async #refreshLiveHeartbeat(): Promise<void> {
if (!this.#liveHeartbeatActive || this.#liveHeartbeatInFlight) {
return
}
this.#liveHeartbeatInFlight = true
try {
await this.#writeLiveHeartbeat('running')
} catch (error) {
this.#logger.warn?.('[factory] failed to refresh live daemon heartbeat', error)
} finally {
this.#liveHeartbeatInFlight = false
this.#scheduleLiveHeartbeatRefresh()
}
}

async #writeLiveHeartbeat(status: FactoryLoopHeartbeat['status']): Promise<void> {
await this.#writeLoopHeartbeat(
this.#config.loop.heartbeatPath,
this.#config.loop.registryPath,
status,
0,
0,
)
}
Comment on lines +364 to +372

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Every time #writeLiveHeartbeat is called (which happens every 15 seconds by default via the live heartbeat refresh timer), it calls #writeLoopHeartbeat. #writeLoopHeartbeat in turn calls #writeInFlightRegistry, which performs process lookups (#processFinder) and potentially fleet API calls (resolveAgentPid) for every active agent.

This introduces a significant performance and efficiency bottleneck for the live daemon, causing redundant CPU usage and unnecessary API spam every 15 seconds. Since the in-flight registry is already written and kept in sync whenever agents are spawned, exited, or stopped, we can safely write the heartbeat file directly in #writeLiveHeartbeat without rewriting the registry.

  async #writeLiveHeartbeat(status: FactoryLoopHeartbeat['status']): Promise<void> {
    const path = this.#config.loop.heartbeatPath
    const updatedAtMs = this.#clock.now()
    const heartbeat: FactoryLoopHeartbeat = {
      pid: process.pid,
      status,
      iteration: 0,
      maxIterations: 0,
      updatedAt: new Date(updatedAtMs).toISOString(),
      updatedAtMs,
      registryPath: this.#config.loop.registryPath,
    }
    await mkdir(dirname(path), { recursive: true })
    await writeFile(path, `${JSON.stringify(heartbeat, null, 2)}\n`, 'utf8')
  }


#liveOptions(overrides: Partial<FactoryLiveSubscriptionOptions>): FactoryLiveSubscriptionOptions {
return {
transport: overrides.transport ?? this.#config.liveSubscription.transport,
Expand Down Expand Up @@ -2139,6 +2208,9 @@ const stringValue = (value: unknown): string | undefined => typeof value === 'st
const stateNameToId = (name: string | undefined): string | undefined =>
name ? STATE_NAME_TO_ID[name] : undefined

const liveHeartbeatIntervalMs = (staleMs: number): number =>
Math.min(DEFAULT_LIVE_HEARTBEAT_INTERVAL_MS, Math.max(500, Math.floor(staleMs / 4)))

const installFactoryDraftPredicate = (mount: MountClient, config: FactoryConfig): void => {
mount.setDefaultAllowedDraftPredicate?.((path, content, opts) =>
isAllowedFactoryDraft(path, content, opts, mount, config))
Expand Down
Loading