diff --git a/packages/factory-sdk/src/cli/fleet.ts b/packages/factory-sdk/src/cli/fleet.ts index 9a6dcf08..346b5ca6 100644 --- a/packages/factory-sdk/src/cli/fleet.ts +++ b/packages/factory-sdk/src/cli/fleet.ts @@ -193,7 +193,7 @@ async function runFactoryCommand( return 0 } - await factory.start() + await factory.start({ mode: 'live' }) writeJson(out, factory.status()) await waitForShutdown(factory) return 0 diff --git a/packages/factory-sdk/src/config/schema.ts b/packages/factory-sdk/src/config/schema.ts index a8a28f79..a7af7b4f 100644 --- a/packages/factory-sdk/src/config/schema.ts +++ b/packages/factory-sdk/src/config/schema.ts @@ -10,6 +10,12 @@ export const FactoryConfigSchema = z.object({ labels: z.array(z.string()).default([]), assignees: z.array(z.string()).default([]), }).default({}), + liveSubscription: z.object({ + transport: z.enum(['subscribe-and-poll', 'subscribe', 'poll']).default('subscribe-and-poll'), + pollIntervalMs: z.number().int().min(50).default(5_000), + eventLimit: z.number().int().min(1).max(1_000).default(1_000), + replaySkewMarginMs: z.number().int().min(0).default(60_000), + }).default({}), repos: z.object({ byLabel: z.record(z.string(), z.string()), byProject: z.record(z.string(), z.string()).default({}), diff --git a/packages/factory-sdk/src/index.ts b/packages/factory-sdk/src/index.ts index 39bbc907..47a0e35d 100644 --- a/packages/factory-sdk/src/index.ts +++ b/packages/factory-sdk/src/index.ts @@ -56,7 +56,7 @@ export type { GithubMergeGatePort, GithubMergeGateVerdict, } from './github' -export { BatchTracker, createFactory, FactoryLoop, issueKey, parseLinearIssue } from './orchestrator' +export { BatchTracker, createFactory, FactoryLoop, issueKey, isRealLinearIssue, parseLinearIssue } from './orchestrator' export type { InFlightIssue, QueuedIssue, TrackedAgent } from './orchestrator' export { HeuristicTriage, @@ -154,7 +154,9 @@ export type { DispatchResult, Factory, FactoryEventPayload, + FactoryLiveSubscriptionOptions, FactoryPorts, + FactoryStartOptions, FactoryStatus, IssueRef, IterationReport, diff --git a/packages/factory-sdk/src/mount/relayfile-cloud-mount-client.test.ts b/packages/factory-sdk/src/mount/relayfile-cloud-mount-client.test.ts index 10a84ddb..48ebb30a 100644 --- a/packages/factory-sdk/src/mount/relayfile-cloud-mount-client.test.ts +++ b/packages/factory-sdk/src/mount/relayfile-cloud-mount-client.test.ts @@ -1,5 +1,5 @@ import { describe, expect, it, vi } from 'vitest' -import type { OperationStatusResponse } from '@relayfile/sdk' +import type { ChangeEvent, OperationStatusResponse } from '@relayfile/sdk' import { RelayfileCloudMountClient, type RelayFileClientLike } from './relayfile-cloud-mount-client' @@ -91,6 +91,25 @@ class FakeRelayFileClient implements RelayFileClientLike { return { events: this.events, nextCursor: null } } + async listLastNChanges(_limit: number, _context?: { workspaceId: string }): Promise<{ events: ChangeEvent[] }> { + return { + events: this.events.map((event) => ({ + id: event.eventId, + workspace: 'rw_test', + type: 'relayfile.changed' as const, + occurredAt: event.timestamp, + resource: { + path: event.path, + kind: 'file', + id: event.path, + provider: 'linear', + }, + summary: {}, + expand: async () => ({ level: 'summary' as const, path: event.path, summary: {} }), + }) as unknown as ChangeEvent), + } + } + async getResourceAtEvent(eventId: string, context?: { workspaceId: string }) { return { path: `/events/${eventId}.json`, data: context, digest: eventId } } @@ -145,6 +164,21 @@ describe('RelayfileCloudMountClient', () => { expect(fake.getEventsCalls[0]).toEqual({ workspaceId: 'rw_test', opts: { cursor: 'evt-0', limit: 10 } }) }) + it('selects the numeric event high-watermark instead of lexicographic max', async () => { + const fake = new FakeRelayFileClient() + fake.events = ['7', '8', '9', '10', '11'].map((eventId) => ({ + eventId, + type: 'file.updated' as const, + path: `/linear/issues/AR-${eventId}.json`, + revision: eventId, + timestamp: '2026-01-01T00:00:00.000Z', + })) + const mount = new RelayfileCloudMountClient({ workspaceId: 'rw_test', client: fake }) + + await expect(mount.getEventHighWatermark()).resolves.toBe('11') + expect(fake.getEventsCalls).toEqual([]) + }) + it('writes through the RelayFileClient with workspace id and live baseRevision', async () => { const fake = new FakeRelayFileClient() fake.files.set('/linear/issues/AR-1.json', { diff --git a/packages/factory-sdk/src/mount/relayfile-cloud-mount-client.ts b/packages/factory-sdk/src/mount/relayfile-cloud-mount-client.ts index aeba3eda..c7d39d65 100644 --- a/packages/factory-sdk/src/mount/relayfile-cloud-mount-client.ts +++ b/packages/factory-sdk/src/mount/relayfile-cloud-mount-client.ts @@ -13,14 +13,13 @@ import { type RelayfileCloudTokenSet, type ResourceAtEventResult, type OperationStatusResponse, - type SubscribeOptions, type Subscription, type TreeResponse, type WriteFileInput, type WriteQueuedResponse, } from '@relayfile/sdk' -import type { EventPage, MountClient } from '../ports' +import type { EventPage, MountClient, SubscribeOptions } from '../ports' import { createWorkspaceScopedEventClient, type RelayfileEventClient, @@ -53,6 +52,7 @@ export type RelayFileClientLike = deleteFile(input: DeleteFileInput): Promise listTree(workspaceId: string, options?: ListTreeOptions): Promise getEvents(workspaceId: string, options?: GetEventsOptions): Promise + listLastNChanges?(limit: number, context?: { workspaceId: string; token?: string }): Promise<{ events: ChangeEvent[] }> getResourceAtEvent(eventId: string, context?: { workspaceId: string; token?: string }): Promise getOp?(workspaceId: string, opId: string): Promise getToken?(): Promise | string @@ -230,6 +230,15 @@ export class RelayfileCloudMountClient implements MountClient { } } + async getEventHighWatermark(opts: { provider?: string } = {}): Promise { + if (!this.#client.listLastNChanges) return undefined + const response = await this.#client.listLastNChanges(10, { workspaceId: this.workspaceId }) + const events = opts.provider + ? response.events.filter((event) => event.resource.provider === opts.provider) + : response.events + return maxEventId(events.map((event) => event.id)) + } + async confirmWrite( path: string, opts: { timeoutMs?: number } = {}, @@ -347,3 +356,31 @@ const errorMessage = (error: unknown): string => error instanceof Error ? error.message : String(error) const sleep = (ms: number): Promise => new Promise((resolve) => setTimeout(resolve, ms)) + +const maxEventId = (ids: string[]): string | undefined => { + let max: string | undefined + for (const id of ids) { + if (!max || compareEventIds(id, max) > 0) { + max = id + } + } + return max +} + +const compareEventIds = (left: string, right: string): number => { + const leftSequence = eventSequenceNumber(left) + const rightSequence = eventSequenceNumber(right) + if (leftSequence !== undefined && rightSequence !== undefined) { + return leftSequence - rightSequence + } + return left.localeCompare(right) +} + +const eventSequenceNumber = (eventId: string): number | undefined => { + const whole = Number(eventId) + if (Number.isFinite(whole)) return whole + const trailing = eventId.match(/(\d+)$/u)?.[1] + if (!trailing) return undefined + const parsed = Number(trailing) + return Number.isFinite(parsed) ? parsed : undefined +} diff --git a/packages/factory-sdk/src/orchestrator/factory.test.ts b/packages/factory-sdk/src/orchestrator/factory.test.ts index 39a585d8..cf738ad6 100644 --- a/packages/factory-sdk/src/orchestrator/factory.test.ts +++ b/packages/factory-sdk/src/orchestrator/factory.test.ts @@ -1,7 +1,7 @@ -import { describe, expect, it } from 'vitest' +import { describe, expect, it, vi } from 'vitest' import { FactoryConfigSchema, createFactory, parseLinearIssue, type FactoryConfig, type TriageDecision, type TriageEngine } from '../index' -import type { ChangeEvent, LinearWriteback, SlackWriteback } from '../ports' +import type { ChangeEvent, EventPage, LinearWriteback, SlackWriteback } from '../ports' import { FakeFleetClient, FakeMountClient } from '../testing' import type { CloseProbePrInput, LinearIssue } from '../index' import { BatchTracker } from './batch-tracker' @@ -43,6 +43,15 @@ const issueFile = (n: number, stateId = ready) => ({ payload: issuePayload(n, stateId), }) +const realIssueFile = (n: number, stateId = ready, overrides: Record = {}) => ({ + ...issueFile(n, stateId), + payload: { + ...issuePayload(n, stateId), + url: `https://linear.app/agent-relay/issue/AR-${n}/factory-issue-${n}`, + ...overrides, + }, +}) + const flush = async () => { await new Promise((resolve) => setTimeout(resolve, 0)) } @@ -90,6 +99,27 @@ class CountingTriage extends StaticTriage { } } +class CountingEventsMount extends FakeMountClient { + getEventsCalls = 0 + + override async getEvents(opts: { cursor?: string; limit?: number }): Promise { + this.getEventsCalls += 1 + return super.getEvents(opts) + } +} + +class NoWatermarkMount extends FakeMountClient { + override async getEventHighWatermark(): Promise { + return undefined + } +} + +class ThrowingWatermarkMount extends FakeMountClient { + override async getEventHighWatermark(): Promise { + throw Object.assign(new Error('Route not found'), { status: 404 }) + } +} + describe('FactoryLoop', () => { it('parses wrapped Linear issue records', () => { expect(parseLinearIssue(issuePath(1), issueFile(1))).toMatchObject({ @@ -368,6 +398,179 @@ describe('FactoryLoop', () => { await factory.stop() }) + it('live subscription dispatches a newly-arrived in-scope ready issue from subscribe events', async () => { + const path = issuePath(25) + const mount = new FakeMountClient() + const fleet = new FakeFleetClient() + const factory = createFactory(config(), { mount, fleet, triage: new StaticTriage() }) + + await factory.start({ mode: 'live', liveSubscription: { transport: 'subscribe' } }) + mount.files.set(path, { content: realIssueFile(25) }) + mount.emit(changeEvent(path, 'event-live-25')) + await flush() + + expect(fleet.spawns.map((spawn) => spawn.name)).toEqual(['ar-25-impl', 'ar-25-review']) + expect(factory.status().counters.liveEvents).toBe(1) + expect(factory.status().counters.liveArrivalLatencyMsLast).toBeGreaterThanOrEqual(0) + await factory.stop() + }) + + it('live subscription default uses subscribe without draining getEvents at startup', async () => { + const path = issuePath(33) + const mount = new CountingEventsMount() + const fleet = new FakeFleetClient() + const factory = createFactory(config(), { mount, fleet, triage: new StaticTriage() }) + + await factory.start({ mode: 'live' }) + + expect(mount.getEventsCalls).toBe(0) + + mount.files.set(path, { content: realIssueFile(33) }) + mount.emit(changeEvent(path, 'event-live-default-33')) + await flush() + + expect(fleet.spawns.map((spawn) => spawn.name)).toEqual(['ar-33-impl', 'ar-33-review']) + await factory.stop() + }) + + it('live subscription default suppresses replayed pre-connect ready issues and accepts new arrivals', async () => { + const replayPath = issuePath(34) + const tipPath = issuePath(36) + const newPath = issuePath(35) + const mount = new CountingEventsMount({ + [replayPath]: realIssueFile(34), + [tipPath]: realIssueFile(36), + }) + const fleet = new FakeFleetClient() + const factory = createFactory(config(), { mount, fleet, triage: new StaticTriage() }) + mount.emit(changeEvent(replayPath, '9')) + mount.emit(changeEvent(tipPath, '10')) + + await factory.start({ mode: 'live' }) + + expect(mount.getEventsCalls).toBe(0) + + mount.emit(changeEvent(replayPath, '9')) + await flush() + + expect(fleet.spawns).toEqual([]) + expect(factory.status().counters.liveReplayEventsSuppressed).toBe(1) + + mount.files.set(newPath, { content: realIssueFile(35) }) + mount.emit(changeEvent(newPath, '100')) + await flush() + + expect(fleet.spawns.map((spawn) => spawn.name)).toEqual(['ar-35-impl', 'ar-35-review']) + await factory.stop() + }) + + it('live subscription suppresses old replay by time when high-watermark is unavailable', async () => { + const replayPath = issuePath(37) + const freshPath = issuePath(38) + const skewPath = issuePath(39) + const mount = new ThrowingWatermarkMount({ + [replayPath]: realIssueFile(37), + [freshPath]: realIssueFile(38), + [skewPath]: realIssueFile(39), + }) + const fleet = new FakeFleetClient() + const factory = createFactory(config(), { mount, fleet, triage: new StaticTriage() }) + + await factory.start({ mode: 'live', liveSubscription: { replaySkewMarginMs: 60_000 } }) + + mount.emit(changeEvent(replayPath, '201', new Date(Date.now() - 5 * 60_000).toISOString())) + await flush() + + expect(fleet.spawns).toEqual([]) + expect(factory.status().counters.liveReplayEventsSuppressedByTime).toBe(1) + + mount.emit(changeEvent(skewPath, '202', new Date(Date.now() - 1_000).toISOString())) + await flush() + + mount.emit(changeEvent(freshPath, '203', new Date(Date.now()).toISOString())) + await flush() + + expect(fleet.spawns.map((spawn) => spawn.name)).toEqual([ + 'ar-39-impl', + 'ar-39-review', + 'ar-38-impl', + 'ar-38-review', + ]) + await factory.stop() + }) + + it('live subscription ignores out-of-scope, non-ready, and non-real issue arrivals', async () => { + const mount = new FakeMountClient() + const fleet = new FakeFleetClient() + const triage = new CountingTriage() + const factory = createFactory(config(), { mount, fleet, triage }) + const cases = [ + { n: 26, content: realIssueFile(26, ready, { team: { key: 'OTHER', name: 'Other' } }) }, + { n: 27, content: realIssueFile(27, ready, { title: 'Real AR issue without synthetic marker' }) }, + { n: 28, content: realIssueFile(28, implementing) }, + { n: 29, content: realIssueFile(29, ready, { url: undefined }) }, + ] + + await factory.start({ mode: 'live', liveSubscription: { transport: 'subscribe' } }) + for (const entry of cases) { + const path = issuePath(entry.n) + mount.files.set(path, { content: entry.content }) + mount.emit(changeEvent(path, `event-live-${entry.n}`)) + } + await flush() + + expect(triage.count).toBe(0) + expect(fleet.spawns).toEqual([]) + await factory.stop() + }) + + it('live subscription starts from the current event cursor and does not replay pre-start history', async () => { + vi.useFakeTimers() + try { + const oldPath = issuePath(30) + const newPath = issuePath(31) + const mount = new NoWatermarkMount({ [oldPath]: realIssueFile(30) }) + const fleet = new FakeFleetClient() + const factory = createFactory(config(), { mount, fleet, triage: new StaticTriage() }) + mount.emit(changeEvent(oldPath, 'event-before-start-30', new Date(Date.now() + 1_000).toISOString())) + + await factory.start({ mode: 'live', liveSubscription: { transport: 'poll', pollIntervalMs: 10 } }) + await vi.advanceTimersByTimeAsync(0) + + expect(fleet.spawns).toEqual([]) + + mount.files.set(newPath, { content: realIssueFile(31) }) + mount.emit(changeEvent(newPath, 'event-after-start-31')) + await vi.advanceTimersByTimeAsync(10) + + expect(fleet.spawns.map((spawn) => spawn.name)).toEqual(['ar-31-impl', 'ar-31-review']) + await factory.stop() + } finally { + vi.useRealTimers() + } + }) + + it('live subscription dispatches a newly-arrived in-scope ready issue from getEvents polling', async () => { + vi.useFakeTimers() + try { + const path = issuePath(32) + const mount = new FakeMountClient() + const fleet = new FakeFleetClient() + const factory = createFactory(config(), { mount, fleet, triage: new StaticTriage() }) + + await factory.start({ mode: 'live', liveSubscription: { transport: 'poll', pollIntervalMs: 10 } }) + await vi.advanceTimersByTimeAsync(0) + mount.files.set(path, { content: realIssueFile(32) }) + mount.emit(changeEvent(path, 'event-live-poll-32')) + await vi.advanceTimersByTimeAsync(10) + + expect(fleet.spawns.map((spawn) => spawn.name)).toEqual(['ar-32-impl', 'ar-32-review']) + await factory.stop() + } finally { + vi.useRealTimers() + } + }) + it('BatchTracker blocks duplicate invocation ids within and across issue records', async () => { const tracker = new BatchTracker(5) const decisionA = await new StaticTriage().triage(parseLinearIssue(issuePath(12), issueFile(12))) @@ -692,11 +895,11 @@ describe('FactoryLoop', () => { }) }) -const changeEvent = (path: string, id: string) => ({ +const changeEvent = (path: string, id: string, occurredAt = new Date().toISOString()) => ({ id, workspace: 'factory-test', type: 'relayfile.changed', - occurredAt: new Date(0).toISOString(), + occurredAt, resource: { path, kind: 'file', diff --git a/packages/factory-sdk/src/orchestrator/factory.ts b/packages/factory-sdk/src/orchestrator/factory.ts index 54b8dcfc..d90399c1 100644 --- a/packages/factory-sdk/src/orchestrator/factory.ts +++ b/packages/factory-sdk/src/orchestrator/factory.ts @@ -1,7 +1,7 @@ import { FactoryConfigSchema, type FactoryConfig } from '../config/schema' import { LINEAR_STATE_IDS, linearByStatePath } from '../constants/linear' import { GithubMergeGate, closeProbePr, type GithubMergeGate as GithubMergeGatePort } from '../github' -import type { AgentSpec, FleetClient, LinearWriteback, MountClient, SlackWriteback, Subscription } from '../ports' +import type { AgentSpec, ChangeEvent, FleetClient, LinearWriteback, MountClient, SlackWriteback, Subscription } from '../ports' import type { Clock, Logger } from '../ports/system' import { isInFactoryScope } from '../safety/factory-scope' import { HeuristicTriage, TieredTriage } from '../triage' @@ -11,6 +11,8 @@ import type { FactoryEventPayload, FactoryPorts, FactoryStatus, + FactoryStartOptions, + FactoryLiveSubscriptionOptions, IssueRef, IterationReport, LinearIssue, @@ -28,6 +30,8 @@ type Listener = (payload: FactoryEventPayload) => void const ISSUE_ROOT = '/linear/issues' const READY_EVENTS_LIMIT = 100 +const LIVE_ISSUE_GLOB = `${ISSUE_ROOT}/**` +const LIVE_DEDUPE_LIMIT = 5_000 const STATE_NAME_TO_ID: Record = { 'Ready for Agent': LINEAR_STATE_IDS.readyForAgent, 'Agent Implementing': LINEAR_STATE_IDS.agentImplementing, @@ -64,6 +68,13 @@ export class FactoryLoop implements Factory { readonly #resumeInFlight = new Map>() readonly #resumedExitKeys = new Set() #subscription?: Subscription + #livePollTimer?: ReturnType + #livePollInFlight = false + #liveEventCursor?: string + #liveEventHighWatermark?: string + #liveConnectStartedAtMs = 0 + #liveReplaySkewMarginMs = 0 + readonly #seenLiveEvents = new Set() #offAgentExit?: () => void #offDeliveryFailed?: () => void #starting?: Promise @@ -90,7 +101,7 @@ export class FactoryLoop implements Factory { this.#wireFleetEvents() } - async start(): Promise { + async start(opts: FactoryStartOptions = {}): Promise { if (this.#started) { return } @@ -99,7 +110,7 @@ export class FactoryLoop implements Factory { return this.#starting } - this.#starting = this.#start() + this.#starting = this.#start(opts) try { await this.#starting } finally { @@ -107,7 +118,7 @@ export class FactoryLoop implements Factory { } } - async #start(): Promise { + async #start(opts: FactoryStartOptions): Promise { const ready = await this.#mount.ensureSubRoot(ISSUE_ROOT, { timeoutMs: 90_000 }) if (ready !== 'ready') { this.#error(new Error(`${ISSUE_ROOT} sub-root is not mounted`)) @@ -116,6 +127,17 @@ export class FactoryLoop implements Factory { this.#wireFleetEvents() + if (opts.mode === 'live') { + this.#started = true + try { + await this.#startLiveSubscription(opts.liveSubscription) + return + } catch (error) { + this.#started = false + throw error + } + } + await this.#backfillReadyIssues() this.#subscription = this.#mount.subscribe([`${ISSUE_ROOT}/**/*.json`], (event) => { void this.#handleChange(event.resource.path) @@ -125,6 +147,9 @@ export class FactoryLoop implements Factory { async stop(): Promise { this.#started = false + if (this.#livePollTimer) clearTimeout(this.#livePollTimer) + this.#livePollTimer = undefined + this.#livePollInFlight = false await this.#subscription?.unsubscribe() this.#subscription = undefined this.#offAgentExit?.() @@ -133,6 +158,140 @@ export class FactoryLoop implements Factory { this.#offDeliveryFailed = undefined } + async #startLiveSubscription(overrides: Partial = {}): Promise { + const options = this.#liveOptions(overrides) + this.#liveConnectStartedAtMs = this.#clock.now() + this.#liveReplaySkewMarginMs = options.replaySkewMarginMs + this.#liveEventHighWatermark = await this.#currentEventHighWatermark() + this.#seenLiveEvents.clear() + this.#logger.info?.('[factory] live subscription starting', { + transport: options.transport, + highWatermark: this.#liveEventHighWatermark, + replaySkewMarginMs: this.#liveReplaySkewMarginMs, + }) + + if (options.transport !== 'poll') { + this.#subscription = this.#mount.subscribe([LIVE_ISSUE_GLOB], (event) => { + void this.#handleLiveChange(event) + }, { from: 'now', coalesce: 'none' }) + } + + if (options.transport === 'poll') { + this.#liveEventCursor = await this.#currentEventCursor(options.eventLimit) + this.#scheduleLivePoll(0, options) + } + } + + #liveOptions(overrides: Partial): FactoryLiveSubscriptionOptions { + return { + transport: overrides.transport ?? this.#config.liveSubscription.transport, + pollIntervalMs: overrides.pollIntervalMs ?? this.#config.liveSubscription.pollIntervalMs, + eventLimit: overrides.eventLimit ?? this.#config.liveSubscription.eventLimit, + replaySkewMarginMs: overrides.replaySkewMarginMs ?? this.#config.liveSubscription.replaySkewMarginMs, + } + } + + async #currentEventCursor(limit: number): Promise { + let cursor: string | undefined + for (;;) { + const page = await this.#mount.getEvents({ cursor, limit }) + cursor = eventCursorAfterPage(cursor, page.events, page.nextCursor) + if (!page.nextCursor) return cursor + } + } + + async #currentEventHighWatermark(): Promise { + try { + return await this.#mount.getEventHighWatermark?.() + } catch (error) { + this.#increment('liveHighWatermarkUnavailable') + this.#logger.warn?.('[factory] live subscription high-watermark unavailable', error) + return undefined + } + } + + #scheduleLivePoll(delayMs: number, options: FactoryLiveSubscriptionOptions): void { + if (this.#livePollTimer || !this.#started) return + this.#livePollTimer = setTimeout(() => { + this.#livePollTimer = undefined + void this.#pollLiveEvents(options).finally(() => { + if (this.#started) this.#scheduleLivePoll(options.pollIntervalMs, options) + }) + }, delayMs) + } + + async #pollLiveEvents(options: FactoryLiveSubscriptionOptions): Promise { + if (this.#livePollInFlight) return + this.#livePollInFlight = true + try { + let cursor = this.#liveEventCursor + for (;;) { + const page = await this.#mount.getEvents({ cursor, limit: options.eventLimit }) + for (const event of page.events) { + await this.#handleLiveChange(event) + } + const nextCursor = eventCursorAfterPage(cursor, page.events, page.nextCursor) + this.#liveEventCursor = nextCursor + if (!page.nextCursor || page.nextCursor === cursor) break + cursor = page.nextCursor + } + } catch (error) { + this.#logger.warn?.('[factory] live subscription poll failed', error) + } finally { + this.#livePollInFlight = false + } + } + + async #handleLiveChange(event: ChangeEvent): Promise { + const path = event.resource.path + if (!isIssueFilePath(path)) { + return + } + + if (isBeforeLiveCutoff(event.occurredAt, this.#liveConnectStartedAtMs, this.#liveReplaySkewMarginMs)) { + this.#increment('liveReplayEventsSuppressed') + this.#increment('liveReplayEventsSuppressedByTime') + this.#logger.debug?.('[factory] suppressed stale live issue event', { + id: event.id, + path, + occurredAt: event.occurredAt, + connectStartedAt: new Date(this.#liveConnectStartedAtMs).toISOString(), + replaySkewMarginMs: this.#liveReplaySkewMarginMs, + }) + return + } + + if (isAtOrBeforeHighWatermark(event.id, this.#liveEventHighWatermark)) { + this.#increment('liveReplayEventsSuppressed') + this.#increment('liveReplayEventsSuppressedByWatermark') + this.#logger.debug?.('[factory] suppressed replayed live issue event', { + id: event.id, + highWatermark: this.#liveEventHighWatermark, + path, + }) + return + } + + const dedupeKey = liveEventDedupeKey(event) + if (dedupeKey) { + if (this.#seenLiveEvents.has(dedupeKey)) { + this.#increment('liveDuplicateEventsSuppressed') + this.#logger.debug?.('[factory] suppressed duplicate live issue event', { + id: event.id, + path, + }) + return + } + rememberLiveEvent(this.#seenLiveEvents, dedupeKey) + } else { + this.#increment('liveEventsMissingIdentity') + this.#logger.warn?.('[factory] live issue event missing stable identity', { path }) + } + + this.#recordArrivalLatency(event) + await this.#handleChange(path, { requireRealIssue: true }) + } + async runOnce(opts: { dryRun?: boolean } = {}): Promise { const dryRun = opts.dryRun ?? this.#config.dryRun const paths = await this.#readyIssuePaths() @@ -307,7 +466,7 @@ export class FactoryLoop implements Factory { } } - async #handleChange(path: string): Promise { + async #handleChange(path: string, opts: { requireRealIssue?: boolean } = {}): Promise { if (!isIssueFilePath(path)) { return } @@ -318,6 +477,10 @@ export class FactoryLoop implements Factory { return } + if (opts.requireRealIssue && !isRealLinearIssue(issue)) { + return + } + if (!isInFactoryScope(issue, this.#config.safety)) { return } @@ -507,6 +670,20 @@ export class FactoryLoop implements Factory { } } + #recordArrivalLatency(event: ChangeEvent): void { + const occurredAt = Date.parse(event.occurredAt) + if (!Number.isFinite(occurredAt)) return + const latencyMs = Math.max(0, this.#clock.now() - occurredAt) + this.#counters.liveEvents = (this.#counters.liveEvents ?? 0) + 1 + this.#counters.liveArrivalLatencyMsLast = latencyMs + this.#counters.liveArrivalLatencyMsMax = Math.max(this.#counters.liveArrivalLatencyMsMax ?? 0, latencyMs) + this.#logger.debug?.('[factory] live issue event latency recorded', { + eventId: event.id, + path: event.resource.path, + latencyMs, + }) + } + async #sendCriticalReviewerMessage(record: InFlightIssue): Promise { if (!this.#fleet.waitForInjected) { return @@ -633,9 +810,7 @@ export function parseLinearIssue(path: string, content: unknown): LinearIssue { } } -const issueRef = (issue: LinearIssue): IssueRef => ({ uuid: issue.uuid, key: issue.key, path: issue.path }) - -const isRealLinearIssue = (issue: LinearIssue): boolean => { +export function isRealLinearIssue(issue: LinearIssue): boolean { const payload = wrappedPayload(issue.raw) const identifier = stringValue(payload.identifier) ?? issue.key return identifier === issue.key && @@ -644,6 +819,8 @@ const isRealLinearIssue = (issue: LinearIssue): boolean => { payload.url.length > 0 } +const issueRef = (issue: LinearIssue): IssueRef => ({ uuid: issue.uuid, key: issue.key, path: issue.path }) + const dispatchComment = (decision: TriageDecision, agents: DispatchResult['agents']): string => [ `Factory dispatch for ${decision.issue.key}`, `Implementers: ${agents.filter((agent) => agent.role === 'implementer').map((agent) => agent.name).join(', ') || 'none'}`, @@ -802,6 +979,69 @@ const scopeIssueFromDraftContent = (content: unknown) => ({ raw: asRecord(content) ?? {}, }) +const eventCursorAfterPage = ( + cursor: string | undefined, + events: ChangeEvent[], + nextCursor?: string | null, +): string | undefined => { + if (nextCursor) return nextCursor + if (events.length === 0) return cursor + const numericCursor = cursor === undefined ? 0 : Number(cursor) + if (Number.isInteger(numericCursor) && numericCursor >= 0) { + return String(numericCursor + events.length) + } + return events.at(-1)?.id ?? cursor +} + +const liveEventDedupeKey = (event: ChangeEvent): string | undefined => { + if (!event.id) return undefined + const resource = asRecord(event.resource) ?? {} + return [ + event.id, + event.type, + event.resource.path, + stringValue(resource.revision) ?? '', + event.digest ?? '', + ].join('\u001f') +} + +const isBeforeLiveCutoff = ( + occurredAt: string, + connectStartedAtMs: number, + skewMarginMs: number, +): boolean => { + const occurredAtMs = Date.parse(occurredAt) + if (!Number.isFinite(occurredAtMs)) return false + return occurredAtMs < connectStartedAtMs - skewMarginMs +} + +const isAtOrBeforeHighWatermark = (eventId: string | undefined, highWatermark: string | undefined): boolean => { + if (!eventId || !highWatermark) return false + if (eventId === highWatermark) return true + const eventSequence = eventSequenceNumber(eventId) + const watermarkSequence = eventSequenceNumber(highWatermark) + if (eventSequence !== undefined && watermarkSequence !== undefined) { + return eventSequence <= watermarkSequence + } + return false +} + +const eventSequenceNumber = (eventId: string): number | undefined => { + const whole = Number(eventId) + if (Number.isFinite(whole)) return whole + const trailing = eventId.match(/(\d+)$/u)?.[1] + if (!trailing) return undefined + const parsed = Number(trailing) + return Number.isFinite(parsed) ? parsed : undefined +} + +const rememberLiveEvent = (seen: Set, key: string): void => { + seen.add(key) + if (seen.size <= LIVE_DEDUPE_LIMIT) return + const oldest = seen.values().next().value + if (oldest) seen.delete(oldest) +} + const recordName = (value: unknown): string | undefined => { if (typeof value === 'string') { return value diff --git a/packages/factory-sdk/src/orchestrator/index.ts b/packages/factory-sdk/src/orchestrator/index.ts index 1bf1e82d..db2e1876 100644 --- a/packages/factory-sdk/src/orchestrator/index.ts +++ b/packages/factory-sdk/src/orchestrator/index.ts @@ -1,3 +1,3 @@ export { BatchTracker, issueKey } from './batch-tracker' export type { InFlightIssue, QueuedIssue, TrackedAgent } from './batch-tracker' -export { createFactory, FactoryLoop, parseLinearIssue } from './factory' +export { createFactory, FactoryLoop, isRealLinearIssue, parseLinearIssue } from './factory' diff --git a/packages/factory-sdk/src/ports/mount.ts b/packages/factory-sdk/src/ports/mount.ts index 8bc6f0bf..5a2ee426 100644 --- a/packages/factory-sdk/src/ports/mount.ts +++ b/packages/factory-sdk/src/ports/mount.ts @@ -1,12 +1,18 @@ import type { ChangeEvent as RelayFileChangeEvent, - SubscribeOptions as RelayFileSubscribeOptions, Subscription as RelayFileSubscription, } from '@relayfile/sdk' export type ChangeEvent = RelayFileChangeEvent -export type SubscribeOptions = RelayFileSubscribeOptions export type Subscription = RelayFileSubscription +export type SubscribeOptions = { + coalesce?: 'none' | 'fire-once' + coalesceMs?: number + pathScope?: string[] + from?: 'now' | 'legacy' + onCoalesced?: () => void + onQueueDepth?: (depth: number) => void +} export interface EventPage { events: ChangeEvent[] @@ -23,6 +29,7 @@ export interface MountClient { listTree(prefix: string): Promise subscribe(globs: string[], onChange: (event: ChangeEvent) => void, opts?: SubscribeOptions): Subscription getEvents(opts: { cursor?: string; limit?: number }): Promise + getEventHighWatermark?(opts?: { provider?: string }): Promise confirmWrite(path: string, opts?: { timeoutMs?: number }): Promise<'acked' | 'pending' | 'failed' | 'timeout'> ensureSubRoot(prefix: string, opts?: { timeoutMs?: number }): Promise<'ready' | 'absent'> } diff --git a/packages/factory-sdk/src/testing/fakes.ts b/packages/factory-sdk/src/testing/fakes.ts index 7079be9f..2b9898c3 100644 --- a/packages/factory-sdk/src/testing/fakes.ts +++ b/packages/factory-sdk/src/testing/fakes.ts @@ -85,6 +85,13 @@ export class FakeMountClient implements MountClient { } } + async getEventHighWatermark(opts: { provider?: string } = {}): Promise { + const events = opts.provider + ? this.#events.filter((event) => event.resource.provider === opts.provider) + : this.#events + return events.at(-1)?.id + } + async confirmWrite(path: string, _opts?: { timeoutMs?: number }): Promise<'acked' | 'pending' | 'failed' | 'timeout'> { return this.#confirmations.get(path) ?? 'acked' } diff --git a/packages/factory-sdk/src/types.ts b/packages/factory-sdk/src/types.ts index 592fe73e..3eba275b 100644 --- a/packages/factory-sdk/src/types.ts +++ b/packages/factory-sdk/src/types.ts @@ -19,7 +19,7 @@ export interface FactoryPorts { } export interface Factory { - start(): Promise + start(opts?: FactoryStartOptions): Promise stop(): Promise runOnce(opts?: { dryRun?: boolean }): Promise triageIssue(issue: LinearIssue): Promise @@ -31,6 +31,18 @@ export interface Factory { ): () => void } +export interface FactoryStartOptions { + mode?: 'backfill-and-subscribe' | 'live' + liveSubscription?: Partial +} + +export interface FactoryLiveSubscriptionOptions { + transport: 'subscribe-and-poll' | 'subscribe' | 'poll' + pollIntervalMs: number + eventLimit: number + replaySkewMarginMs: number +} + export interface LinearIssue { uuid: string key: string