From 4e85f0d5cf7dcb08c84109de4b96248fd121ca7d Mon Sep 17 00:00:00 2001 From: Miya Date: Mon, 22 Jun 2026 15:26:48 +0200 Subject: [PATCH] fix(factory): propagate relay workspace to agents --- src/cli/fleet.ts | 4 +-- src/fleet/create-fleet.ts | 2 ++ src/fleet/ensure-relay-broker.test.ts | 40 +++++++++++++++++++++++-- src/fleet/ensure-relay-broker.ts | 22 +++++++------- src/fleet/internal-fleet-client.test.ts | 26 ++++++++++++++++ src/fleet/internal-fleet-client.ts | 33 ++++++++++---------- src/fleet/relay-workspace-key.ts | 26 ++++++++++++++++ src/orchestrator/factory.test.ts | 2 +- src/orchestrator/factory.ts | 22 ++++++++++++-- src/writeback/linear.ts | 29 ++++++++++++------ src/writeback/writeback.test.ts | 39 +++++++++++++++++++++++- 11 files changed, 200 insertions(+), 45 deletions(-) create mode 100644 src/fleet/relay-workspace-key.ts diff --git a/src/cli/fleet.ts b/src/cli/fleet.ts index 9310e34..0e2b77f 100644 --- a/src/cli/fleet.ts +++ b/src/cli/fleet.ts @@ -490,8 +490,8 @@ async function buildFleet(globals: GlobalOptions, loaded: LoadedConfig | undefin if (globals.backend === 'internal') { const stderr = deps.stderr ?? process.stderr const logger = streamLogger(stderr) - const { client, started } = await (deps.ensureRelayBroker ?? ensureRelayBroker)({ cwd, connectionPath, logger }) - return createFleet({ backend: 'internal', cwd, connectionPath }, { harnessClient: client, ownsBroker: started }) + const { client, started, workspaceKey } = await (deps.ensureRelayBroker ?? ensureRelayBroker)({ cwd, connectionPath, logger }) + return createFleet({ backend: 'internal', cwd, connectionPath }, { harnessClient: client, ownsBroker: started, workspaceKey }) } return createFleet({ backend: globals.backend, cwd, connectionPath }) diff --git a/src/fleet/create-fleet.ts b/src/fleet/create-fleet.ts index 685700a..31b68b9 100644 --- a/src/fleet/create-fleet.ts +++ b/src/fleet/create-fleet.ts @@ -14,6 +14,7 @@ export interface CreateFleetDeps { // True when harnessClient owns a broker we spawned, so the fleet shuts it down // on dispose instead of leaving it running. ownsBroker?: boolean + workspaceKey?: string } export function createFleet(options: CreateFleetOptions = {}, deps: CreateFleetDeps = {}) { @@ -26,6 +27,7 @@ export function createFleet(options: CreateFleetOptions = {}, deps: CreateFleetD return new InternalFleetClient({ client: deps.harnessClient, ownsBroker: deps.ownsBroker, + workspaceKey: deps.workspaceKey, cwd: options.cwd, connectionPath: options.connectionPath, }) diff --git a/src/fleet/ensure-relay-broker.test.ts b/src/fleet/ensure-relay-broker.test.ts index 15f8c07..2c98405 100644 --- a/src/fleet/ensure-relay-broker.test.ts +++ b/src/fleet/ensure-relay-broker.test.ts @@ -3,6 +3,8 @@ import { describe, expect, it, vi } from 'vitest' import { ensureRelayBroker } from './ensure-relay-broker' import type { HarnessDriverClientLike } from './internal-fleet-client' +const noStoredWorkspaceKey = () => undefined + const fakeClient = (tag: string): HarnessDriverClientLike => ({ // tag lets a test assert which client (connected vs spawned) came back. brokerPid: tag === 'connected' ? 111 : 222, @@ -42,10 +44,11 @@ describe('ensureRelayBroker', () => { }) const spawn = vi.fn(async () => spawned) - const handle = await ensureRelayBroker({ cwd: '/work', connect, spawn, env: {} }) + const handle = await ensureRelayBroker({ cwd: '/work', connect, spawn, env: {}, resolveWorkspaceKey: noStoredWorkspaceKey }) expect(handle.client).toBe(spawned) expect(handle.started).toBe(true) + expect(handle.workspaceKey).toBeUndefined() expect(spawn).toHaveBeenCalledWith({ cwd: '/work', workspaceKey: undefined }) }) @@ -72,25 +75,56 @@ describe('ensureRelayBroker', () => { spawn: async () => fakeClient('spawned'), logger: { info }, env: {}, + resolveWorkspaceKey: noStoredWorkspaceKey, }) expect(info).toHaveBeenCalledWith('[factory] no relay broker running; starting one', { reason: 'boom', joiningWorkspace: false }) }) it('threads a workspace key (env or option) into spawn so the broker JOINS', async () => { const spawn = vi.fn(async () => fakeClient('spawned')) - await ensureRelayBroker({ connect: () => { throw new Error('no broker') }, spawn, env: { RELAY_WORKSPACE_KEY: 'rk_live_test' } }) + const envHandle = await ensureRelayBroker({ connect: () => { throw new Error('no broker') }, spawn, env: { RELAY_WORKSPACE_KEY: 'rk_live_test' } }) + expect(envHandle.workspaceKey).toBe('rk_live_test') expect(spawn).toHaveBeenCalledWith(expect.objectContaining({ workspaceKey: 'rk_live_test' })) spawn.mockClear() - await ensureRelayBroker({ connect: () => { throw new Error('no broker') }, spawn, workspaceKey: 'rk_live_explicit', env: {} }) + const explicitHandle = await ensureRelayBroker({ connect: () => { throw new Error('no broker') }, spawn, workspaceKey: 'rk_live_explicit', env: {} }) + expect(explicitHandle.workspaceKey).toBe('rk_live_explicit') expect(spawn).toHaveBeenCalledWith(expect.objectContaining({ workspaceKey: 'rk_live_explicit' })) }) + it('uses the active Agent Relay workspace key when env is empty', async () => { + const spawn = vi.fn(async () => fakeClient('spawned')) + const handle = await ensureRelayBroker({ + connect: () => { throw new Error('no broker') }, + spawn, + env: {}, + resolveWorkspaceKey: () => 'rk_live_stored', + }) + + expect(handle.workspaceKey).toBe('rk_live_stored') + expect(spawn).toHaveBeenCalledWith(expect.objectContaining({ workspaceKey: 'rk_live_stored' })) + }) + + it('returns the resolved workspace key when reusing an existing broker', async () => { + const connected = fakeClient('connected') + const handle = await ensureRelayBroker({ + connect: () => connected, + spawn: async () => fakeClient('spawned'), + env: {}, + resolveWorkspaceKey: () => 'rk_live_reused', + }) + + expect(handle.client).toBe(connected) + expect(handle.started).toBe(false) + expect(handle.workspaceKey).toBe('rk_live_reused') + }) + it('fails with actionable guidance when there is no broker and no workspace key', async () => { await expect(ensureRelayBroker({ connect: () => { throw new Error('no broker') }, spawn: async () => { throw new Error('insert into workspaces failed') }, env: {}, + resolveWorkspaceKey: noStoredWorkspaceKey, })).rejects.toThrow(/RELAY_WORKSPACE_KEY/u) }) }) diff --git a/src/fleet/ensure-relay-broker.ts b/src/fleet/ensure-relay-broker.ts index 77d953d..4898f52 100644 --- a/src/fleet/ensure-relay-broker.ts +++ b/src/fleet/ensure-relay-broker.ts @@ -2,6 +2,7 @@ import { HarnessDriverClient } from '@agent-relay/harness-driver' import type { Logger } from '../ports/system' import type { HarnessDriverClientLike } from './internal-fleet-client' +import { resolveRelayWorkspaceKey } from './relay-workspace-key' export interface EnsureRelayBrokerOptions { cwd?: string @@ -18,6 +19,7 @@ export interface EnsureRelayBrokerOptions { connect?: (options: { cwd?: string; connectionPath?: string }) => HarnessDriverClientLike spawn?: (options: { cwd?: string; workspaceKey?: string }) => Promise env?: NodeJS.ProcessEnv + resolveWorkspaceKey?: (env: NodeJS.ProcessEnv) => string | undefined } export interface RelayBrokerHandle { @@ -26,6 +28,7 @@ export interface RelayBrokerHandle { // dispose). False when we reused a broker that was already running — that one // belongs to the operator and must never be killed. started: boolean + workspaceKey?: string } // Resolve the relay broker for the internal fleet backend: reuse the broker that @@ -39,11 +42,16 @@ export async function ensureRelayBroker(options: EnsureRelayBrokerOptions = {}): const connect = options.connect ?? ((opts) => HarnessDriverClient.connect(opts)) const spawn = options.spawn ?? ((opts) => HarnessDriverClient.spawn(opts)) const env = options.env ?? process.env + const workspaceKey = resolveRelayWorkspaceKey({ + workspaceKey: options.workspaceKey, + env, + activeWorkspaceKey: options.resolveWorkspaceKey, + }) try { const client = connect({ cwd: options.cwd, connectionPath: options.connectionPath }) options.logger?.info?.('[factory] reusing the relay broker that is already running') - return { client, started: false } + return { client, started: false, workspaceKey } } catch (error) { if (options.autoStart === false) { throw error @@ -53,17 +61,13 @@ export async function ensureRelayBroker(options: EnsureRelayBrokerOptions = {}): // initialize relaycast session: insert into workspaces"). The workspace key // (rk_live_…) makes the broker join. Pear injects it at spawn; standalone the // operator supplies it via RELAY_WORKSPACE_KEY. - const workspaceKey = nonEmpty(options.workspaceKey) - ?? nonEmpty(env.RELAY_WORKSPACE_KEY) - ?? nonEmpty(env.AGENT_RELAY_WORKSPACE_KEY) - ?? nonEmpty(env.RELAY_API_KEY) options.logger?.info?.('[factory] no relay broker running; starting one', { reason: error instanceof Error ? error.message : String(error), joiningWorkspace: Boolean(workspaceKey), }) try { const client = await spawn({ cwd: options.cwd, workspaceKey }) - return { client, started: true } + return { client, started: true, workspaceKey } } catch (spawnError) { if (!workspaceKey) { throw new Error( @@ -78,9 +82,3 @@ export async function ensureRelayBroker(options: EnsureRelayBrokerOptions = {}): } } } - -const nonEmpty = (value: string | undefined): string | undefined => { - if (typeof value !== 'string') return undefined - const trimmed = value.trim() - return trimmed.length > 0 ? trimmed : undefined -} diff --git a/src/fleet/internal-fleet-client.test.ts b/src/fleet/internal-fleet-client.test.ts index 6c369eb..a9ffb32 100644 --- a/src/fleet/internal-fleet-client.test.ts +++ b/src/fleet/internal-fleet-client.test.ts @@ -142,6 +142,8 @@ describe('InternalFleetClient', () => { cwd: '/worktree', restartPolicy: { max_restarts: 2 }, continueFrom: 'previous-session', + spawnMode: 'task_exit', + exitAfterTask: true, harnessConfig: expect.objectContaining({ runtime: 'pty', command: 'codex', @@ -215,6 +217,28 @@ describe('InternalFleetClient', () => { }) }) + it('threads a resolved workspace key into spawned agent MCP env', async () => { + const harness = new FakeHarnessDriverClient() + const fleet = new InternalFleetClient({ + client: harness, + workspaceKey: 'rk_live_from_broker', + resolveAgentRelayMcpCommand: () => ({ command: '/usr/local/bin/node', args: ['/repo/node_modules/agent-relay/dist/cli/index.js', 'mcp'] }), + }) + + await fleet.spawn({ + name: 'ar-1-impl', + capability: 'spawn:codex', + task: 'do work', + }) + + const env = harness.spawned[0]?.harnessConfig?.env + expect(env).toEqual(expect.objectContaining({ + RELAY_WORKSPACE_KEY: 'rk_live_from_broker', + RELAY_API_KEY: 'rk_live_from_broker', + })) + expect(harness.spawned[0]?.harnessConfig?.args.join('\n')).toContain('"RELAY_WORKSPACE_KEY" = "rk_live_from_broker"') + }) + it('falls back to ordinary spawn when agent-relay MCP cannot be resolved', async () => { const harness = new FakeHarnessDriverClient() const logger = { warn: vi.fn() } @@ -240,6 +264,8 @@ describe('InternalFleetClient', () => { cwd: '/worktree', restartPolicy: undefined, continueFrom: undefined, + spawnMode: 'task_exit', + exitAfterTask: true, }) expect(logger.warn).toHaveBeenCalledWith( '[factory-sdk] agent-relay MCP command not found; spawning without MCP injection', diff --git a/src/fleet/internal-fleet-client.ts b/src/fleet/internal-fleet-client.ts index e29100e..013773e 100644 --- a/src/fleet/internal-fleet-client.ts +++ b/src/fleet/internal-fleet-client.ts @@ -8,6 +8,7 @@ import type { BrokerEvent, ListAgent, SendMessageInput, SpawnPtyInput } from '@a import type { AgentMessage, AgentPidResolution, Capability, FleetClient, RosterEntry, SendInput, SpawnInput, SpawnResult } from '../ports/fleet' import type { Logger } from '../ports/system' +import { resolveRelayWorkspaceKey } from './relay-workspace-key' const requireForResolve = createRequire(import.meta.url) @@ -41,6 +42,7 @@ export interface InternalFleetClientOptions { ownsBroker?: boolean cwd?: string connectionPath?: string + workspaceKey?: string resumeCapability?: Capability logger?: Logger resolveAgentRelayMcpCommand?: () => AgentRelayMcpCommand | undefined @@ -75,6 +77,7 @@ export class InternalFleetClient implements FleetClient { readonly #ownsBroker: boolean readonly #cwd?: string readonly #connectionPath?: string + readonly #workspaceKey?: string readonly #resumeCapability: Capability readonly #logger?: Logger readonly #resolveAgentRelayMcpCommand: () => AgentRelayMcpCommand | undefined @@ -99,6 +102,7 @@ export class InternalFleetClient implements FleetClient { constructor(options: InternalFleetClientOptions = {}) { this.#cwd = options.cwd this.#connectionPath = options.connectionPath + this.#workspaceKey = options.workspaceKey this.#resumeCapability = options.resumeCapability ?? 'spawn:codex' this.#logger = options.logger this.#resolveAgentRelayMcpCommand = options.resolveAgentRelayMcpCommand ?? resolveAgentRelayMcpCommand @@ -118,6 +122,8 @@ export class InternalFleetClient implements FleetClient { cwd: input.cwd ?? this.#cwd, restartPolicy: input.restartPolicy, continueFrom: input.sessionRef, + spawnMode: 'task_exit', + exitAfterTask: true, }) const handle = await this.#client.spawnPty(spawnInput) @@ -159,7 +165,7 @@ export class InternalFleetClient implements FleetClient { return { ...input, - harnessConfig: buildRelayMcpHarnessConfig(input, command), + harnessConfig: buildRelayMcpHarnessConfig(input, command, this.#workspaceKey), } } @@ -555,8 +561,12 @@ export function resolveAgentRelayMcpCommand(): AgentRelayMcpCommand | undefined } } -export function buildRelayMcpHarnessConfig(input: SpawnPtyInput, command: AgentRelayMcpCommand): NonNullable { - const relayEnv = relayMcpEnv(input.name, input.agentToken) +export function buildRelayMcpHarnessConfig( + input: SpawnPtyInput, + command: AgentRelayMcpCommand, + workspaceKey?: string, +): NonNullable { + const relayEnv = relayMcpEnv(input.name, input.agentToken, workspaceKey) return { runtime: 'pty', command: input.cli, @@ -612,18 +622,16 @@ function stdioMcpServer(command: AgentRelayMcpCommand, relayEnv: Record { +function relayMcpEnv(agentName: string, agentToken?: string, workspaceKey?: string): Record { const env: Record = { RELAY_AGENT_NAME: agentName, RELAY_AGENT_TYPE: 'agent', RELAY_STRICT_AGENT_NAME: '1', } - const workspaceKey = nonEmpty(process.env.RELAY_WORKSPACE_KEY) ?? - nonEmpty(process.env.AGENT_RELAY_WORKSPACE_KEY) ?? - relayWorkspaceKeyFromApiKey(process.env.RELAY_API_KEY) - if (workspaceKey) { - env.RELAY_WORKSPACE_KEY = workspaceKey - env.RELAY_API_KEY = workspaceKey + const resolvedWorkspaceKey = resolveRelayWorkspaceKey({ workspaceKey }) + if (resolvedWorkspaceKey) { + env.RELAY_WORKSPACE_KEY = resolvedWorkspaceKey + env.RELAY_API_KEY = resolvedWorkspaceKey } else { // No workspace key in the daemon env: the spawned agent's agent-relay MCP // will boot WITHOUT credentials, so it joins a bare relaycast workspace and @@ -680,11 +688,6 @@ function nonEmpty(value: string | undefined): string | undefined { return trimmed || undefined } -function relayWorkspaceKeyFromApiKey(value: string | undefined): string | undefined { - const trimmed = nonEmpty(value) - return trimmed?.startsWith('rk_live_') ? trimmed : undefined -} - function messageInputFrom(input: SendInput): SendMessageInput { return { to: input.to, diff --git a/src/fleet/relay-workspace-key.ts b/src/fleet/relay-workspace-key.ts new file mode 100644 index 0000000..ac2c508 --- /dev/null +++ b/src/fleet/relay-workspace-key.ts @@ -0,0 +1,26 @@ +import { activeWorkspaceKey } from '@agent-relay/cloud' + +export interface ResolveRelayWorkspaceKeyOptions { + workspaceKey?: string + env?: NodeJS.ProcessEnv + activeWorkspaceKey?: (env: NodeJS.ProcessEnv) => string | undefined +} + +export function resolveRelayWorkspaceKey(options: ResolveRelayWorkspaceKeyOptions = {}): string | undefined { + const env = options.env ?? process.env + return nonEmpty(options.workspaceKey) + ?? nonEmpty(env.RELAY_WORKSPACE_KEY) + ?? nonEmpty(env.AGENT_RELAY_WORKSPACE_KEY) + ?? relayWorkspaceKeyFromApiKey(env.RELAY_API_KEY) + ?? nonEmpty((options.activeWorkspaceKey ?? activeWorkspaceKey)(env)) +} + +export function relayWorkspaceKeyFromApiKey(value: string | undefined): string | undefined { + const trimmed = nonEmpty(value) + return trimmed?.startsWith('rk_live_') ? trimmed : undefined +} + +function nonEmpty(value: string | undefined): string | undefined { + const trimmed = value?.trim() + return trimmed || undefined +} diff --git a/src/orchestrator/factory.test.ts b/src/orchestrator/factory.test.ts index 7577389..5d171ad 100644 --- a/src/orchestrator/factory.test.ts +++ b/src/orchestrator/factory.test.ts @@ -4467,7 +4467,7 @@ describe('FactoryLoop', () => { const mirrorIssue = realIssueFile(724, ready, { labels: [], title: '[factory] GitHub mirror without synced labels', - description: 'Issue body\n\nSource: https://github.com/AgentWorkforce/relayfile-adapters/issues/224', + description: 'Issue body\n\nSource: [https://github.com/AgentWorkforce/relayfile-adapters/issues/224]()', }) const mount = new FakeMountClient({ [issuePath(724)]: mirrorIssue }) const fleet = new FakeFleetClient() diff --git a/src/orchestrator/factory.ts b/src/orchestrator/factory.ts index 78f6192..50767ee 100644 --- a/src/orchestrator/factory.ts +++ b/src/orchestrator/factory.ts @@ -4139,8 +4139,26 @@ function githubMirrorRepoForIssue(issue: LinearIssue): string | undefined { } function githubRepoFromUrl(url: string | undefined): string | undefined { - const match = url?.match(/^https:\/\/github\.com\/([^/]+)\/([^/]+)\/issues\/\d+(?:[/?#].*)?$/iu) - return match?.[1] && match[2] ? `${match[1]}/${match[2]}` : undefined + for (const candidate of githubUrlCandidates(url)) { + const match = candidate.match(/^https:\/\/github\.com\/([^/]+)\/([^/]+)\/issues\/\d+(?:[/?#].*)?$/iu) + if (match?.[1] && match[2]) { + return `${match[1]}/${match[2]}` + } + } + return undefined +} + +function githubUrlCandidates(value: string | undefined): string[] { + if (!value) { + return [] + } + const candidates = new Set() + const trimmed = value.trim() + candidates.add(trimmed.replace(/^<|>$/gu, '')) + for (const match of trimmed.matchAll(/https:\/\/github\.com\/[^\s<>)\]]+/giu)) { + candidates.add(match[0].replace(/[),.;]+$/gu, '')) + } + return [...candidates] } function labelRoutesForIssue( diff --git a/src/writeback/linear.ts b/src/writeback/linear.ts index cd24b69..610c951 100644 --- a/src/writeback/linear.ts +++ b/src/writeback/linear.ts @@ -22,6 +22,8 @@ export interface MountLinearWritebackConfig { requireTeamKey?: string } logger?: Pick + readbackConfirmAttempts?: number + readbackConfirmDelayMs?: number } export interface LinearCreateIssuePayload extends Record { @@ -130,8 +132,8 @@ const createIssuePath = (payload: LinearCreateIssuePayload): string => { const looksLikeProviderIssueIdentifier = (value: string): boolean => /^[A-Z][A-Z0-9]*-/u.test(value) -const READBACK_CONFIRM_ATTEMPTS = 3 -const READBACK_CONFIRM_DELAY_MS = 250 +const READBACK_CONFIRM_ATTEMPTS = 30 +const READBACK_CONFIRM_DELAY_MS = 1000 const delay = (ms: number): Promise => new Promise((resolve) => setTimeout(resolve, ms)) const confirmWriteback = async ( @@ -139,6 +141,7 @@ const confirmWriteback = async ( path: string, verify: () => Promise, logger: Pick, + options: { attempts: number; delayMs: number }, ): Promise => { await assertWritebackAcked(mount, path) // getOp can return a FAKED success on a busy/wedged mount ("workspace write @@ -146,7 +149,7 @@ const confirmWriteback = async ( // eventual-consistency lag; if it never confirms, the write did NOT land — // throw instead of silently faking success (which previously left issues // un-advanced while the factory believed they had advanced). - for (let attempt = 0; attempt < READBACK_CONFIRM_ATTEMPTS; attempt += 1) { + for (let attempt = 0; attempt < options.attempts; attempt += 1) { let confirmed = false try { confirmed = await verify() @@ -156,9 +159,9 @@ const confirmWriteback = async ( if (confirmed) { return } - if (attempt < READBACK_CONFIRM_ATTEMPTS - 1) { - logger.warn?.(`[factory-sdk] Linear writeback read-back for ${path} not yet confirmed (attempt ${attempt + 1}/${READBACK_CONFIRM_ATTEMPTS}); retrying`) - await delay(READBACK_CONFIRM_DELAY_MS) + if (attempt < options.attempts - 1) { + logger.warn?.(`[factory-sdk] Linear writeback read-back for ${path} not yet confirmed (attempt ${attempt + 1}/${options.attempts}); retrying`) + await delay(options.delayMs) } } throw new Error(`[factory-sdk] Linear writeback for ${path} acked but the read-back never confirmed it landed; the write did not propagate`) @@ -174,12 +177,20 @@ const assertWritebackAcked = async ( } } +function positiveInteger(value: unknown, fallback: number): number { + return typeof value === 'number' && Number.isInteger(value) && value > 0 ? value : fallback +} + export const MountLinearWriteback = ( mount: MountClient, configOrStateIds?: LinearStateIds | MountLinearWritebackConfig, ) => { const safety = safetyFromConfig(configOrStateIds) const logger = (asRecord(configOrStateIds)?.logger as Pick | undefined) ?? console + const readbackConfirm = { + attempts: positiveInteger(asRecord(configOrStateIds)?.readbackConfirmAttempts, READBACK_CONFIRM_ATTEMPTS), + delayMs: positiveInteger(asRecord(configOrStateIds)?.readbackConfirmDelayMs, READBACK_CONFIRM_DELAY_MS), + } const canonicalByPath = new Map() const seedCanonical = ( @@ -255,7 +266,7 @@ export const MountLinearWriteback = ( stateId, }, { guarded: true }) updateCanonicalState(path, issue, canonical, stateId) - await confirmWriteback(mount, path, () => verifyStateReadback(mount, issue, stateId), logger) + await confirmWriteback(mount, path, () => verifyStateReadback(mount, issue, stateId), logger, readbackConfirm) }, async postComment(issue: LinearIssue, body: string): Promise { @@ -264,7 +275,7 @@ export const MountLinearWriteback = ( const name = linearCommentName(issue, body) const path = linearCommentPath(issuePath(issue), name) await mount.writeFile(path, linearCommentPayload(issue, body), { guarded: true }) - await confirmWriteback(mount, path, () => verifyCommentReadback(mount, issue, name), logger) + await confirmWriteback(mount, path, () => verifyCommentReadback(mount, issue, name), logger, readbackConfirm) }, async createIssue(payload: LinearCreateIssuePayload): Promise<{ path: string }> { @@ -279,7 +290,7 @@ export const MountLinearWriteback = ( } catch { return false } - }, logger) + }, logger, readbackConfirm) return { path } }, diff --git a/src/writeback/writeback.test.ts b/src/writeback/writeback.test.ts index 98feacc..5721c99 100644 --- a/src/writeback/writeback.test.ts +++ b/src/writeback/writeback.test.ts @@ -245,6 +245,39 @@ describe('MountLinearWriteback', () => { await expect(linear.verify(issue, { stateId: 'implementing-state' })).rejects.toThrow(/not acked/) }) + it('waits through delayed Linear read-back propagation', async () => { + class EventuallyConsistentMountClient extends FakeMountClient { + staleReadsRemaining = new Map() + + override async writeFile(path: string, content: unknown): Promise { + await super.writeFile(path, content) + this.staleReadsRemaining.set(path, 3) + } + + override async readFile(path: string): Promise<{ content: unknown; revision?: string }> { + const remaining = this.staleReadsRemaining.get(path) ?? 0 + if (remaining > 0) { + this.staleReadsRemaining.set(path, remaining - 1) + return { content: { stateId: 'old-state' } } + } + return super.readFile(path) + } + } + + const mount = new EventuallyConsistentMountClient({ + [issuePath]: wrappedIssueRecord(), + }) + const warn = () => {} + const linear = MountLinearWriteback(mount, { + logger: { warn }, + readbackConfirmAttempts: 5, + readbackConfirmDelayMs: 1, + }) + + await expect(linear.setState(issue, 'implementing-state')).resolves.toBeUndefined() + expect(await linear.verify(issue, { stateId: 'implementing-state' })).toBe(true) + }) + it('throws (no faked success) when the read-back never confirms the write landed', async () => { // A getOp ack can be faked-success by a busy/wedged mount; if the read-back // never reflects the write, it did NOT land — the writeback must fail loudly @@ -265,7 +298,11 @@ describe('MountLinearWriteback', () => { const mount = new StaleMountClient({ [issuePath]: wrappedIssueRecord(), }) - const linear = MountLinearWriteback(mount, { logger: { warn: () => {} } }) + const linear = MountLinearWriteback(mount, { + logger: { warn: () => {} }, + readbackConfirmAttempts: 3, + readbackConfirmDelayMs: 1, + }) await expect(linear.setState(issue, 'implementing-state')).rejects.toThrow(/read-back never confirmed it landed/u) await expect(linear.postComment(issue, 'Agent dispatched after stale mirror')).rejects.toThrow(/read-back never confirmed it landed/u)