diff --git a/packages/factory-sdk/src/orchestrator/factory.test.ts b/packages/factory-sdk/src/orchestrator/factory.test.ts index 134f64fd..4f3733b7 100644 --- a/packages/factory-sdk/src/orchestrator/factory.test.ts +++ b/packages/factory-sdk/src/orchestrator/factory.test.ts @@ -885,6 +885,90 @@ describe('FactoryLoop', () => { } }) + it('start live writes and refreshes a running loop heartbeat, then marks stopping on stop', async () => { + const root = await mkdtemp(join(tmpdir(), 'factory-live-heartbeat-')) + const heartbeatPath = join(root, 'heartbeat.json') + const registryPath = join(root, 'registry.json') + try { + const clock = new ManualClock() + const factory = createFactory(config({ + loop: { maxIterations: 1, heartbeatPath, registryPath, heartbeatStaleMs: 1_000 }, + }), { + mount: new FakeMountClient(), + fleet: new FakeFleetClient(), + triage: new StaticTriage(), + clock, + }) + + await factory.start({ mode: 'live', liveSubscription: { transport: 'subscribe' } }) + + const initial = await readFactoryLoopHeartbeat(heartbeatPath) + expect(initial).toMatchObject({ + status: 'running', + iteration: 0, + maxIterations: 0, + updatedAtMs: 0, + registryPath, + }) + expect(checkFactoryLoopLiveness(initial, { nowMs: 900, staleMs: 1_000 })).toMatchObject({ + ok: true, + stale: false, + }) + + clock.advance(500) + await new Promise((resolve) => setTimeout(resolve, 650)) + + const refreshed = await readFactoryLoopHeartbeat(heartbeatPath) + expect(refreshed).toMatchObject({ + status: 'running', + updatedAtMs: 500, + }) + + await factory.stop() + + const stopped = await readFactoryLoopHeartbeat(heartbeatPath) + expect(stopped).toMatchObject({ + status: 'stopping', + updatedAtMs: 500, + }) + } finally { + await rm(root, { recursive: true, force: true }) + } + }) + + it('start live marks the heartbeat stopping before releasing in-flight agents', async () => { + const root = await mkdtemp(join(tmpdir(), 'factory-live-heartbeat-stop-order-')) + const heartbeatPath = join(root, 'heartbeat.json') + const registryPath = join(root, 'registry.json') + try { + const mount = new FakeMountClient({ [issuePath(62)]: issueFile(62) }) + const heartbeatStatusesAtRelease: string[] = [] + class HeartbeatObservingFleetClient extends FakeFleetClient { + override async release(name: string, reason?: string): Promise { + heartbeatStatusesAtRelease.push((await readFactoryLoopHeartbeat(heartbeatPath))?.status ?? 'missing') + await super.release(name, reason) + } + } + const fleet = new HeartbeatObservingFleetClient() + const factory = createFactory(config({ + loop: { maxIterations: 1, heartbeatPath, registryPath, heartbeatStaleMs: 10_000 }, + }), { + mount, + fleet, + triage: new StaticTriage(), + }) + + await factory.start({ mode: 'live', liveSubscription: { transport: 'subscribe' } }) + await factory.dispatch(await factory.triageIssue(parseLinearIssue(issuePath(62), issueFile(62)))) + await factory.stop() + + expect(heartbeatStatusesAtRelease).toEqual(['stopping', 'stopping']) + expect((await readFactoryInFlightRegistry(registryPath))?.agents).toEqual([]) + } finally { + await rm(root, { recursive: true, force: true }) + } + }) + it('writes a durable in-flight registry with agent PID identity signatures', async () => { const root = await mkdtemp(join(tmpdir(), 'factory-loop-registry-')) const heartbeatPath = join(root, 'heartbeat.json') diff --git a/packages/factory-sdk/src/orchestrator/factory.ts b/packages/factory-sdk/src/orchestrator/factory.ts index 5175f9ff..c702a158 100644 --- a/packages/factory-sdk/src/orchestrator/factory.ts +++ b/packages/factory-sdk/src/orchestrator/factory.ts @@ -93,6 +93,7 @@ const STOP_TEARDOWN_TIMEOUT_MS = 2_500 const MERGE_GATE_MAX_ATTEMPTS = 12 const MERGE_GATE_POLL_DELAY_MS = 10_000 const DISPATCH_FAILURE_HANDOFF_UNRESOLVED_TTL_MS = 5 * 60_000 +const DEFAULT_LIVE_HEARTBEAT_INTERVAL_MS = 15_000 export const DEFAULT_FACTORY_LOOP_HEARTBEAT_PATH = '/tmp/factory-run/factory-loop-heartbeat.json' export const DEFAULT_FACTORY_LOOP_REGISTRY_PATH = '/tmp/factory-run/factory-loop-registry.json' @@ -145,6 +146,10 @@ export class FactoryLoop implements Factory { #liveEventHighWatermark?: string #liveConnectStartedAtMs = 0 #liveReplaySkewMarginMs = 0 + #liveHeartbeatTimer?: ReturnType + #liveHeartbeatActive = false + #liveHeartbeatInFlight = false + #liveHeartbeatRefresh?: Promise readonly #seenLiveEvents = new Set() #offAgentExit?: () => void #offDeliveryFailed?: () => void @@ -216,6 +221,7 @@ export class FactoryLoop implements Factory { return } catch (error) { this.#started = false + await this.#stopLiveHeartbeat('stopping') throw error } } @@ -230,6 +236,7 @@ export class FactoryLoop implements Factory { async stop(): Promise { this.#started = false this.#stopping = true + await this.#stopLiveHeartbeat('stopping') await this.#releaseInFlightAgents('factory-stopped') if (this.#livePollTimer) clearTimeout(this.#livePollTimer) this.#livePollTimer = undefined @@ -280,6 +287,7 @@ export class FactoryLoop implements Factory { async #startLiveSubscription(overrides: Partial = {}): Promise { const options = this.#liveOptions(overrides) + await this.#startLiveHeartbeat() this.#liveConnectStartedAtMs = this.#clock.now() this.#liveReplaySkewMarginMs = options.replaySkewMarginMs this.#liveEventHighWatermark = await this.#currentEventHighWatermark() @@ -302,6 +310,67 @@ export class FactoryLoop implements Factory { } } + async #startLiveHeartbeat(): Promise { + this.#liveHeartbeatActive = true + await this.#writeLiveHeartbeat('running') + this.#scheduleLiveHeartbeatRefresh() + } + + async #stopLiveHeartbeat(status: FactoryLoopHeartbeat['status']): Promise { + if (!this.#liveHeartbeatActive && !this.#liveHeartbeatTimer) { + return + } + this.#liveHeartbeatActive = false + if (this.#liveHeartbeatTimer) { + clearTimeout(this.#liveHeartbeatTimer) + this.#liveHeartbeatTimer = undefined + } + await this.#liveHeartbeatRefresh + await this.#writeLiveHeartbeat(status) + } + + #scheduleLiveHeartbeatRefresh(): void { + if (!this.#liveHeartbeatActive || this.#liveHeartbeatTimer) { + return + } + // This heartbeat proves daemon process liveness for the external crash reaper. + // MountClient subscriptions do not expose connected/keepalive state here, so + // subscription-wedge detection remains a separate watchdog concern. + this.#liveHeartbeatTimer = setTimeout(() => { + this.#liveHeartbeatTimer = undefined + this.#liveHeartbeatRefresh = this.#refreshLiveHeartbeat() + .finally(() => { + this.#liveHeartbeatRefresh = undefined + }) + }, liveHeartbeatIntervalMs(this.#config.loop.heartbeatStaleMs)) + this.#liveHeartbeatTimer.unref?.() + } + + async #refreshLiveHeartbeat(): Promise { + if (!this.#liveHeartbeatActive || this.#liveHeartbeatInFlight) { + return + } + this.#liveHeartbeatInFlight = true + try { + await this.#writeLiveHeartbeat('running') + } catch (error) { + this.#logger.warn?.('[factory] failed to refresh live daemon heartbeat', error) + } finally { + this.#liveHeartbeatInFlight = false + this.#scheduleLiveHeartbeatRefresh() + } + } + + async #writeLiveHeartbeat(status: FactoryLoopHeartbeat['status']): Promise { + await this.#writeLoopHeartbeat( + this.#config.loop.heartbeatPath, + this.#config.loop.registryPath, + status, + 0, + 0, + ) + } + #liveOptions(overrides: Partial): FactoryLiveSubscriptionOptions { return { transport: overrides.transport ?? this.#config.liveSubscription.transport, @@ -2139,6 +2208,9 @@ const stringValue = (value: unknown): string | undefined => typeof value === 'st const stateNameToId = (name: string | undefined): string | undefined => name ? STATE_NAME_TO_ID[name] : undefined +const liveHeartbeatIntervalMs = (staleMs: number): number => + Math.min(DEFAULT_LIVE_HEARTBEAT_INTERVAL_MS, Math.max(500, Math.floor(staleMs / 4))) + const installFactoryDraftPredicate = (mount: MountClient, config: FactoryConfig): void => { mount.setDefaultAllowedDraftPredicate?.((path, content, opts) => isAllowedFactoryDraft(path, content, opts, mount, config))