From 63e39f5bee52deaaad18eef84815588bf96e5e1e Mon Sep 17 00:00:00 2001 From: kjgbot Date: Fri, 12 Jun 2026 12:00:47 +0200 Subject: [PATCH 1/5] Add live factory subscription pickup --- packages/factory-sdk/src/cli/fleet.ts | 2 +- packages/factory-sdk/src/config/schema.ts | 5 + packages/factory-sdk/src/index.ts | 4 +- .../src/mount/relayfile-cloud-mount-client.ts | 3 +- .../src/orchestrator/factory.test.ts | 104 +++++++++- .../factory-sdk/src/orchestrator/factory.ts | 184 +++++++++++++++++- .../factory-sdk/src/orchestrator/index.ts | 2 +- packages/factory-sdk/src/ports/mount.ts | 10 +- packages/factory-sdk/src/types.ts | 13 +- 9 files changed, 308 insertions(+), 19 deletions(-) diff --git a/packages/factory-sdk/src/cli/fleet.ts b/packages/factory-sdk/src/cli/fleet.ts index 9a6dcf08..346b5ca6 100644 --- a/packages/factory-sdk/src/cli/fleet.ts +++ b/packages/factory-sdk/src/cli/fleet.ts @@ -193,7 +193,7 @@ async function runFactoryCommand( return 0 } - await factory.start() + await factory.start({ mode: 'live' }) writeJson(out, factory.status()) await waitForShutdown(factory) return 0 diff --git a/packages/factory-sdk/src/config/schema.ts b/packages/factory-sdk/src/config/schema.ts index a8a28f79..8b59f497 100644 --- a/packages/factory-sdk/src/config/schema.ts +++ b/packages/factory-sdk/src/config/schema.ts @@ -10,6 +10,11 @@ export const FactoryConfigSchema = z.object({ labels: z.array(z.string()).default([]), assignees: z.array(z.string()).default([]), }).default({}), + liveSubscription: z.object({ + transport: z.enum(['subscribe-and-poll', 'subscribe', 'poll']).default('subscribe-and-poll'), + pollIntervalMs: z.number().int().min(50).default(5_000), + eventLimit: z.number().int().min(1).max(1_000).default(1_000), + }).default({}), repos: z.object({ byLabel: z.record(z.string(), z.string()), byProject: z.record(z.string(), z.string()).default({}), diff --git a/packages/factory-sdk/src/index.ts b/packages/factory-sdk/src/index.ts index 39bbc907..47a0e35d 100644 --- a/packages/factory-sdk/src/index.ts +++ b/packages/factory-sdk/src/index.ts @@ -56,7 +56,7 @@ export type { GithubMergeGatePort, GithubMergeGateVerdict, } from './github' -export { BatchTracker, createFactory, FactoryLoop, issueKey, parseLinearIssue } from './orchestrator' +export { BatchTracker, createFactory, FactoryLoop, issueKey, isRealLinearIssue, parseLinearIssue } from './orchestrator' export type { InFlightIssue, QueuedIssue, TrackedAgent } from './orchestrator' export { HeuristicTriage, @@ -154,7 +154,9 @@ export type { DispatchResult, Factory, FactoryEventPayload, + FactoryLiveSubscriptionOptions, FactoryPorts, + FactoryStartOptions, FactoryStatus, IssueRef, IterationReport, diff --git a/packages/factory-sdk/src/mount/relayfile-cloud-mount-client.ts b/packages/factory-sdk/src/mount/relayfile-cloud-mount-client.ts index aeba3eda..9e6bd968 100644 --- a/packages/factory-sdk/src/mount/relayfile-cloud-mount-client.ts +++ b/packages/factory-sdk/src/mount/relayfile-cloud-mount-client.ts @@ -13,14 +13,13 @@ import { type RelayfileCloudTokenSet, type ResourceAtEventResult, type OperationStatusResponse, - type SubscribeOptions, type Subscription, type TreeResponse, type WriteFileInput, type WriteQueuedResponse, } from '@relayfile/sdk' -import type { EventPage, MountClient } from '../ports' +import type { EventPage, MountClient, SubscribeOptions } from '../ports' import { createWorkspaceScopedEventClient, type RelayfileEventClient, diff --git a/packages/factory-sdk/src/orchestrator/factory.test.ts b/packages/factory-sdk/src/orchestrator/factory.test.ts index 39a585d8..86aeb184 100644 --- a/packages/factory-sdk/src/orchestrator/factory.test.ts +++ b/packages/factory-sdk/src/orchestrator/factory.test.ts @@ -1,4 +1,4 @@ -import { describe, expect, it } from 'vitest' +import { describe, expect, it, vi } from 'vitest' import { FactoryConfigSchema, createFactory, parseLinearIssue, type FactoryConfig, type TriageDecision, type TriageEngine } from '../index' import type { ChangeEvent, LinearWriteback, SlackWriteback } from '../ports' @@ -43,6 +43,15 @@ const issueFile = (n: number, stateId = ready) => ({ payload: issuePayload(n, stateId), }) +const realIssueFile = (n: number, stateId = ready, overrides: Record = {}) => ({ + ...issueFile(n, stateId), + payload: { + ...issuePayload(n, stateId), + url: `https://linear.app/agent-relay/issue/AR-${n}/factory-issue-${n}`, + ...overrides, + }, +}) + const flush = async () => { await new Promise((resolve) => setTimeout(resolve, 0)) } @@ -368,6 +377,95 @@ describe('FactoryLoop', () => { await factory.stop() }) + it('live subscription dispatches a newly-arrived in-scope ready issue from subscribe events', async () => { + const path = issuePath(25) + const mount = new FakeMountClient() + const fleet = new FakeFleetClient() + const factory = createFactory(config(), { mount, fleet, triage: new StaticTriage() }) + + await factory.start({ mode: 'live', liveSubscription: { transport: 'subscribe' } }) + mount.files.set(path, { content: realIssueFile(25) }) + mount.emit(changeEvent(path, 'event-live-25')) + await flush() + + expect(fleet.spawns.map((spawn) => spawn.name)).toEqual(['ar-25-impl', 'ar-25-review']) + expect(factory.status().counters.liveEvents).toBe(1) + expect(factory.status().counters.liveArrivalLatencyMsLast).toBeGreaterThanOrEqual(0) + await factory.stop() + }) + + it('live subscription ignores out-of-scope, non-ready, and non-real issue arrivals', async () => { + const mount = new FakeMountClient() + const fleet = new FakeFleetClient() + const triage = new CountingTriage() + const factory = createFactory(config(), { mount, fleet, triage }) + const cases = [ + { n: 26, content: realIssueFile(26, ready, { team: { key: 'OTHER', name: 'Other' } }) }, + { n: 27, content: realIssueFile(27, ready, { title: 'Real AR issue without synthetic marker' }) }, + { n: 28, content: realIssueFile(28, implementing) }, + { n: 29, content: realIssueFile(29, ready, { url: undefined }) }, + ] + + await factory.start({ mode: 'live', liveSubscription: { transport: 'subscribe' } }) + for (const entry of cases) { + const path = issuePath(entry.n) + mount.files.set(path, { content: entry.content }) + mount.emit(changeEvent(path, `event-live-${entry.n}`)) + } + await flush() + + expect(triage.count).toBe(0) + expect(fleet.spawns).toEqual([]) + await factory.stop() + }) + + it('live subscription starts from the current event cursor and does not replay pre-start history', async () => { + vi.useFakeTimers() + try { + const oldPath = issuePath(30) + const newPath = issuePath(31) + const mount = new FakeMountClient({ [oldPath]: realIssueFile(30) }) + const fleet = new FakeFleetClient() + const factory = createFactory(config(), { mount, fleet, triage: new StaticTriage() }) + mount.emit(changeEvent(oldPath, 'event-before-start-30', new Date(Date.now() + 1_000).toISOString())) + + await factory.start({ mode: 'live', liveSubscription: { transport: 'poll', pollIntervalMs: 10 } }) + await vi.advanceTimersByTimeAsync(0) + + expect(fleet.spawns).toEqual([]) + + mount.files.set(newPath, { content: realIssueFile(31) }) + mount.emit(changeEvent(newPath, 'event-after-start-31')) + await vi.advanceTimersByTimeAsync(10) + + expect(fleet.spawns.map((spawn) => spawn.name)).toEqual(['ar-31-impl', 'ar-31-review']) + await factory.stop() + } finally { + vi.useRealTimers() + } + }) + + it('live subscription dispatches a newly-arrived in-scope ready issue from getEvents polling', async () => { + vi.useFakeTimers() + try { + const path = issuePath(32) + const mount = new FakeMountClient() + const fleet = new FakeFleetClient() + const factory = createFactory(config(), { mount, fleet, triage: new StaticTriage() }) + + await factory.start({ mode: 'live', liveSubscription: { transport: 'poll', pollIntervalMs: 10 } }) + await vi.advanceTimersByTimeAsync(0) + mount.files.set(path, { content: realIssueFile(32) }) + mount.emit(changeEvent(path, 'event-live-poll-32')) + await vi.advanceTimersByTimeAsync(10) + + expect(fleet.spawns.map((spawn) => spawn.name)).toEqual(['ar-32-impl', 'ar-32-review']) + await factory.stop() + } finally { + vi.useRealTimers() + } + }) + it('BatchTracker blocks duplicate invocation ids within and across issue records', async () => { const tracker = new BatchTracker(5) const decisionA = await new StaticTriage().triage(parseLinearIssue(issuePath(12), issueFile(12))) @@ -692,11 +790,11 @@ describe('FactoryLoop', () => { }) }) -const changeEvent = (path: string, id: string) => ({ +const changeEvent = (path: string, id: string, occurredAt = new Date().toISOString()) => ({ id, workspace: 'factory-test', type: 'relayfile.changed', - occurredAt: new Date(0).toISOString(), + occurredAt, resource: { path, kind: 'file', diff --git a/packages/factory-sdk/src/orchestrator/factory.ts b/packages/factory-sdk/src/orchestrator/factory.ts index 54b8dcfc..ad7f1c58 100644 --- a/packages/factory-sdk/src/orchestrator/factory.ts +++ b/packages/factory-sdk/src/orchestrator/factory.ts @@ -1,7 +1,7 @@ import { FactoryConfigSchema, type FactoryConfig } from '../config/schema' import { LINEAR_STATE_IDS, linearByStatePath } from '../constants/linear' import { GithubMergeGate, closeProbePr, type GithubMergeGate as GithubMergeGatePort } from '../github' -import type { AgentSpec, FleetClient, LinearWriteback, MountClient, SlackWriteback, Subscription } from '../ports' +import type { AgentSpec, ChangeEvent, FleetClient, LinearWriteback, MountClient, SlackWriteback, Subscription } from '../ports' import type { Clock, Logger } from '../ports/system' import { isInFactoryScope } from '../safety/factory-scope' import { HeuristicTriage, TieredTriage } from '../triage' @@ -11,6 +11,8 @@ import type { FactoryEventPayload, FactoryPorts, FactoryStatus, + FactoryStartOptions, + FactoryLiveSubscriptionOptions, IssueRef, IterationReport, LinearIssue, @@ -28,6 +30,8 @@ type Listener = (payload: FactoryEventPayload) => void const ISSUE_ROOT = '/linear/issues' const READY_EVENTS_LIMIT = 100 +const LIVE_ISSUE_GLOB = `${ISSUE_ROOT}/**` +const LIVE_DEDUPE_LIMIT = 5_000 const STATE_NAME_TO_ID: Record = { 'Ready for Agent': LINEAR_STATE_IDS.readyForAgent, 'Agent Implementing': LINEAR_STATE_IDS.agentImplementing, @@ -64,6 +68,10 @@ export class FactoryLoop implements Factory { readonly #resumeInFlight = new Map>() readonly #resumedExitKeys = new Set() #subscription?: Subscription + #livePollTimer?: ReturnType + #livePollInFlight = false + #liveEventCursor?: string + readonly #seenLiveEvents = new Set() #offAgentExit?: () => void #offDeliveryFailed?: () => void #starting?: Promise @@ -90,7 +98,7 @@ export class FactoryLoop implements Factory { this.#wireFleetEvents() } - async start(): Promise { + async start(opts: FactoryStartOptions = {}): Promise { if (this.#started) { return } @@ -99,7 +107,7 @@ export class FactoryLoop implements Factory { return this.#starting } - this.#starting = this.#start() + this.#starting = this.#start(opts) try { await this.#starting } finally { @@ -107,7 +115,7 @@ export class FactoryLoop implements Factory { } } - async #start(): Promise { + async #start(opts: FactoryStartOptions): Promise { const ready = await this.#mount.ensureSubRoot(ISSUE_ROOT, { timeoutMs: 90_000 }) if (ready !== 'ready') { this.#error(new Error(`${ISSUE_ROOT} sub-root is not mounted`)) @@ -116,6 +124,17 @@ export class FactoryLoop implements Factory { this.#wireFleetEvents() + if (opts.mode === 'live') { + this.#started = true + try { + await this.#startLiveSubscription(opts.liveSubscription) + return + } catch (error) { + this.#started = false + throw error + } + } + await this.#backfillReadyIssues() this.#subscription = this.#mount.subscribe([`${ISSUE_ROOT}/**/*.json`], (event) => { void this.#handleChange(event.resource.path) @@ -125,6 +144,9 @@ export class FactoryLoop implements Factory { async stop(): Promise { this.#started = false + if (this.#livePollTimer) clearTimeout(this.#livePollTimer) + this.#livePollTimer = undefined + this.#livePollInFlight = false await this.#subscription?.unsubscribe() this.#subscription = undefined this.#offAgentExit?.() @@ -133,6 +155,101 @@ export class FactoryLoop implements Factory { this.#offDeliveryFailed = undefined } + async #startLiveSubscription(overrides: Partial = {}): Promise { + const options = this.#liveOptions(overrides) + this.#liveEventCursor = await this.#currentEventCursor(options.eventLimit) + this.#seenLiveEvents.clear() + this.#logger.info?.('[factory] live subscription connected from current event cursor', { + cursor: this.#liveEventCursor, + transport: options.transport, + }) + + if (options.transport !== 'poll') { + this.#subscription = this.#mount.subscribe([LIVE_ISSUE_GLOB], (event) => { + void this.#handleLiveChange(event) + }, { from: 'now', coalesce: 'none' }) + } + + if (options.transport !== 'subscribe') { + this.#scheduleLivePoll(0, options) + } + } + + #liveOptions(overrides: Partial): FactoryLiveSubscriptionOptions { + return { + transport: overrides.transport ?? this.#config.liveSubscription.transport, + pollIntervalMs: overrides.pollIntervalMs ?? this.#config.liveSubscription.pollIntervalMs, + eventLimit: overrides.eventLimit ?? this.#config.liveSubscription.eventLimit, + } + } + + async #currentEventCursor(limit: number): Promise { + let cursor: string | undefined + for (;;) { + const page = await this.#mount.getEvents({ cursor, limit }) + cursor = eventCursorAfterPage(cursor, page.events, page.nextCursor) + if (!page.nextCursor) return cursor + } + } + + #scheduleLivePoll(delayMs: number, options: FactoryLiveSubscriptionOptions): void { + if (this.#livePollTimer || !this.#started) return + this.#livePollTimer = setTimeout(() => { + this.#livePollTimer = undefined + void this.#pollLiveEvents(options).finally(() => { + if (this.#started) this.#scheduleLivePoll(options.pollIntervalMs, options) + }) + }, delayMs) + } + + async #pollLiveEvents(options: FactoryLiveSubscriptionOptions): Promise { + if (this.#livePollInFlight) return + this.#livePollInFlight = true + try { + let cursor = this.#liveEventCursor + for (;;) { + const page = await this.#mount.getEvents({ cursor, limit: options.eventLimit }) + for (const event of page.events) { + await this.#handleLiveChange(event) + } + const nextCursor = eventCursorAfterPage(cursor, page.events, page.nextCursor) + this.#liveEventCursor = nextCursor + if (!page.nextCursor || page.nextCursor === cursor) break + cursor = page.nextCursor + } + } catch (error) { + this.#logger.warn?.('[factory] live subscription poll failed', error) + } finally { + this.#livePollInFlight = false + } + } + + async #handleLiveChange(event: ChangeEvent): Promise { + const path = event.resource.path + if (!isIssueFilePath(path)) { + return + } + + const dedupeKey = liveEventDedupeKey(event) + if (dedupeKey) { + if (this.#seenLiveEvents.has(dedupeKey)) { + this.#increment('liveDuplicateEventsSuppressed') + this.#logger.debug?.('[factory] suppressed duplicate live issue event', { + id: event.id, + path, + }) + return + } + rememberLiveEvent(this.#seenLiveEvents, dedupeKey) + } else { + this.#increment('liveEventsMissingIdentity') + this.#logger.warn?.('[factory] live issue event missing stable identity', { path }) + } + + this.#recordArrivalLatency(event) + await this.#handleChange(path, { requireRealIssue: true }) + } + async runOnce(opts: { dryRun?: boolean } = {}): Promise { const dryRun = opts.dryRun ?? this.#config.dryRun const paths = await this.#readyIssuePaths() @@ -307,7 +424,7 @@ export class FactoryLoop implements Factory { } } - async #handleChange(path: string): Promise { + async #handleChange(path: string, opts: { requireRealIssue?: boolean } = {}): Promise { if (!isIssueFilePath(path)) { return } @@ -318,6 +435,10 @@ export class FactoryLoop implements Factory { return } + if (opts.requireRealIssue && !isRealLinearIssue(issue)) { + return + } + if (!isInFactoryScope(issue, this.#config.safety)) { return } @@ -507,6 +628,20 @@ export class FactoryLoop implements Factory { } } + #recordArrivalLatency(event: ChangeEvent): void { + const occurredAt = Date.parse(event.occurredAt) + if (!Number.isFinite(occurredAt)) return + const latencyMs = Math.max(0, this.#clock.now() - occurredAt) + this.#counters.liveEvents = (this.#counters.liveEvents ?? 0) + 1 + this.#counters.liveArrivalLatencyMsLast = latencyMs + this.#counters.liveArrivalLatencyMsMax = Math.max(this.#counters.liveArrivalLatencyMsMax ?? 0, latencyMs) + this.#logger.debug?.('[factory] live issue event latency recorded', { + eventId: event.id, + path: event.resource.path, + latencyMs, + }) + } + async #sendCriticalReviewerMessage(record: InFlightIssue): Promise { if (!this.#fleet.waitForInjected) { return @@ -633,9 +768,7 @@ export function parseLinearIssue(path: string, content: unknown): LinearIssue { } } -const issueRef = (issue: LinearIssue): IssueRef => ({ uuid: issue.uuid, key: issue.key, path: issue.path }) - -const isRealLinearIssue = (issue: LinearIssue): boolean => { +export function isRealLinearIssue(issue: LinearIssue): boolean { const payload = wrappedPayload(issue.raw) const identifier = stringValue(payload.identifier) ?? issue.key return identifier === issue.key && @@ -644,6 +777,8 @@ const isRealLinearIssue = (issue: LinearIssue): boolean => { payload.url.length > 0 } +const issueRef = (issue: LinearIssue): IssueRef => ({ uuid: issue.uuid, key: issue.key, path: issue.path }) + const dispatchComment = (decision: TriageDecision, agents: DispatchResult['agents']): string => [ `Factory dispatch for ${decision.issue.key}`, `Implementers: ${agents.filter((agent) => agent.role === 'implementer').map((agent) => agent.name).join(', ') || 'none'}`, @@ -802,6 +937,39 @@ const scopeIssueFromDraftContent = (content: unknown) => ({ raw: asRecord(content) ?? {}, }) +const eventCursorAfterPage = ( + cursor: string | undefined, + events: ChangeEvent[], + nextCursor?: string | null, +): string | undefined => { + if (nextCursor) return nextCursor + if (events.length === 0) return cursor + const numericCursor = cursor === undefined ? 0 : Number(cursor) + if (Number.isInteger(numericCursor) && numericCursor >= 0) { + return String(numericCursor + events.length) + } + return events.at(-1)?.id ?? cursor +} + +const liveEventDedupeKey = (event: ChangeEvent): string | undefined => { + if (!event.id) return undefined + const resource = asRecord(event.resource) ?? {} + return [ + event.id, + event.type, + event.resource.path, + stringValue(resource.revision) ?? '', + event.digest ?? '', + ].join('\u001f') +} + +const rememberLiveEvent = (seen: Set, key: string): void => { + seen.add(key) + if (seen.size <= LIVE_DEDUPE_LIMIT) return + const oldest = seen.values().next().value + if (oldest) seen.delete(oldest) +} + const recordName = (value: unknown): string | undefined => { if (typeof value === 'string') { return value diff --git a/packages/factory-sdk/src/orchestrator/index.ts b/packages/factory-sdk/src/orchestrator/index.ts index 1bf1e82d..db2e1876 100644 --- a/packages/factory-sdk/src/orchestrator/index.ts +++ b/packages/factory-sdk/src/orchestrator/index.ts @@ -1,3 +1,3 @@ export { BatchTracker, issueKey } from './batch-tracker' export type { InFlightIssue, QueuedIssue, TrackedAgent } from './batch-tracker' -export { createFactory, FactoryLoop, parseLinearIssue } from './factory' +export { createFactory, FactoryLoop, isRealLinearIssue, parseLinearIssue } from './factory' diff --git a/packages/factory-sdk/src/ports/mount.ts b/packages/factory-sdk/src/ports/mount.ts index 8bc6f0bf..147533f0 100644 --- a/packages/factory-sdk/src/ports/mount.ts +++ b/packages/factory-sdk/src/ports/mount.ts @@ -1,12 +1,18 @@ import type { ChangeEvent as RelayFileChangeEvent, - SubscribeOptions as RelayFileSubscribeOptions, Subscription as RelayFileSubscription, } from '@relayfile/sdk' export type ChangeEvent = RelayFileChangeEvent -export type SubscribeOptions = RelayFileSubscribeOptions export type Subscription = RelayFileSubscription +export type SubscribeOptions = { + coalesce?: 'none' | 'fire-once' + coalesceMs?: number + pathScope?: string[] + from?: 'now' | 'legacy' + onCoalesced?: () => void + onQueueDepth?: (depth: number) => void +} export interface EventPage { events: ChangeEvent[] diff --git a/packages/factory-sdk/src/types.ts b/packages/factory-sdk/src/types.ts index 592fe73e..935bf1f7 100644 --- a/packages/factory-sdk/src/types.ts +++ b/packages/factory-sdk/src/types.ts @@ -19,7 +19,7 @@ export interface FactoryPorts { } export interface Factory { - start(): Promise + start(opts?: FactoryStartOptions): Promise stop(): Promise runOnce(opts?: { dryRun?: boolean }): Promise triageIssue(issue: LinearIssue): Promise @@ -31,6 +31,17 @@ export interface Factory { ): () => void } +export interface FactoryStartOptions { + mode?: 'backfill-and-subscribe' | 'live' + liveSubscription?: Partial +} + +export interface FactoryLiveSubscriptionOptions { + transport: 'subscribe-and-poll' | 'subscribe' | 'poll' + pollIntervalMs: number + eventLimit: number +} + export interface LinearIssue { uuid: string key: string From b49f29ad23fea324b111066e74616d8bfda12aa3 Mon Sep 17 00:00:00 2001 From: kjgbot Date: Fri, 12 Jun 2026 12:53:59 +0200 Subject: [PATCH 2/5] Avoid default live poll backlog drain --- .../src/orchestrator/factory.test.ts | 29 ++++++++++++++++++- .../factory-sdk/src/orchestrator/factory.ts | 7 ++--- 2 files changed, 31 insertions(+), 5 deletions(-) diff --git a/packages/factory-sdk/src/orchestrator/factory.test.ts b/packages/factory-sdk/src/orchestrator/factory.test.ts index 86aeb184..75f79ec7 100644 --- a/packages/factory-sdk/src/orchestrator/factory.test.ts +++ b/packages/factory-sdk/src/orchestrator/factory.test.ts @@ -1,7 +1,7 @@ import { describe, expect, it, vi } from 'vitest' import { FactoryConfigSchema, createFactory, parseLinearIssue, type FactoryConfig, type TriageDecision, type TriageEngine } from '../index' -import type { ChangeEvent, LinearWriteback, SlackWriteback } from '../ports' +import type { ChangeEvent, EventPage, LinearWriteback, SlackWriteback } from '../ports' import { FakeFleetClient, FakeMountClient } from '../testing' import type { CloseProbePrInput, LinearIssue } from '../index' import { BatchTracker } from './batch-tracker' @@ -99,6 +99,15 @@ class CountingTriage extends StaticTriage { } } +class CountingEventsMount extends FakeMountClient { + getEventsCalls = 0 + + override async getEvents(opts: { cursor?: string; limit?: number }): Promise { + this.getEventsCalls += 1 + return super.getEvents(opts) + } +} + describe('FactoryLoop', () => { it('parses wrapped Linear issue records', () => { expect(parseLinearIssue(issuePath(1), issueFile(1))).toMatchObject({ @@ -394,6 +403,24 @@ describe('FactoryLoop', () => { await factory.stop() }) + it('live subscription default uses subscribe without draining getEvents at startup', async () => { + const path = issuePath(33) + const mount = new CountingEventsMount() + const fleet = new FakeFleetClient() + const factory = createFactory(config(), { mount, fleet, triage: new StaticTriage() }) + + await factory.start({ mode: 'live' }) + + expect(mount.getEventsCalls).toBe(0) + + mount.files.set(path, { content: realIssueFile(33) }) + mount.emit(changeEvent(path, 'event-live-default-33')) + await flush() + + expect(fleet.spawns.map((spawn) => spawn.name)).toEqual(['ar-33-impl', 'ar-33-review']) + await factory.stop() + }) + it('live subscription ignores out-of-scope, non-ready, and non-real issue arrivals', async () => { const mount = new FakeMountClient() const fleet = new FakeFleetClient() diff --git a/packages/factory-sdk/src/orchestrator/factory.ts b/packages/factory-sdk/src/orchestrator/factory.ts index ad7f1c58..c8e03145 100644 --- a/packages/factory-sdk/src/orchestrator/factory.ts +++ b/packages/factory-sdk/src/orchestrator/factory.ts @@ -157,10 +157,8 @@ export class FactoryLoop implements Factory { async #startLiveSubscription(overrides: Partial = {}): Promise { const options = this.#liveOptions(overrides) - this.#liveEventCursor = await this.#currentEventCursor(options.eventLimit) this.#seenLiveEvents.clear() - this.#logger.info?.('[factory] live subscription connected from current event cursor', { - cursor: this.#liveEventCursor, + this.#logger.info?.('[factory] live subscription starting', { transport: options.transport, }) @@ -170,7 +168,8 @@ export class FactoryLoop implements Factory { }, { from: 'now', coalesce: 'none' }) } - if (options.transport !== 'subscribe') { + if (options.transport === 'poll') { + this.#liveEventCursor = await this.#currentEventCursor(options.eventLimit) this.#scheduleLivePoll(0, options) } } From bc6de8c5ea41ff0049c6c386859ab3a201883978 Mon Sep 17 00:00:00 2001 From: kjgbot Date: Fri, 12 Jun 2026 13:06:17 +0200 Subject: [PATCH 3/5] Suppress replayed live issue events --- .../src/mount/relayfile-cloud-mount-client.ts | 10 +++++ .../src/orchestrator/factory.test.ts | 26 ++++++++++++ .../factory-sdk/src/orchestrator/factory.ts | 42 +++++++++++++++++++ packages/factory-sdk/src/ports/mount.ts | 1 + packages/factory-sdk/src/testing/fakes.ts | 7 ++++ 5 files changed, 86 insertions(+) diff --git a/packages/factory-sdk/src/mount/relayfile-cloud-mount-client.ts b/packages/factory-sdk/src/mount/relayfile-cloud-mount-client.ts index 9e6bd968..6779d297 100644 --- a/packages/factory-sdk/src/mount/relayfile-cloud-mount-client.ts +++ b/packages/factory-sdk/src/mount/relayfile-cloud-mount-client.ts @@ -52,6 +52,7 @@ export type RelayFileClientLike = deleteFile(input: DeleteFileInput): Promise listTree(workspaceId: string, options?: ListTreeOptions): Promise getEvents(workspaceId: string, options?: GetEventsOptions): Promise + listLastNChanges?(limit: number, context?: { workspaceId: string; token?: string }): Promise<{ events: ChangeEvent[] }> getResourceAtEvent(eventId: string, context?: { workspaceId: string; token?: string }): Promise getOp?(workspaceId: string, opId: string): Promise getToken?(): Promise | string @@ -229,6 +230,15 @@ export class RelayfileCloudMountClient implements MountClient { } } + async getEventHighWatermark(opts: { provider?: string } = {}): Promise { + if (!this.#client.listLastNChanges) return undefined + const response = await this.#client.listLastNChanges(10, { workspaceId: this.workspaceId }) + const events = opts.provider + ? response.events.filter((event) => event.resource.provider === opts.provider) + : response.events + return events.map((event) => event.id).sort().at(-1) + } + async confirmWrite( path: string, opts: { timeoutMs?: number } = {}, diff --git a/packages/factory-sdk/src/orchestrator/factory.test.ts b/packages/factory-sdk/src/orchestrator/factory.test.ts index 75f79ec7..a60fabc0 100644 --- a/packages/factory-sdk/src/orchestrator/factory.test.ts +++ b/packages/factory-sdk/src/orchestrator/factory.test.ts @@ -421,6 +421,32 @@ describe('FactoryLoop', () => { await factory.stop() }) + it('live subscription default suppresses replayed pre-connect ready issues and accepts new arrivals', async () => { + const replayPath = issuePath(34) + const newPath = issuePath(35) + const mount = new CountingEventsMount({ [replayPath]: realIssueFile(34) }) + const fleet = new FakeFleetClient() + const factory = createFactory(config(), { mount, fleet, triage: new StaticTriage() }) + mount.emit(changeEvent(replayPath, '100')) + + await factory.start({ mode: 'live' }) + + expect(mount.getEventsCalls).toBe(0) + + mount.emit(changeEvent(replayPath, '100')) + await flush() + + expect(fleet.spawns).toEqual([]) + expect(factory.status().counters.liveReplayEventsSuppressed).toBe(1) + + mount.files.set(newPath, { content: realIssueFile(35) }) + mount.emit(changeEvent(newPath, '101')) + await flush() + + expect(fleet.spawns.map((spawn) => spawn.name)).toEqual(['ar-35-impl', 'ar-35-review']) + await factory.stop() + }) + it('live subscription ignores out-of-scope, non-ready, and non-real issue arrivals', async () => { const mount = new FakeMountClient() const fleet = new FakeFleetClient() diff --git a/packages/factory-sdk/src/orchestrator/factory.ts b/packages/factory-sdk/src/orchestrator/factory.ts index c8e03145..505d43cc 100644 --- a/packages/factory-sdk/src/orchestrator/factory.ts +++ b/packages/factory-sdk/src/orchestrator/factory.ts @@ -71,6 +71,7 @@ export class FactoryLoop implements Factory { #livePollTimer?: ReturnType #livePollInFlight = false #liveEventCursor?: string + #liveEventHighWatermark?: string readonly #seenLiveEvents = new Set() #offAgentExit?: () => void #offDeliveryFailed?: () => void @@ -157,9 +158,11 @@ export class FactoryLoop implements Factory { async #startLiveSubscription(overrides: Partial = {}): Promise { const options = this.#liveOptions(overrides) + this.#liveEventHighWatermark = await this.#currentEventHighWatermark() this.#seenLiveEvents.clear() this.#logger.info?.('[factory] live subscription starting', { transport: options.transport, + highWatermark: this.#liveEventHighWatermark, }) if (options.transport !== 'poll') { @@ -191,6 +194,15 @@ export class FactoryLoop implements Factory { } } + async #currentEventHighWatermark(): Promise { + try { + return await this.#mount.getEventHighWatermark?.() + } catch (error) { + this.#logger.warn?.('[factory] live subscription high-watermark unavailable', error) + return undefined + } + } + #scheduleLivePoll(delayMs: number, options: FactoryLiveSubscriptionOptions): void { if (this.#livePollTimer || !this.#started) return this.#livePollTimer = setTimeout(() => { @@ -229,6 +241,16 @@ export class FactoryLoop implements Factory { return } + if (isAtOrBeforeHighWatermark(event.id, this.#liveEventHighWatermark)) { + this.#increment('liveReplayEventsSuppressed') + this.#logger.debug?.('[factory] suppressed replayed live issue event', { + id: event.id, + highWatermark: this.#liveEventHighWatermark, + path, + }) + return + } + const dedupeKey = liveEventDedupeKey(event) if (dedupeKey) { if (this.#seenLiveEvents.has(dedupeKey)) { @@ -962,6 +984,26 @@ const liveEventDedupeKey = (event: ChangeEvent): string | undefined => { ].join('\u001f') } +const isAtOrBeforeHighWatermark = (eventId: string | undefined, highWatermark: string | undefined): boolean => { + if (!eventId || !highWatermark) return false + if (eventId === highWatermark) return true + const eventSequence = eventSequenceNumber(eventId) + const watermarkSequence = eventSequenceNumber(highWatermark) + if (eventSequence !== undefined && watermarkSequence !== undefined) { + return eventSequence <= watermarkSequence + } + return false +} + +const eventSequenceNumber = (eventId: string): number | undefined => { + const whole = Number(eventId) + if (Number.isFinite(whole)) return whole + const trailing = eventId.match(/(\d+)$/u)?.[1] + if (!trailing) return undefined + const parsed = Number(trailing) + return Number.isFinite(parsed) ? parsed : undefined +} + const rememberLiveEvent = (seen: Set, key: string): void => { seen.add(key) if (seen.size <= LIVE_DEDUPE_LIMIT) return diff --git a/packages/factory-sdk/src/ports/mount.ts b/packages/factory-sdk/src/ports/mount.ts index 147533f0..5a2ee426 100644 --- a/packages/factory-sdk/src/ports/mount.ts +++ b/packages/factory-sdk/src/ports/mount.ts @@ -29,6 +29,7 @@ export interface MountClient { listTree(prefix: string): Promise subscribe(globs: string[], onChange: (event: ChangeEvent) => void, opts?: SubscribeOptions): Subscription getEvents(opts: { cursor?: string; limit?: number }): Promise + getEventHighWatermark?(opts?: { provider?: string }): Promise confirmWrite(path: string, opts?: { timeoutMs?: number }): Promise<'acked' | 'pending' | 'failed' | 'timeout'> ensureSubRoot(prefix: string, opts?: { timeoutMs?: number }): Promise<'ready' | 'absent'> } diff --git a/packages/factory-sdk/src/testing/fakes.ts b/packages/factory-sdk/src/testing/fakes.ts index 7079be9f..2b9898c3 100644 --- a/packages/factory-sdk/src/testing/fakes.ts +++ b/packages/factory-sdk/src/testing/fakes.ts @@ -85,6 +85,13 @@ export class FakeMountClient implements MountClient { } } + async getEventHighWatermark(opts: { provider?: string } = {}): Promise { + const events = opts.provider + ? this.#events.filter((event) => event.resource.provider === opts.provider) + : this.#events + return events.at(-1)?.id + } + async confirmWrite(path: string, _opts?: { timeoutMs?: number }): Promise<'acked' | 'pending' | 'failed' | 'timeout'> { return this.#confirmations.get(path) ?? 'acked' } From f2da103291dcead0853d9977d83aed6349113f6f Mon Sep 17 00:00:00 2001 From: kjgbot Date: Fri, 12 Jun 2026 13:16:01 +0200 Subject: [PATCH 4/5] Harden live event watermark tests --- .../relayfile-cloud-mount-client.test.ts | 36 ++++++++++++++++++- .../src/mount/relayfile-cloud-mount-client.ts | 30 +++++++++++++++- .../src/orchestrator/factory.test.ts | 21 ++++++++--- 3 files changed, 80 insertions(+), 7 deletions(-) diff --git a/packages/factory-sdk/src/mount/relayfile-cloud-mount-client.test.ts b/packages/factory-sdk/src/mount/relayfile-cloud-mount-client.test.ts index 10a84ddb..48ebb30a 100644 --- a/packages/factory-sdk/src/mount/relayfile-cloud-mount-client.test.ts +++ b/packages/factory-sdk/src/mount/relayfile-cloud-mount-client.test.ts @@ -1,5 +1,5 @@ import { describe, expect, it, vi } from 'vitest' -import type { OperationStatusResponse } from '@relayfile/sdk' +import type { ChangeEvent, OperationStatusResponse } from '@relayfile/sdk' import { RelayfileCloudMountClient, type RelayFileClientLike } from './relayfile-cloud-mount-client' @@ -91,6 +91,25 @@ class FakeRelayFileClient implements RelayFileClientLike { return { events: this.events, nextCursor: null } } + async listLastNChanges(_limit: number, _context?: { workspaceId: string }): Promise<{ events: ChangeEvent[] }> { + return { + events: this.events.map((event) => ({ + id: event.eventId, + workspace: 'rw_test', + type: 'relayfile.changed' as const, + occurredAt: event.timestamp, + resource: { + path: event.path, + kind: 'file', + id: event.path, + provider: 'linear', + }, + summary: {}, + expand: async () => ({ level: 'summary' as const, path: event.path, summary: {} }), + }) as unknown as ChangeEvent), + } + } + async getResourceAtEvent(eventId: string, context?: { workspaceId: string }) { return { path: `/events/${eventId}.json`, data: context, digest: eventId } } @@ -145,6 +164,21 @@ describe('RelayfileCloudMountClient', () => { expect(fake.getEventsCalls[0]).toEqual({ workspaceId: 'rw_test', opts: { cursor: 'evt-0', limit: 10 } }) }) + it('selects the numeric event high-watermark instead of lexicographic max', async () => { + const fake = new FakeRelayFileClient() + fake.events = ['7', '8', '9', '10', '11'].map((eventId) => ({ + eventId, + type: 'file.updated' as const, + path: `/linear/issues/AR-${eventId}.json`, + revision: eventId, + timestamp: '2026-01-01T00:00:00.000Z', + })) + const mount = new RelayfileCloudMountClient({ workspaceId: 'rw_test', client: fake }) + + await expect(mount.getEventHighWatermark()).resolves.toBe('11') + expect(fake.getEventsCalls).toEqual([]) + }) + it('writes through the RelayFileClient with workspace id and live baseRevision', async () => { const fake = new FakeRelayFileClient() fake.files.set('/linear/issues/AR-1.json', { diff --git a/packages/factory-sdk/src/mount/relayfile-cloud-mount-client.ts b/packages/factory-sdk/src/mount/relayfile-cloud-mount-client.ts index 6779d297..c7d39d65 100644 --- a/packages/factory-sdk/src/mount/relayfile-cloud-mount-client.ts +++ b/packages/factory-sdk/src/mount/relayfile-cloud-mount-client.ts @@ -236,7 +236,7 @@ export class RelayfileCloudMountClient implements MountClient { const events = opts.provider ? response.events.filter((event) => event.resource.provider === opts.provider) : response.events - return events.map((event) => event.id).sort().at(-1) + return maxEventId(events.map((event) => event.id)) } async confirmWrite( @@ -356,3 +356,31 @@ const errorMessage = (error: unknown): string => error instanceof Error ? error.message : String(error) const sleep = (ms: number): Promise => new Promise((resolve) => setTimeout(resolve, ms)) + +const maxEventId = (ids: string[]): string | undefined => { + let max: string | undefined + for (const id of ids) { + if (!max || compareEventIds(id, max) > 0) { + max = id + } + } + return max +} + +const compareEventIds = (left: string, right: string): number => { + const leftSequence = eventSequenceNumber(left) + const rightSequence = eventSequenceNumber(right) + if (leftSequence !== undefined && rightSequence !== undefined) { + return leftSequence - rightSequence + } + return left.localeCompare(right) +} + +const eventSequenceNumber = (eventId: string): number | undefined => { + const whole = Number(eventId) + if (Number.isFinite(whole)) return whole + const trailing = eventId.match(/(\d+)$/u)?.[1] + if (!trailing) return undefined + const parsed = Number(trailing) + return Number.isFinite(parsed) ? parsed : undefined +} diff --git a/packages/factory-sdk/src/orchestrator/factory.test.ts b/packages/factory-sdk/src/orchestrator/factory.test.ts index a60fabc0..a83b78ff 100644 --- a/packages/factory-sdk/src/orchestrator/factory.test.ts +++ b/packages/factory-sdk/src/orchestrator/factory.test.ts @@ -108,6 +108,12 @@ class CountingEventsMount extends FakeMountClient { } } +class NoWatermarkMount extends FakeMountClient { + override async getEventHighWatermark(): Promise { + return undefined + } +} + describe('FactoryLoop', () => { it('parses wrapped Linear issue records', () => { expect(parseLinearIssue(issuePath(1), issueFile(1))).toMatchObject({ @@ -423,24 +429,29 @@ describe('FactoryLoop', () => { it('live subscription default suppresses replayed pre-connect ready issues and accepts new arrivals', async () => { const replayPath = issuePath(34) + const tipPath = issuePath(36) const newPath = issuePath(35) - const mount = new CountingEventsMount({ [replayPath]: realIssueFile(34) }) + const mount = new CountingEventsMount({ + [replayPath]: realIssueFile(34), + [tipPath]: realIssueFile(36), + }) const fleet = new FakeFleetClient() const factory = createFactory(config(), { mount, fleet, triage: new StaticTriage() }) - mount.emit(changeEvent(replayPath, '100')) + mount.emit(changeEvent(replayPath, '9')) + mount.emit(changeEvent(tipPath, '10')) await factory.start({ mode: 'live' }) expect(mount.getEventsCalls).toBe(0) - mount.emit(changeEvent(replayPath, '100')) + mount.emit(changeEvent(replayPath, '9')) await flush() expect(fleet.spawns).toEqual([]) expect(factory.status().counters.liveReplayEventsSuppressed).toBe(1) mount.files.set(newPath, { content: realIssueFile(35) }) - mount.emit(changeEvent(newPath, '101')) + mount.emit(changeEvent(newPath, '100')) await flush() expect(fleet.spawns.map((spawn) => spawn.name)).toEqual(['ar-35-impl', 'ar-35-review']) @@ -477,7 +488,7 @@ describe('FactoryLoop', () => { try { const oldPath = issuePath(30) const newPath = issuePath(31) - const mount = new FakeMountClient({ [oldPath]: realIssueFile(30) }) + const mount = new NoWatermarkMount({ [oldPath]: realIssueFile(30) }) const fleet = new FakeFleetClient() const factory = createFactory(config(), { mount, fleet, triage: new StaticTriage() }) mount.emit(changeEvent(oldPath, 'event-before-start-30', new Date(Date.now() + 1_000).toISOString())) From 5896cfe86642bc4d89bc1b782c3e69d2689a54e4 Mon Sep 17 00:00:00 2001 From: kjgbot Date: Fri, 12 Jun 2026 13:27:33 +0200 Subject: [PATCH 5/5] Suppress stale live replay by time --- packages/factory-sdk/src/config/schema.ts | 1 + .../src/orchestrator/factory.test.ts | 41 +++++++++++++++++++ .../factory-sdk/src/orchestrator/factory.ts | 31 ++++++++++++++ packages/factory-sdk/src/types.ts | 1 + 4 files changed, 74 insertions(+) diff --git a/packages/factory-sdk/src/config/schema.ts b/packages/factory-sdk/src/config/schema.ts index 8b59f497..a7af7b4f 100644 --- a/packages/factory-sdk/src/config/schema.ts +++ b/packages/factory-sdk/src/config/schema.ts @@ -14,6 +14,7 @@ export const FactoryConfigSchema = z.object({ transport: z.enum(['subscribe-and-poll', 'subscribe', 'poll']).default('subscribe-and-poll'), pollIntervalMs: z.number().int().min(50).default(5_000), eventLimit: z.number().int().min(1).max(1_000).default(1_000), + replaySkewMarginMs: z.number().int().min(0).default(60_000), }).default({}), repos: z.object({ byLabel: z.record(z.string(), z.string()), diff --git a/packages/factory-sdk/src/orchestrator/factory.test.ts b/packages/factory-sdk/src/orchestrator/factory.test.ts index a83b78ff..cf738ad6 100644 --- a/packages/factory-sdk/src/orchestrator/factory.test.ts +++ b/packages/factory-sdk/src/orchestrator/factory.test.ts @@ -114,6 +114,12 @@ class NoWatermarkMount extends FakeMountClient { } } +class ThrowingWatermarkMount extends FakeMountClient { + override async getEventHighWatermark(): Promise { + throw Object.assign(new Error('Route not found'), { status: 404 }) + } +} + describe('FactoryLoop', () => { it('parses wrapped Linear issue records', () => { expect(parseLinearIssue(issuePath(1), issueFile(1))).toMatchObject({ @@ -458,6 +464,41 @@ describe('FactoryLoop', () => { await factory.stop() }) + it('live subscription suppresses old replay by time when high-watermark is unavailable', async () => { + const replayPath = issuePath(37) + const freshPath = issuePath(38) + const skewPath = issuePath(39) + const mount = new ThrowingWatermarkMount({ + [replayPath]: realIssueFile(37), + [freshPath]: realIssueFile(38), + [skewPath]: realIssueFile(39), + }) + const fleet = new FakeFleetClient() + const factory = createFactory(config(), { mount, fleet, triage: new StaticTriage() }) + + await factory.start({ mode: 'live', liveSubscription: { replaySkewMarginMs: 60_000 } }) + + mount.emit(changeEvent(replayPath, '201', new Date(Date.now() - 5 * 60_000).toISOString())) + await flush() + + expect(fleet.spawns).toEqual([]) + expect(factory.status().counters.liveReplayEventsSuppressedByTime).toBe(1) + + mount.emit(changeEvent(skewPath, '202', new Date(Date.now() - 1_000).toISOString())) + await flush() + + mount.emit(changeEvent(freshPath, '203', new Date(Date.now()).toISOString())) + await flush() + + expect(fleet.spawns.map((spawn) => spawn.name)).toEqual([ + 'ar-39-impl', + 'ar-39-review', + 'ar-38-impl', + 'ar-38-review', + ]) + await factory.stop() + }) + it('live subscription ignores out-of-scope, non-ready, and non-real issue arrivals', async () => { const mount = new FakeMountClient() const fleet = new FakeFleetClient() diff --git a/packages/factory-sdk/src/orchestrator/factory.ts b/packages/factory-sdk/src/orchestrator/factory.ts index 505d43cc..d90399c1 100644 --- a/packages/factory-sdk/src/orchestrator/factory.ts +++ b/packages/factory-sdk/src/orchestrator/factory.ts @@ -72,6 +72,8 @@ export class FactoryLoop implements Factory { #livePollInFlight = false #liveEventCursor?: string #liveEventHighWatermark?: string + #liveConnectStartedAtMs = 0 + #liveReplaySkewMarginMs = 0 readonly #seenLiveEvents = new Set() #offAgentExit?: () => void #offDeliveryFailed?: () => void @@ -158,11 +160,14 @@ export class FactoryLoop implements Factory { async #startLiveSubscription(overrides: Partial = {}): Promise { const options = this.#liveOptions(overrides) + this.#liveConnectStartedAtMs = this.#clock.now() + this.#liveReplaySkewMarginMs = options.replaySkewMarginMs this.#liveEventHighWatermark = await this.#currentEventHighWatermark() this.#seenLiveEvents.clear() this.#logger.info?.('[factory] live subscription starting', { transport: options.transport, highWatermark: this.#liveEventHighWatermark, + replaySkewMarginMs: this.#liveReplaySkewMarginMs, }) if (options.transport !== 'poll') { @@ -182,6 +187,7 @@ export class FactoryLoop implements Factory { transport: overrides.transport ?? this.#config.liveSubscription.transport, pollIntervalMs: overrides.pollIntervalMs ?? this.#config.liveSubscription.pollIntervalMs, eventLimit: overrides.eventLimit ?? this.#config.liveSubscription.eventLimit, + replaySkewMarginMs: overrides.replaySkewMarginMs ?? this.#config.liveSubscription.replaySkewMarginMs, } } @@ -198,6 +204,7 @@ export class FactoryLoop implements Factory { try { return await this.#mount.getEventHighWatermark?.() } catch (error) { + this.#increment('liveHighWatermarkUnavailable') this.#logger.warn?.('[factory] live subscription high-watermark unavailable', error) return undefined } @@ -241,8 +248,22 @@ export class FactoryLoop implements Factory { return } + if (isBeforeLiveCutoff(event.occurredAt, this.#liveConnectStartedAtMs, this.#liveReplaySkewMarginMs)) { + this.#increment('liveReplayEventsSuppressed') + this.#increment('liveReplayEventsSuppressedByTime') + this.#logger.debug?.('[factory] suppressed stale live issue event', { + id: event.id, + path, + occurredAt: event.occurredAt, + connectStartedAt: new Date(this.#liveConnectStartedAtMs).toISOString(), + replaySkewMarginMs: this.#liveReplaySkewMarginMs, + }) + return + } + if (isAtOrBeforeHighWatermark(event.id, this.#liveEventHighWatermark)) { this.#increment('liveReplayEventsSuppressed') + this.#increment('liveReplayEventsSuppressedByWatermark') this.#logger.debug?.('[factory] suppressed replayed live issue event', { id: event.id, highWatermark: this.#liveEventHighWatermark, @@ -984,6 +1005,16 @@ const liveEventDedupeKey = (event: ChangeEvent): string | undefined => { ].join('\u001f') } +const isBeforeLiveCutoff = ( + occurredAt: string, + connectStartedAtMs: number, + skewMarginMs: number, +): boolean => { + const occurredAtMs = Date.parse(occurredAt) + if (!Number.isFinite(occurredAtMs)) return false + return occurredAtMs < connectStartedAtMs - skewMarginMs +} + const isAtOrBeforeHighWatermark = (eventId: string | undefined, highWatermark: string | undefined): boolean => { if (!eventId || !highWatermark) return false if (eventId === highWatermark) return true diff --git a/packages/factory-sdk/src/types.ts b/packages/factory-sdk/src/types.ts index 935bf1f7..3eba275b 100644 --- a/packages/factory-sdk/src/types.ts +++ b/packages/factory-sdk/src/types.ts @@ -40,6 +40,7 @@ export interface FactoryLiveSubscriptionOptions { transport: 'subscribe-and-poll' | 'subscribe' | 'poll' pollIntervalMs: number eventLimit: number + replaySkewMarginMs: number } export interface LinearIssue {