diff --git a/README.md b/README.md index 45f7491..e825263 100644 --- a/README.md +++ b/README.md @@ -105,6 +105,7 @@ factory factory run-once --config ./factory.config.json --dry-run | `factory status` | Print current factory status as JSON. | | `factory triage ` | Triage one issue and print the decision. | | `factory dispatch ` | Triage + dispatch one issue. Honors `--dry-run`. | +| `factory canary ` | Assert a known "Ready for Agent" issue is dispatch-ready by the real dry-run triage path. Prints `{ok,issue,status,reason}`; exits non-zero (with the skip reason) if it isn't. | Global options work anywhere in the args: `--config `, `--dry-run`, `--backend `. The internal backend reuses a relay broker that's @@ -113,6 +114,20 @@ already running for your workspace, and starts one if none is. (There are a few more operational commands — `loop-status`, `kill-loop`, `reap-orphans`, `close-probe` — for running the daemon in production.) +### Scheduled sync-fidelity canary + +`factory canary` is the regression detector for upstream sync drift: if a synced +issue stops carrying enough state to be dispatchable (e.g. the Linear sync +regresses to records without `state.id`), a known-good issue flips from +dispatch-ready to skipped. Run it on a schedule against a standing "Ready for +Agent" canary issue and alert on failure. + +`scripts/factory-canary.sh` wraps the command for cron/launchd: it runs from your +deployment dir (reusing the running relay broker), bounds a hung run, and posts a +Slack alert via `FACTORY_CANARY_SLACK_WEBHOOK` on failure. See +`scripts/com.agentrelay.factory-canary.plist.example` for an every-6h launchd +template. + ## Tell it what to work on Two ways to hand the factory an issue — both are just labeling/titling, nothing diff --git a/package.json b/package.json index cc81042..5ee9955 100644 --- a/package.json +++ b/package.json @@ -20,6 +20,8 @@ "files": [ "dist", "bin/factory.mjs", + "scripts/factory-canary.sh", + "scripts/com.agentrelay.factory-canary.plist.example", "package.json", "README.md" ], diff --git a/scripts/com.agentrelay.factory-canary.plist.example b/scripts/com.agentrelay.factory-canary.plist.example new file mode 100644 index 0000000..eb743c6 --- /dev/null +++ b/scripts/com.agentrelay.factory-canary.plist.example @@ -0,0 +1,53 @@ + + + + + Label + com.agentrelay.factory-canary + + ProgramArguments + + /bin/bash + __FACTORY_WORKDIR__/scripts/factory-canary.sh + + + WorkingDirectory + __FACTORY_WORKDIR__ + + EnvironmentVariables + + FACTORY_CANARY_ISSUE + AR-305 + FACTORY_WORKDIR + __FACTORY_WORKDIR__ + FACTORY_BIN + __FACTORY_BIN__ + FACTORY_CANARY_SLACK_WEBHOOK + + + + StartInterval + 21600 + RunAtLoad + + StandardOutPath + __FACTORY_WORKDIR__/.factory-canary.log + StandardErrorPath + __FACTORY_WORKDIR__/.factory-canary.log + + diff --git a/scripts/factory-canary.sh b/scripts/factory-canary.sh new file mode 100755 index 0000000..55412e8 --- /dev/null +++ b/scripts/factory-canary.sh @@ -0,0 +1,89 @@ +#!/usr/bin/env bash +# +# factory-canary.sh — scheduled sync-fidelity regression detector. +# +# Runs `factory canary ` against the LIVE relayfile mount and asserts a +# known "Ready for Agent" issue is still classified dispatch-ready by the real +# triage path. If it ever flips to "skipped" (e.g. the Linear sync regresses to +# sparse records with no state.id), this exits non-zero and alerts — catching the +# regression before it silently blocks every factory dispatch. +# +# Run it on a schedule (cron/launchd) from your factory deployment directory — +# the one holding factory.config.json, where the relayfile mount + relay broker +# already live (so the canary reuses the running broker rather than spawning one). +# See scripts/com.agentrelay.factory-canary.plist.example for a launchd template. +# +# Config (env vars): +# FACTORY_CANARY_ISSUE Linear issue key to check (default: the first arg) +# FACTORY_WORKDIR deployment dir with factory.config.json (default: cwd) +# FACTORY_CONFIG config path, relative to FACTORY_WORKDIR (default: factory.config.json) +# FACTORY_BIN path to factory.mjs (default: this repo's bin/factory.mjs) +# FACTORY_BACKEND --backend value (default: internal) +# FACTORY_CANARY_TIMEOUT seconds before the canary is considered hung (default: 180) +# FACTORY_CANARY_SLACK_WEBHOOK optional Slack incoming-webhook URL for failure alerts +# +# Exit codes: 0 = dispatch-ready (healthy); 1 = NOT ready / error / hung. + +set -uo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +ISSUE="${FACTORY_CANARY_ISSUE:-${1:-}}" +WORKDIR="${FACTORY_WORKDIR:-$PWD}" +CONFIG="${FACTORY_CONFIG:-factory.config.json}" +BIN="${FACTORY_BIN:-$SCRIPT_DIR/../bin/factory.mjs}" +BACKEND="${FACTORY_BACKEND:-internal}" +TIMEOUT="${FACTORY_CANARY_TIMEOUT:-180}" +TS="$(date -u +%Y-%m-%dT%H:%M:%SZ)" + +if [[ -z "$ISSUE" ]]; then + echo "[$TS] factory-canary: no issue key (set FACTORY_CANARY_ISSUE or pass an arg)" >&2 + exit 1 +fi +if [[ ! -f "$BIN" ]]; then + echo "[$TS] factory-canary: factory bin not found at $BIN" >&2 + exit 1 +fi +cd "$WORKDIR" || { echo "[$TS] factory-canary: cannot cd to $WORKDIR" >&2; exit 1; } + +# The canary runs the real dry-run triage path (no agents spawned) and prints a +# JSON verdict {ok,issue,status,reason}; exit code mirrors ok. A hung run +# (broker/mount wedge) is bounded by FACTORY_CANARY_TIMEOUT. +RUN=(node "$BIN" factory canary "$ISSUE" --config "$CONFIG" --backend "$BACKEND") +# A hung run (broker/mount wedge) MUST be bounded — an unbounded canary on a +# scheduler (launchd/cron) can wedge the slot forever and suppress later alerts. +# macOS has no `timeout` by default; coreutils ships it as `gtimeout`. If neither +# is present, fail closed rather than run without a deadline. +TIMEOUT_BIN="" +if command -v timeout >/dev/null 2>&1; then + TIMEOUT_BIN="timeout" +elif command -v gtimeout >/dev/null 2>&1; then + TIMEOUT_BIN="gtimeout" +fi +if [[ -z "$TIMEOUT_BIN" ]]; then + echo "[$TS] factory-canary: no timeout utility found (install coreutils for 'timeout'/'gtimeout'); refusing to run unbounded" >&2 + exit 1 +fi +OUT="$("$TIMEOUT_BIN" "$TIMEOUT" "${RUN[@]}" 2>/dev/null)" +CODE=$? +if [[ $CODE -eq 124 ]]; then + echo "[$TS] factory-canary: TIMED OUT after ${TIMEOUT}s (broker/mount may be wedged)" >&2 +fi + +# The CLI prints a pretty-printed (multi-line) JSON verdict, so parse the whole +# output — not just the last line (which is only the closing `}`). +echo "[$TS] factory-canary $ISSUE -> exit $CODE" +[[ $CODE -eq 0 ]] && exit 0 + +REASON="$(printf '%s' "$OUT" | node -e 'let s="";process.stdin.on("data",d=>s+=d).on("end",()=>{try{const v=JSON.parse(s);console.log(`${v.status||"error"}: ${v.reason||"unknown"}`)}catch{console.log("unparseable verdict")}})' 2>/dev/null)" +MSG=":rotating_light: factory canary FAILED for ${ISSUE} — ${REASON}. Sync fidelity may have regressed (issue no longer dispatch-ready)." +echo "[$TS] $MSG" >&2 + +if [[ -n "${FACTORY_CANARY_SLACK_WEBHOOK:-}" ]]; then + curl -sS -m 15 -X POST -H 'Content-type: application/json' \ + --data "$(node -e 'process.stdout.write(JSON.stringify({text:process.argv[1]}))' "$MSG")" \ + "$FACTORY_CANARY_SLACK_WEBHOOK" >/dev/null 2>&1 \ + && echo "[$TS] factory-canary: posted Slack alert" >&2 \ + || echo "[$TS] factory-canary: Slack alert post failed" >&2 +fi + +exit 1 diff --git a/src/config/schema.test.ts b/src/config/schema.test.ts index 5a0ba0d..c1c1fd3 100644 --- a/src/config/schema.test.ts +++ b/src/config/schema.test.ts @@ -42,7 +42,18 @@ describe('FactoryConfigSchema', () => { // No hardcoded state defaults: omitted stateIds resolve to {} and are filled // at runtime from linear.states (by name) or explicit stateIds. expect(parsed.stateIds).toEqual({}) - expect(parsed.linear).toEqual({ states: {}, statesByTeam: {} }) + // The factory ships its workflow-state NAME conventions as defaults (so a + // consumer needn't configure them); statesByTeam stays empty. + expect(parsed.linear).toEqual({ + states: { + readyForAgent: 'Ready for Agent', + agentImplementing: 'Agent Implementing', + done: 'Done', + inPlanning: 'In Planning', + humanReview: 'In Human Review', + }, + statesByTeam: {}, + }) expect(parsed.safety).toEqual({ requireTitlePrefix: '[factory-e2e]', requireLabel: 'factory', diff --git a/src/config/schema.ts b/src/config/schema.ts index 9551c55..55d800f 100644 --- a/src/config/schema.ts +++ b/src/config/schema.ts @@ -98,10 +98,22 @@ const babysitterSchema = z.object({ enabled: z.boolean().default(false), }).default({}) +// The factory owns its workflow-state NAME conventions; consumers (e.g. pear) +// don't hand-configure them. These names let the factory resolve a role from a +// synced record that carries state.name but no state.id (sparse-sync fallback). +// A workspace that names states differently can override via config. +const DEFAULT_LINEAR_STATE_NAMES = { + readyForAgent: 'Ready for Agent', + agentImplementing: 'Agent Implementing', + done: 'Done', + inPlanning: 'In Planning', + humanReview: 'In Human Review', +} + const linearSchema = z.object({ - states: linearRoleNamesSchema, + states: linearRoleNamesSchema.default(DEFAULT_LINEAR_STATE_NAMES), statesByTeam: z.record(z.string(), linearRoleNamesSchema).default({}), -}).default({}) +}).default({ states: DEFAULT_LINEAR_STATE_NAMES, statesByTeam: {} }) const stateIdsSchema = z.object({ readyForAgent: z.string().optional(), diff --git a/src/dispatch/templates.test.ts b/src/dispatch/templates.test.ts index 33c29eb..4e77f87 100644 --- a/src/dispatch/templates.test.ts +++ b/src/dispatch/templates.test.ts @@ -52,7 +52,7 @@ describe('renderAgentTask', () => { expect(task).toContain('GitHub repo: AgentWorkforce/pear') expect(task).toContain('Wait for a DM from the implementer(s): ar-123-impl-ui, ar-123-impl-broker.') - expect(task).toContain('Read the PR diff via `.integrations/github/repos`.') + expect(task).toContain('Read the PR diff via .integrations/github/repos.') expect(task).toContain('Post review comments via the GitHub writeback path.') expect(task).toContain('DM the implementer with specific feedback if changes needed, or approve if good.') expect(task).toContain('DM `broker` when the review cycle is complete.') @@ -67,7 +67,7 @@ describe('renderAgentTask', () => { reviewerName: 'ar-123-review', implementerNames: ['ar-123-impl'], pr: { number: 482, url: 'https://github.com/AgentWorkforce/pear/pull/482' }, - slackDispatchThread: { channel: 'C123', threadId: '170.000' }, + slackDispatchThread: { channel: 'C123', threadId: '170.000', mountRoot: '/work/.integrations' }, }) // Carries the original spec (definition of done) and the open PR ref. @@ -195,12 +195,12 @@ describe('renderAgentTask', () => { role: 'implementer', config: baseConfig, reviewerName: 'ar-123-review', - slackDispatchThread: { channel: 'C123', threadId: '169.000' }, + slackDispatchThread: { channel: 'C123', threadId: '169.000', mountRoot: '/work/.integrations' }, }) // The Slack-thread writeback replaces the old relay DM pattern. expect(task).toContain('write your question to this issue\'s Slack dispatch thread via the .integrations mount') - expect(task).toContain('Write path: .integrations/slack/channels/C123/messages/169_000/replies/question.json') + expect(task).toContain('Write path: /work/.integrations/slack/channels/C123/messages/169_000/replies/question.json') expect(task).toContain('Write a JSON object with a "text" field') expect(task).toContain('Continue with safe reversible work while waiting for a reply.') // No relay DM or legacy patterns. diff --git a/src/dispatch/templates.ts b/src/dispatch/templates.ts index 05519e7..c52b992 100644 --- a/src/dispatch/templates.ts +++ b/src/dispatch/templates.ts @@ -30,13 +30,30 @@ export interface RenderAgentTaskInput { slackDispatchThread?: { channel: string threadId: string + /** + * Absolute path to the .integrations mount root the agent can write to. The + * agent runs in its repo clonePath, NOT the daemon's cwd where .integrations + * lives, so a bare relative `.integrations/...` path is unreachable — the + * writeback path must be absolute. + */ + mountRoot: string } /** Pre-rendered writeback instructions for connected integrations. */ integrationInstructions?: string + /** + * Absolute path to the .integrations mount root. The agent runs in its repo + * clonePath, not the daemon cwd where .integrations lives, so every + * `.integrations/...` reference (github reads, slack writes) must be absolute. + * Falls back to the bare relative root when absent (e.g. tests). + */ + integrationsMountRoot?: string } export function renderAgentTask(input: RenderAgentTaskInput): string { const repo = normalizeRepo(input.route.repo) + // Absolute mount root for every .integrations reference (the agent's cwd is + // its repo clone, where a relative .integrations/... does not resolve). + const mountRoot = input.integrationsMountRoot ?? '.integrations' const cloneInstruction = input.route.clonePath ? `Repo path: ${input.route.clonePath}` : `Clone/worktree: clone AgentWorkforce/${repo} and work in your own isolated git worktree before editing.` @@ -68,8 +85,11 @@ export function renderAgentTask(input: RenderAgentTaskInput): string { ? [ '', 'If you are blocked or need a human answer mid-task, write your question to this issue\'s Slack dispatch thread via the .integrations mount.', - `Write path: .integrations/slack/channels/${input.slackDispatchThread.channel}/messages/${input.slackDispatchThread.threadId.replace('.', '_')}/replies/question.json`, + // Absolute path: the agent runs in its repo clone, not the daemon cwd + // where .integrations lives, so a relative path would be unreachable. + `Write path: ${input.slackDispatchThread.mountRoot}/slack/channels/${input.slackDispatchThread.channel}/messages/${input.slackDispatchThread.threadId.replaceAll('.', '_')}/replies/question.json`, 'Write a JSON object with a "text" field containing your question.', + 'The human\'s reply will be delivered to you as an `` system message injected into your session — wait for it, do not poll.', 'Continue with safe reversible work while waiting for a reply.', ] : [] @@ -79,7 +99,7 @@ export function renderAgentTask(input: RenderAgentTaskInput): string { ? `PR #${input.pr.number}${input.pr.url ? ` (${input.pr.url})` : ''}` : 'the open PR for this issue' const chatLine = input.slackDispatchThread - ? 'You can also use this issue\'s Slack dispatch thread to discuss the PR with the human (status, trade-offs, open questions) — proactively write via .integrations/slack if it would help.' + ? `You can also use this issue's Slack dispatch thread to discuss the PR with the human (status, trade-offs, open questions) — proactively write via ${mountRoot}/slack if it would help.` : 'If a human can be reached, proactively offer to discuss the PR (status, trade-offs, open questions) via the .integrations writeback path.' // Match the prompt to where the issue actually lands so the babysitter is not // told to "stop at Human Review" while the factory is configured to finish at @@ -100,7 +120,7 @@ export function renderAgentTask(input: RenderAgentTaskInput): string { `You are the PR babysitter for ${input.issue.key}. A PR is already open: ${prRef}.`, jobLine, 'Unlike a conservative reviewer, you SHOULD fix things directly and aggressively — you hold the original issue spec as the definition of done, and you have the rest of the dispatched team to draw on.', - 'Read the PR diff, CI checks, and review threads via `.integrations/github/repos`.', + `Read the PR diff, CI checks, and review threads via ${mountRoot}/github/repos.`, 'Address every review comment for real — make substantive code changes when the feedback calls for it, not just lint/format touch-ups.', 'Resolve any merge conflicts: rebase onto the base branch and reconcile using judgment anchored in the issue spec; never weaken tests or flip safety defaults just to force a merge.', 'Fix failing CI — change the code and tests as needed until the checks pass. A red check is not done.', @@ -122,7 +142,7 @@ export function renderAgentTask(input: RenderAgentTaskInput): string { ...questionInstructions, '', `Wait for a DM from the implementer(s): ${implementers}.`, - 'Read the PR diff via `.integrations/github/repos`.', + `Read the PR diff via ${mountRoot}/github/repos.`, 'Post review comments via the GitHub writeback path.', 'DM the implementer with specific feedback if changes needed, or approve if good.', 'DM `broker` when the review cycle is complete.', diff --git a/src/fleet/ensure-relay-broker.test.ts b/src/fleet/ensure-relay-broker.test.ts index 646a845..15f8c07 100644 --- a/src/fleet/ensure-relay-broker.test.ts +++ b/src/fleet/ensure-relay-broker.test.ts @@ -42,11 +42,11 @@ describe('ensureRelayBroker', () => { }) const spawn = vi.fn(async () => spawned) - const handle = await ensureRelayBroker({ cwd: '/work', connect, spawn }) + const handle = await ensureRelayBroker({ cwd: '/work', connect, spawn, env: {} }) expect(handle.client).toBe(spawned) expect(handle.started).toBe(true) - expect(spawn).toHaveBeenCalledWith({ cwd: '/work' }) + expect(spawn).toHaveBeenCalledWith({ cwd: '/work', workspaceKey: undefined }) }) it('surfaces the connect error without spawning when autoStart is false', async () => { @@ -71,7 +71,26 @@ describe('ensureRelayBroker', () => { }, spawn: async () => fakeClient('spawned'), logger: { info }, + env: {}, }) - expect(info).toHaveBeenCalledWith('[factory] no relay broker running; starting one', { reason: 'boom' }) + expect(info).toHaveBeenCalledWith('[factory] no relay broker running; starting one', { reason: 'boom', joiningWorkspace: false }) + }) + + it('threads a workspace key (env or option) into spawn so the broker JOINS', async () => { + const spawn = vi.fn(async () => fakeClient('spawned')) + await ensureRelayBroker({ connect: () => { throw new Error('no broker') }, spawn, env: { RELAY_WORKSPACE_KEY: 'rk_live_test' } }) + expect(spawn).toHaveBeenCalledWith(expect.objectContaining({ workspaceKey: 'rk_live_test' })) + + spawn.mockClear() + await ensureRelayBroker({ connect: () => { throw new Error('no broker') }, spawn, workspaceKey: 'rk_live_explicit', env: {} }) + expect(spawn).toHaveBeenCalledWith(expect.objectContaining({ workspaceKey: 'rk_live_explicit' })) + }) + + it('fails with actionable guidance when there is no broker and no workspace key', async () => { + await expect(ensureRelayBroker({ + connect: () => { throw new Error('no broker') }, + spawn: async () => { throw new Error('insert into workspaces failed') }, + env: {}, + })).rejects.toThrow(/RELAY_WORKSPACE_KEY/u) }) }) diff --git a/src/fleet/ensure-relay-broker.ts b/src/fleet/ensure-relay-broker.ts index 208f710..77d953d 100644 --- a/src/fleet/ensure-relay-broker.ts +++ b/src/fleet/ensure-relay-broker.ts @@ -9,10 +9,15 @@ export interface EnsureRelayBrokerOptions { // When false, never start a broker — surface the connect error instead. This // lets callers opt back into strict reuse-only behavior. autoStart?: boolean + // Workspace key the spawned broker uses to JOIN the existing workspace instead + // of creating a new (colliding) one. Defaults to RELAY_WORKSPACE_KEY / + // AGENT_RELAY_WORKSPACE_KEY / RELAY_API_KEY from the env. + workspaceKey?: string logger?: Logger // Seams for tests so they never connect to or spawn a real broker. connect?: (options: { cwd?: string; connectionPath?: string }) => HarnessDriverClientLike - spawn?: (options: { cwd?: string }) => Promise + spawn?: (options: { cwd?: string; workspaceKey?: string }) => Promise + env?: NodeJS.ProcessEnv } export interface RelayBrokerHandle { @@ -33,6 +38,7 @@ export interface RelayBrokerHandle { export async function ensureRelayBroker(options: EnsureRelayBrokerOptions = {}): Promise { const connect = options.connect ?? ((opts) => HarnessDriverClient.connect(opts)) const spawn = options.spawn ?? ((opts) => HarnessDriverClient.spawn(opts)) + const env = options.env ?? process.env try { const client = connect({ cwd: options.cwd, connectionPath: options.connectionPath }) @@ -42,10 +48,39 @@ export async function ensureRelayBroker(options: EnsureRelayBrokerOptions = {}): if (options.autoStart === false) { throw error } + // Spawn a broker. It must JOIN the existing workspace, not create a new one: + // a keyless `init` tries to create a workspace and collides ("failed to + // initialize relaycast session: insert into workspaces"). The workspace key + // (rk_live_…) makes the broker join. Pear injects it at spawn; standalone the + // operator supplies it via RELAY_WORKSPACE_KEY. + const workspaceKey = nonEmpty(options.workspaceKey) + ?? nonEmpty(env.RELAY_WORKSPACE_KEY) + ?? nonEmpty(env.AGENT_RELAY_WORKSPACE_KEY) + ?? nonEmpty(env.RELAY_API_KEY) options.logger?.info?.('[factory] no relay broker running; starting one', { reason: error instanceof Error ? error.message : String(error), + joiningWorkspace: Boolean(workspaceKey), }) - const client = await spawn({ cwd: options.cwd }) - return { client, started: true } + try { + const client = await spawn({ cwd: options.cwd, workspaceKey }) + return { client, started: true } + } catch (spawnError) { + if (!workspaceKey) { + throw new Error( + 'Failed to start a relay broker and no workspace key was available to join the existing workspace. ' + + 'Set RELAY_WORKSPACE_KEY (your rk_live_… workspace key) so the broker can JOIN your workspace, ' + + 'or start a broker first. ' + + `Underlying error: ${spawnError instanceof Error ? spawnError.message : String(spawnError)}`, + { cause: spawnError }, + ) + } + throw spawnError + } } } + +const nonEmpty = (value: string | undefined): string | undefined => { + if (typeof value !== 'string') return undefined + const trimmed = value.trim() + return trimmed.length > 0 ? trimmed : undefined +} diff --git a/src/fleet/internal-fleet-client.ts b/src/fleet/internal-fleet-client.ts index 057f3ab..e29100e 100644 --- a/src/fleet/internal-fleet-client.ts +++ b/src/fleet/internal-fleet-client.ts @@ -624,6 +624,16 @@ function relayMcpEnv(agentName: string, agentToken?: string): Record { states: { readyForAgent: 'Done', agentImplementing: 'Done', inPlanning: 'Done', done: 'Done' }, })).rejects.toThrow(/ambiguous/) }) + + it('tolerates a team-less-ambiguous global default when each subscribed team resolves', async () => { + // The factory's default state NAMES exist in BOTH teams, so a team-less + // lookup is ambiguous — but with teams configured, per-team resolution is + // authoritative and startup must NOT fail (regression guard for the + // global-default-forces-teamless-lookup P1). + const reader = makeReader( + [{ id: 'ar-r' }, { id: 'ar-i' }, { id: 'ar-p' }, { id: 'ar-d' }, { id: 'eng-r' }, { id: 'eng-i' }, { id: 'eng-p' }, { id: 'eng-d' }], + { + 'ar-r': { name: 'Ready for Agent', team_key: 'AR' }, + 'ar-i': { name: 'Agent Implementing', team_key: 'AR' }, + 'ar-p': { name: 'In Planning', team_key: 'AR' }, + 'ar-d': { name: 'Done', team_key: 'AR' }, + 'eng-r': { name: 'Ready for Agent', team_key: 'ENG' }, + 'eng-i': { name: 'Agent Implementing', team_key: 'ENG' }, + 'eng-p': { name: 'In Planning', team_key: 'ENG' }, + 'eng-d': { name: 'Done', team_key: 'ENG' }, + }, + ) + const states = await resolveFactoryStates(reader, { + states: { readyForAgent: 'Ready for Agent', agentImplementing: 'Agent Implementing', inPlanning: 'In Planning', done: 'Done' }, + teams: ['AR', 'ENG'], + }) + expect(states.idFor('AR', 'readyForAgent')).toBe('ar-r') + expect(states.idFor('ENG', 'done')).toBe('eng-d') + }) + + it('surfaces an ambiguous OPTIONAL role instead of silently disabling it', async () => { + // humanReview is optional, but an ambiguous NAME is a real misconfig — it must + // throw, not resolve to undefined (which would quietly drop humanReview). + const reader = makeReader( + [{ id: 's1' }, { id: 's2' }, { id: 's3' }, { id: 's4' }, { id: 'h1' }, { id: 'h2' }], + { + s1: { name: 'Ready for Agent' }, + s2: { name: 'Agent Implementing' }, + s3: { name: 'In Planning' }, + s4: { name: 'Done' }, + h1: { name: 'In Human Review', team_key: 'AR' }, + h2: { name: 'In Human Review', team_key: 'ENG' }, + }, + ) + await expect(resolveFactoryStates(reader, { + states: { readyForAgent: 'Ready for Agent', agentImplementing: 'Agent Implementing', inPlanning: 'In Planning', done: 'Done', humanReview: 'In Human Review' }, + })).rejects.toThrow(/ambiguous/) + }) + + it('leaves an OPTIONAL role unresolved when the states catalog is unavailable', async () => { + // No /linear/states (e.g. unreadable under the mount token): required roles + // fall back to pinned UUIDs and the optional humanReview is left unresolved, + // rather than aborting startup. + const reader: LinearStateReader = { + async readFile() { + throw new Error('mount has no /linear/states') + }, + } + const states = await resolveFactoryStates(reader, { + states: { humanReview: 'In Human Review' }, + stateIds: { readyForAgent: 'x1', agentImplementing: 'x2', inPlanning: 'x3', done: 'x4' }, + }) + expect(states.idFor(undefined, 'readyForAgent')).toBe('x1') + expect(states.optionalIdFor(undefined, 'humanReview')).toBeUndefined() + expect(states.hasHumanReview()).toBe(false) + }) }) describe('stateResolutionFromIds', () => { diff --git a/src/linear/state-resolver.ts b/src/linear/state-resolver.ts index 142118c..90d23e4 100644 --- a/src/linear/state-resolver.ts +++ b/src/linear/state-resolver.ts @@ -123,6 +123,14 @@ const str = (value: unknown): string | undefined => (typeof value === 'string' & // Lazily load and cache the /linear/states catalog. Only invoked when at least // one role is configured by name, so explicit-UUID setups never read it. +// The states catalog could not be read at all (e.g. /linear/states unreadable +// under the mount token). Distinct from a *resolution* failure (ambiguous name, +// cross-team, no match) so optional roles can tolerate an absent catalog without +// also swallowing genuine misconfiguration. +class CatalogUnavailableError extends Error {} + +const isCatalogUnavailable = (error: unknown): boolean => error instanceof CatalogUnavailableError + class StateCatalog { #records: StateRecord[] | undefined constructor(private readonly reader: LinearStateReader) {} @@ -133,7 +141,7 @@ class StateCatalog { try { index = (await this.reader.readFile(STATES_INDEX_PATH)).content } catch (error) { - throw new Error( + throw new CatalogUnavailableError( `Cannot resolve Linear state names: ${STATES_INDEX_PATH} is unavailable ` + `(${error instanceof Error ? error.message : String(error)}). ` + `Deploy the workflow-states resource or pin stateIds (UUIDs) in config.`, @@ -213,14 +221,29 @@ export async function resolveFactoryStates( // the states catalog is unavailable (e.g. /linear/states unreadable under the // mount token), fall back to the pinned UUID rather than failing startup — the // name still feeds the reverse name->id map below for read-side backfill. - const resolveRole = async (teamToken: string | undefined, role: FactoryStateRole): Promise => { + // `tolerant` is for the team-less default pass when subscribed teams exist: + // each team is resolved authoritatively below, so a team-less ambiguity must + // not abort startup (addresses the global-default-forces-teamless-lookup P1). + const resolveRole = async ( + teamToken: string | undefined, + role: FactoryStateRole, + tolerant: boolean, + ): Promise => { const perTeamName = teamToken ? byTeamNames[norm(teamToken)]?.[role] : undefined const name = perTeamName ?? globalNames[role] if (name) { try { return await catalog.resolve(name, teamToken) } catch (error) { + // Fall back to the pinned UUID when the catalog can't resolve the name. if (explicitIds[role]) return explicitIds[role] + // Best-effort team-less default pass: another team will fill this role. + if (tolerant) return undefined + // An OPTIONAL role (e.g. humanReview) tolerates an *absent* catalog + // (no /linear/states under the mount token) — but a real resolution + // failure (ambiguous name, cross-team, no match) must still surface, even + // for optional roles, rather than silently disabling the role. + if (!REQUIRED_ROLES.includes(role) && isCatalogUnavailable(error)) return undefined throw error } } @@ -236,10 +259,14 @@ export async function resolveFactoryStates( } const byTeam = new Map() - const resolveAllRoles = async (teamToken: string | undefined, enforce: boolean): Promise => { + const resolveAllRoles = async ( + teamToken: string | undefined, + enforce: boolean, + tolerant = false, + ): Promise => { const resolved: RoleIds = {} for (const role of FACTORY_STATE_ROLES) { - const id = await resolveRole(teamToken, role) + const id = await resolveRole(teamToken, role, tolerant) if (id) resolved[role] = id } const missing = REQUIRED_ROLES.filter((role) => !resolved[role]) @@ -256,8 +283,15 @@ export async function resolveFactoryStates( // global config meant to fill it; a per-team-only setup leaves it best-effort // (each subscribed team is enforced below, and idFor() throws at use if an // unconfigured team's issue ever appears). + // + // When teams ARE configured, the team-less default pass is tolerant: the + // factory's default `linear.states` names are global, so resolving them + // team-lessly would be ambiguous in a multi-team workspace and wrongly fail + // startup — even though each subscribed team resolves cleanly. Per-team passes + // (below) are authoritative; the team-less default is only a fallback. const hasGlobalConfig = Object.keys(globalNames).length > 0 || Object.keys(explicitIds).length > 0 - const defaultIds = await resolveAllRoles(undefined, hasGlobalConfig) + const hasTeamScopes = teamTokenByNorm.size > 0 + const defaultIds = await resolveAllRoles(undefined, hasGlobalConfig && !hasTeamScopes, hasTeamScopes) for (const [key, team] of teamTokenByNorm) { byTeam.set(key, await resolveAllRoles(team, true)) } diff --git a/src/mount/local-mount-preflight.test.ts b/src/mount/local-mount-preflight.test.ts index a5ce71a..44985b6 100644 --- a/src/mount/local-mount-preflight.test.ts +++ b/src/mount/local-mount-preflight.test.ts @@ -5,6 +5,7 @@ import { tmpdir } from 'node:os' import { afterEach, describe, expect, it, vi } from 'vitest' import { ensureLocalMount } from './local-mount-preflight' +import { resolveRelayfileCli } from './relayfile-binary' const spawnMock = vi.hoisted(() => vi.fn()) @@ -12,6 +13,16 @@ vi.mock('node:child_process', () => ({ spawn: spawnMock, })) +// Default the CLI resolver OFF so the existing tests exercise the raw-binary +// path deterministically (a real `relayfile` on PATH would otherwise be picked). +// CLI-path tests opt in by overriding this mock. +vi.mock('./relayfile-binary', async (importOriginal) => { + const actual = await importOriginal() + return { ...actual, resolveRelayfileCli: vi.fn(() => undefined) } +}) + +const resolveCliMock = vi.mocked(resolveRelayfileCli) + const originalRelayfileMountBin = process.env.RELAYFILE_MOUNT_BIN afterEach(() => { @@ -21,6 +32,8 @@ afterEach(() => { process.env.RELAYFILE_MOUNT_BIN = originalRelayfileMountBin } spawnMock.mockReset() + resolveCliMock.mockReset() + resolveCliMock.mockReturnValue(undefined) }) async function withTempDir(fn: (dir: string) => Promise): Promise { @@ -82,6 +95,142 @@ describe('ensureLocalMount', () => { }) }) + it('auto-refreshes a stale mount by re-spawning so writebacks propagate', async () => { + await withTempDir(async (dir) => { + await installFakeBinary(dir) + const stateDir = join(dir, '.integrations', '.relay') + const statePath = join(stateDir, 'state.json') + await mkdir(stateDir, { recursive: true }) + // stale: last reconcile well past the staleness threshold. + await writeFile(statePath, JSON.stringify({ + workspaceId: 'rw_test', + lastReconcileAt: new Date(Date.now() - 30 * 60 * 1000).toISOString(), + pid: process.pid, + }), 'utf8') + mockSuccessfulSpawn(async () => { + await writeFile(statePath, JSON.stringify({ + workspaceId: 'rw_test', + lastReconcileAt: new Date().toISOString(), + pid: process.pid, + }), 'utf8') + }) + + await expect(ensureLocalMount('rw_test', dir, { + stateWaitTimeoutMs: 100, + stateWaitPollMs: 1, + })).resolves.toBeUndefined() + expect(spawnMock).toHaveBeenCalled() + }) + }) + + it('warns without re-spawning a stale mount when refreshStaleMount is false', async () => { + await withTempDir(async (dir) => { + await installFakeBinary(dir) + const stateDir = join(dir, '.integrations', '.relay') + await mkdir(stateDir, { recursive: true }) + await writeFile(join(stateDir, 'state.json'), JSON.stringify({ + workspaceId: 'rw_test', + lastReconcileAt: new Date(Date.now() - 30 * 60 * 1000).toISOString(), + pid: process.pid, + }), 'utf8') + mockSuccessfulSpawn() + + await expect(ensureLocalMount('rw_test', dir, { refreshStaleMount: false })).resolves.toBeUndefined() + expect(spawnMock).not.toHaveBeenCalled() + }) + }) + + it('prefers the relayfile CLI when available: stop then start, no token plumbing', async () => { + await withTempDir(async (dir) => { + const cli = join(dir, 'relayfile') + await writeFile(cli, '#!/bin/sh\n', 'utf8') + await chmod(cli, 0o755) + resolveCliMock.mockReturnValue(cli) + const stateDir = join(dir, '.integrations', '.relay') + const statePath = join(stateDir, 'state.json') + + // stop -> exit 0 (no state); start -> writes a fresh state then exit 0. + spawnMock.mockImplementation((_cmd: string, args: string[]) => { + const child = new EventEmitter() as EventEmitter & { stderr: EventEmitter } + child.stderr = new EventEmitter() + const isStart = args[0] === 'start' + setTimeout(() => { + const finish = async (): Promise => { + if (isStart) { + await mkdir(stateDir, { recursive: true }) + await writeFile(statePath, JSON.stringify({ + workspaceId: 'rw_test', + lastReconcileAt: new Date().toISOString(), + pid: process.pid, + }), 'utf8') + } + } + void finish().then(() => child.emit('close', 0), () => child.emit('close', 1)) + }, 0) + return child + }) + + await expect(ensureLocalMount('rw_test', dir, { + stateWaitTimeoutMs: 100, + stateWaitPollMs: 1, + })).resolves.toBeUndefined() + + // raw relayfile-mount binary must NOT be used when the CLI is present. + expect(spawnMock).toHaveBeenCalledWith(cli, ['stop'], expect.objectContaining({ cwd: dir })) + expect(spawnMock).toHaveBeenCalledWith( + cli, + ['start', 'rw_test', '.integrations', '--background'], + expect.objectContaining({ cwd: dir }), + ) + // no --rehome (that's a raw-binary flag) and no RELAYFILE_MOUNT_BIN reliance. + const startCall = spawnMock.mock.calls.find((c) => c[1]?.[0] === 'start') + expect(startCall?.[1]).not.toContain('--rehome') + }) + }) + + it('tolerates a failing best-effort stop and still starts via the CLI', async () => { + await withTempDir(async (dir) => { + const cli = join(dir, 'relayfile') + await writeFile(cli, '#!/bin/sh\n', 'utf8') + await chmod(cli, 0o755) + resolveCliMock.mockReturnValue(cli) + const stateDir = join(dir, '.integrations', '.relay') + const statePath = join(stateDir, 'state.json') + + spawnMock.mockImplementation((_cmd: string, args: string[]) => { + const child = new EventEmitter() as EventEmitter & { stderr: EventEmitter } + child.stderr = new EventEmitter() + const isStart = args[0] === 'start' + setTimeout(() => { + if (!isStart) { + // stop fails (nothing mounted) — must be swallowed. + child.stderr.emit('data', Buffer.from('no mount running')) + child.emit('close', 1) + return + } + void mkdir(stateDir, { recursive: true }) + .then(() => writeFile(statePath, JSON.stringify({ + workspaceId: 'rw_test', + lastReconcileAt: new Date().toISOString(), + pid: process.pid, + }), 'utf8')) + .then(() => child.emit('close', 0), () => child.emit('close', 1)) + }, 0) + return child + }) + + await expect(ensureLocalMount('rw_test', dir, { + stateWaitTimeoutMs: 100, + stateWaitPollMs: 1, + })).resolves.toBeUndefined() + expect(spawnMock).toHaveBeenCalledWith( + cli, + ['start', 'rw_test', '.integrations', '--background'], + expect.objectContaining({ cwd: dir }), + ) + }) + }) + it('rejects a malformed state file instead of silently continuing', async () => { await withTempDir(async (dir) => { await installFakeBinary(dir) diff --git a/src/mount/local-mount-preflight.ts b/src/mount/local-mount-preflight.ts index dea8aaa..d364a1d 100644 --- a/src/mount/local-mount-preflight.ts +++ b/src/mount/local-mount-preflight.ts @@ -2,7 +2,7 @@ import { readFile } from 'node:fs/promises' import { join } from 'node:path' import { spawn } from 'node:child_process' -import { checkMountStaleness, resolveRelayfileMountBinary } from './relayfile-binary' +import { checkMountStaleness, resolveRelayfileCli, resolveRelayfileMountBinary } from './relayfile-binary' const STATE_FILE = '.integrations/.relay/state.json' @@ -13,6 +13,12 @@ interface EnsureLocalMountOptions { // `rw_` handle). Passed through to the staleness check so a handle-vs-UUID // state.json does not register as a spurious mismatch. acceptableWorkspaceIds?: readonly string[] + // When true (default), a stale mount is auto-refreshed (re-spawned) instead of + // merely warned about. A standalone `relayfile start` mount has no supervisor, + // so it can silently stop reconciling — and the factory would then ship + // writebacks into a mirror that never propagates them. Set false to restore + // warn-only behavior. + refreshStaleMount?: boolean } export async function ensureLocalMount( @@ -35,11 +41,33 @@ export async function ensureLocalMount( } const staleness = checkMountStaleness(stateFilePath, workspaceId, options.acceptableWorkspaceIds) - if (staleness.stale) { - const suffix = staleness.reason !== undefined ? ` (${staleness.reason})` : '' - process.stderr.write( - `[factory] local mount is stale${suffix}; writeback may not propagate. Run: relayfile stop && relayfile start ${workspaceId} .integrations --background\n`, + if (!staleness.stale) return + + const suffix = staleness.reason !== undefined ? ` (${staleness.reason})` : '' + const manualHint = `Run: relayfile stop && relayfile start ${workspaceId} .integrations --background` + + if (options.refreshStaleMount === false) { + process.stderr.write(`[factory] local mount is stale${suffix}; writeback may not propagate. ${manualHint}\n`) + return + } + + // Self-heal: re-spawn the mount so writebacks propagate, rather than silently + // shipping them into a stale mirror. spawnMount runs the relayfile-mount + // binary with --rehome, which re-establishes the mount in place. + process.stderr.write(`[factory] local mount is stale${suffix}; refreshing\n`) + try { + await spawnMount(workspaceId, startDir) + await waitForStateFile( + stateFilePath, + workspaceId, + options.stateWaitTimeoutMs, + options.stateWaitPollMs, + options.acceptableWorkspaceIds, ) + process.stderr.write('[factory] local mount refreshed\n') + } catch (error) { + const reason = error instanceof Error ? error.message : String(error) + process.stderr.write(`[factory] local mount is stale${suffix} and auto-refresh failed (${reason}); writeback may not propagate. ${manualHint}\n`) } } @@ -54,10 +82,37 @@ async function isMountStatePresent(stateFilePath: string): Promise { } async function spawnMount(workspaceId: string, startDir: string): Promise { - const binaryPath = resolveRelayfileMountBinary() + // Prefer the relayfile CLI: it resolves workspace credentials itself (the raw + // relayfile-mount binary requires a --token the factory has no clean way to + // supply) and bundles an up-to-date mount, so the factory can self-start the + // writeback mount unattended. Fall back to the raw binary only when no CLI is + // available. + const cli = resolveRelayfileCli() + if (cli) { + await spawnMountViaCli(cli, workspaceId, startDir) + return + } + await spawnMountViaRawBinary(workspaceId, startDir) +} + +async function spawnMountViaCli(cli: string, workspaceId: string, startDir: string): Promise { + // Best-effort stop first so a stale/dead mount registration does not make + // `start` reject (matches the documented `relayfile stop && relayfile start` + // recovery). A no-op when nothing is mounted here. + await runRelayfile(cli, ['stop'], startDir, workspaceId).catch(() => {}) + await runRelayfile(cli, ['start', workspaceId, '.integrations', '--background'], startDir, workspaceId) +} + +async function spawnMountViaRawBinary(workspaceId: string, startDir: string): Promise { + // Search from the deployment dir (where factory.config.json + the bundled + // @relayfile/mount live), not this module's install location. + const binaryPath = resolveRelayfileMountBinary(startDir) + await runRelayfile(binaryPath, ['start', workspaceId, '.integrations', '--background', '--rehome'], startDir, workspaceId) +} +function runRelayfile(command: string, args: string[], startDir: string, workspaceId: string): Promise { return new Promise((resolve, reject) => { - const child = spawn(binaryPath, ['start', workspaceId, '.integrations', '--background', '--rehome'], { + const child = spawn(command, args, { cwd: startDir, stdio: ['ignore', 'ignore', 'pipe'], }) @@ -74,14 +129,14 @@ async function spawnMount(workspaceId: string, startDir: string): Promise return } if (code !== 0) { - reject(new Error(`[factory] relayfile mount start failed (exit ${code ?? 'null'}): ${stderr.trim()}`)) + reject(new Error(`[factory] relayfile mount ${args[0]} failed (exit ${code ?? 'null'}): ${stderr.trim()}`)) return } resolve() }) child.on('error', (err: Error) => { - reject(new Error(`[factory] relayfile mount start error: ${err.message}`)) + reject(new Error(`[factory] relayfile mount ${args[0]} error: ${err.message}`)) }) }) } diff --git a/src/mount/relayfile-binary.ts b/src/mount/relayfile-binary.ts index 84e35a3..88adfcd 100644 --- a/src/mount/relayfile-binary.ts +++ b/src/mount/relayfile-binary.ts @@ -1,5 +1,10 @@ import { accessSync, constants, readFileSync } from 'node:fs' import { dirname, join, resolve } from 'node:path' +import { fileURLToPath } from 'node:url' + +// ESM has no __dirname; derive this module's directory for the default search +// start (used when no caller-supplied deployment dir is available). +const MODULE_DIR = dirname(fileURLToPath(import.meta.url)) const STALE_RECONCILE_MS = 15 * 60 * 1000 const NOT_FOUND_ERROR = '[factory] relayfile-mount binary not found. Install dependencies or run: npm run relayfile-mount:install' @@ -72,7 +77,7 @@ function optionalPackageCandidates(pearRoot: string): string[] { ] } -export function resolveRelayfileMountBinary(startDir = __dirname): string { +export function resolveRelayfileMountBinary(startDir = MODULE_DIR): string { if (process.env.RELAYFILE_MOUNT_BIN) { const explicitBinary = resolve(process.env.RELAYFILE_MOUNT_BIN) if (canExecute(explicitBinary)) return explicitBinary @@ -95,6 +100,29 @@ export function resolveRelayfileMountBinary(startDir = __dirname): string { throw new Error(NOT_FOUND_ERROR) } +function cliName(): string { + return process.platform === 'win32' ? 'relayfile.exe' : 'relayfile' +} + +// The `relayfile` CLI (as opposed to the raw relayfile-mount binary) resolves +// workspace credentials itself and bundles an up-to-date mount, so the factory +// can start/refresh the writeback mount without plumbing a token — and without +// depending on a possibly-stale @relayfile/mount-* node_modules binary. Prefer +// it; fall back to the raw binary only when no CLI is on PATH. +export function resolveRelayfileCli(): string | undefined { + const explicit = process.env.RELAYFILE_BIN + if (explicit) return canExecute(explicit) ? resolve(explicit) : undefined + + const exe = cliName() + const separator = process.platform === 'win32' ? ';' : ':' + for (const dir of (process.env.PATH ?? '').split(separator)) { + if (!dir) continue + const candidate = join(dir, exe) + if (canExecute(candidate)) return resolve(candidate) + } + return undefined +} + export function checkMountStaleness( stateFilePath: string, workspaceId: string, diff --git a/src/orchestrator/factory.test.ts b/src/orchestrator/factory.test.ts index 4eeb23c..35c86a3 100644 --- a/src/orchestrator/factory.test.ts +++ b/src/orchestrator/factory.test.ts @@ -220,7 +220,7 @@ class FailingSlackAnswerFleetClient extends FakeFleetClient { failuresRemaining = 1 override async sendInput(name: string, data: string): Promise { - if (data.startsWith('Slack reply for ') && this.failuresRemaining > 0) { + if (data.startsWith(' 0) { this.failuresRemaining -= 1 throw new Error('sendInput failed') } @@ -2941,10 +2941,11 @@ describe('FactoryLoop', () => { expect(mount.writes).toEqual([]) }) - it('does not dispatch issues that carry only a state_name (no resolvable state id)', async () => { - // Without a resolvable state.id (and no hardcoded name->id map), a - // state_name-only record can't be matched to the readyForAgent role, so it - // is left alone rather than mis-dispatched. + it('does not dispatch issues whose state_name matches no configured role (no resolvable state id)', async () => { + // A state_name-only record whose name is NOT in linear.states (nor a default + // role name) can't be matched to the readyForAgent role, so it is left alone + // rather than mis-dispatched. (A conventional name like "Ready for Agent" + // DOES resolve via the default name map — see the canonical-fallback test.) const path = issuePath(27) const stateNameOnly = { provider: 'linear', @@ -2954,7 +2955,7 @@ describe('FactoryLoop', () => { ...issuePayload(27), stateId: undefined, state: undefined, - state_name: 'Ready for Agent', + state_name: 'Some Workspace-Specific Column', }, } const mount = new FakeMountClient({ [path]: stateNameOnly }) @@ -6543,7 +6544,7 @@ describe('FactoryLoop', () => { await flush() expect(slackAnswerInputs(fleet)).toEqual([ - { name: 'ar-24-impl-pear', data: 'Slack reply for AR-24:\nPlease use the existing retry helper.\r' }, + { name: 'ar-24-impl-pear', data: '\nHuman reply in the Slack thread:\nPlease use the existing retry helper.\n\r' }, ]) expect(slack.replies).toEqual([]) expect(slackReplyWrites(mount)).toEqual([]) @@ -6598,7 +6599,7 @@ describe('FactoryLoop', () => { await flush() expect(slackAnswerInputs(fleet)).toEqual([ - { name: 'ar-36-impl-pear', data: 'Slack reply for AR-36:\nUse the shared retry helper in factory.ts.\r' }, + { name: 'ar-36-impl-pear', data: '\nHuman reply in the Slack thread:\nUse the shared retry helper in factory.ts.\n\r' }, ]) expect(fleet.messages).toHaveLength(2) }) @@ -6727,7 +6728,7 @@ describe('FactoryLoop', () => { await flush() expect(slackAnswerInputs(fleet)).toEqual([ - { name: 'ar-32-impl-pear', data: 'Slack reply for AR-32:\nstatus?\r' }, + { name: 'ar-32-impl-pear', data: '\nHuman reply in the Slack thread:\nstatus?\n\r' }, ]) expect(slack.replies).toEqual([]) expect(slackReplyWrites(mount)).toEqual([]) @@ -6818,7 +6819,7 @@ describe('FactoryLoop', () => { await flush() expect(slackAnswerInputs(fleet)).toEqual([ - { name: 'ar-35-impl-pear', data: 'Slack reply for AR-35:\nstatus?\r' }, + { name: 'ar-35-impl-pear', data: '\nHuman reply in the Slack thread:\nstatus?\n\r' }, ]) expect(slackReplyWrites(mount)).toEqual([]) }) @@ -6914,7 +6915,7 @@ describe('FactoryLoop', () => { await flush() expect(slackAnswerInputs(fleet)).toEqual([ - { name: 'ar-27-impl-pear', data: 'Slack reply for AR-27:\nnew status?\r' }, + { name: 'ar-27-impl-pear', data: '\nHuman reply in the Slack thread:\nnew status?\n\r' }, ]) expect(slackReplyWrites(mount)).toEqual([]) }) @@ -6955,7 +6956,7 @@ describe('FactoryLoop', () => { await flush() expect(slackAnswerInputs(fleet)).toEqual([ - { name: 'ar-80-impl-pear', data: 'Slack reply for AR-80:\nhow is it going?\r' }, + { name: 'ar-80-impl-pear', data: '\nHuman reply in the Slack thread:\nhow is it going?\n\r' }, ]) }) @@ -6999,7 +7000,7 @@ describe('FactoryLoop', () => { await flush() expect(slackAnswerInputs(restartFleet)).toEqual([ - { name: 'ar-81-impl-pear', data: 'Slack reply for AR-81:\nany update?\r' }, + { name: 'ar-81-impl-pear', data: '\nHuman reply in the Slack thread:\nany update?\n\r' }, ]) await restarted.stop() @@ -7052,7 +7053,7 @@ describe('FactoryLoop', () => { await flush() expect(slackAnswerInputs(fleet)).toEqual([ - { name: 'ar-28-impl-pear', data: 'Slack reply for AR-28:\nstatus?\r' }, + { name: 'ar-28-impl-pear', data: '\nHuman reply in the Slack thread:\nstatus?\n\r' }, ]) expect(slackReplyWrites(mount)).toEqual([]) }) @@ -7092,7 +7093,7 @@ describe('FactoryLoop', () => { await flush() expect(slackAnswerInputs(fleet)).toEqual([ - { name: 'ar-42-impl-pear', data: 'Slack reply for AR-42:\nstatus?\r' }, + { name: 'ar-42-impl-pear', data: '\nHuman reply in the Slack thread:\nstatus?\n\r' }, ]) expect(slackReplyWrites(mount)).toEqual([]) }) @@ -7171,7 +7172,7 @@ describe('FactoryLoop', () => { await flush() expect(slackAnswerInputs(fleet)).toEqual([ - { name: 'ar-30-impl-pear', data: 'Slack reply for AR-30:\nstatus again?\r' }, + { name: 'ar-30-impl-pear', data: '\nHuman reply in the Slack thread:\nstatus again?\n\r' }, ]) expect(slackReplyWrites(mount)).toEqual([]) }) @@ -7219,7 +7220,7 @@ describe('FactoryLoop', () => { await flush() expect(slackAnswerInputs(fleet)).toEqual([ - { name: 'ar-31-impl-pear', data: 'Slack reply for AR-31:\nstatus?\r' }, + { name: 'ar-31-impl-pear', data: '\nHuman reply in the Slack thread:\nstatus?\n\r' }, ]) expect(slackReplyWrites(mount)).toEqual([]) expect(warnings.flat()).not.toContain('[factory] Slack reply event missing stable identity; falling back to path/content dedupe') diff --git a/src/orchestrator/factory.ts b/src/orchestrator/factory.ts index 95dadea..00e8ee2 100644 --- a/src/orchestrator/factory.ts +++ b/src/orchestrator/factory.ts @@ -1,5 +1,5 @@ import { mkdir, readFile, writeFile } from 'node:fs/promises' -import { dirname } from 'node:path' +import { dirname, isAbsolute, resolve } from 'node:path' import { FactoryConfigSchema, type FactoryConfig } from '../config/schema' import { linearByStatePath, linearByIdPath, linearByUuidPath } from '../constants/linear' @@ -186,6 +186,10 @@ export class FactoryLoop implements Factory { readonly #resumeInFlight = new Map>() readonly #slackWatchers = new Map() readonly #slackWatcherStarts = new Map>() + // Agents we've already logged an ambiguous-PID-lookup warning for, so the + // reaper doesn't spam the same benign "ambiguous process lookup" line on every + // poll (a joined/cloud agent has no local PID to resolve — expected). + readonly #ambiguousLookupWarned = new Set() // Last invalid-label failure signature we posted per issue, so a stuck Ready // issue (or the comment writeback's own change event) does not re-post the // same notice every cycle. Cleared once the issue dispatches successfully. @@ -318,7 +322,24 @@ export class FactoryLoop implements Factory { }, } const descriptors = await deriveDescriptorsFromMount(reader) - this.#integrationInstructions = prescriptiveInstructions(descriptors) + // The package emits paths relative to the daemon's .integrations mount, + // but the agent runs in its repo clonePath — a relative `.integrations/...` + // path is unreachable from there. Absolutize every writeback path to the + // daemon's mount root so the prescriptive instructions are actionable. + const root = this.#integrationsMountRoot() + const abs = (p: string): string => (isAbsolute(p) ? p : resolve(root, '..', p)) + const absoluteDescriptors = descriptors.map((descriptor) => ({ + ...descriptor, + mountRoot: abs(descriptor.mountRoot), + ...(descriptor.discoveryRoot ? { discoveryRoot: abs(descriptor.discoveryRoot) } : {}), + writableResources: descriptor.writableResources.map((res) => ({ + ...res, + path: abs(res.path), + ...(res.createExamplePath ? { createExamplePath: abs(res.createExamplePath) } : {}), + ...(res.schemaPath ? { schemaPath: abs(res.schemaPath) } : {}), + })), + })) + this.#integrationInstructions = prescriptiveInstructions(absoluteDescriptors) return this.#integrationInstructions } catch { this.#logger.warn?.('[factory] failed to resolve integration instructions from mount') @@ -1896,7 +1917,10 @@ export class FactoryLoop implements Factory { return { pids: [scan.identity.pid], status: 'found' } } if (scan.status === 'ambiguous') { - this.#logger.warn?.(`[factory] ambiguous process lookup for ${agentName}`) + if (!this.#ambiguousLookupWarned.has(agentName)) { + this.#ambiguousLookupWarned.add(agentName) + this.#logger.warn?.(`[factory] ambiguous process lookup for ${agentName} (suppressing repeats)`) + } return { pids: [], status: 'unresolved' } } @@ -2116,6 +2140,12 @@ export class FactoryLoop implements Factory { if (tracked.sessionRef) { const resumeKey = `${issueKey(record.issue)}:${name}:${tracked.sessionRef}` if (await this.#state.isResumed(this.#workspaceId, resumeKey)) { + // Already resumed once and STILL exiting with no completion PR — the + // agent isn't making progress. Escalate so a human notices, instead of + // leaving the issue silently in-flight forever. + if (tracked.spec.role === 'implementer') { + await this.#escalateStalledIssue(record, name) + } return } @@ -2172,6 +2202,30 @@ export class FactoryLoop implements Factory { } } + // An implementer that exited, was resumed once, and STILL produced no PR is + // not making progress. Surface it (counter + a best-effort Slack note to the + // dispatch thread) so a human can step in, instead of the issue sitting + // silently "in flight" with nothing happening. We do NOT fake a Linear state + // change here (the mount may be wedged); the human owns the next step. + async #escalateStalledIssue(record: InFlightIssue, name: string): Promise { + this.#increment('issuesStalledNoPr') + this.#logger.warn?.('[factory] implementer exited without a PR after a resume; escalating for human attention', { + issue: record.issue.key, + agent: name, + }) + try { + const thread = await this.#slackDispatchThreadFor(record) + if (thread && this.#slack) { + await this.#slack.reply( + thread.threadId, + `:warning: ${record.issue.key}: the implementer exited without opening a PR (after a retry). It needs a human look.`, + ) + } + } catch (error) { + this.#logger.warn?.('[factory] failed to post stalled-issue escalation to Slack', error) + } + } + async #issueHasCompletionPr(record: InFlightIssue): Promise { try { const issue = await this.#readIssue(record.issue.path) @@ -2396,6 +2450,7 @@ export class FactoryLoop implements Factory { reviewerName, implementerNames, slackDispatchThread: await this.#slackDispatchThreadFor(record), + integrationsMountRoot: this.#integrationsMountRoot(), integrationInstructions, }), from: 'factory', @@ -2670,6 +2725,7 @@ export class FactoryLoop implements Factory { implementerNames, pr: { number: prRef.prNumber, url: prRef.url }, slackDispatchThread: await this.#slackDispatchThreadFor(record), + integrationsMountRoot: this.#integrationsMountRoot(), integrationInstructions, }) @@ -3391,20 +3447,38 @@ export class FactoryLoop implements Factory { return } - const input = slackAnswerInput(liveRecord.issue, text) for (const recipient of new Set(recipients)) { - await this.#fleet.sendInput(recipient, input) + await this.#injectSlackReplyEvent(recipient, liveRecord.issue, text) this.#increment('slackAnswersInjected') } } - async #slackDispatchThreadFor(record: InFlightIssue): Promise<{ channel: string; threadId: string } | undefined> { + // Inject the human's Slack reply into the agent framed as the + // the spawn prompt tells it to expect (not an ambiguous + // "Slack reply for ..." keystroke), so the agent recognizes it as the awaited + // event. (A broker confirmed-delivery path via waitForInjected is a possible + // robustness follow-up.) + async #injectSlackReplyEvent(recipient: string, issue: IssueRef, text: string): Promise { + await this.#fleet.sendInput?.(recipient, slackReplyEvent(issue, text)) + } + + // Absolute path to the local .integrations mount the daemon manages. The mount + // is created at the daemon's cwd (see ensureLocalMount), and spawned agents run + // in their repo clonePath, so writeback paths handed to agents must be absolute + // against this root rather than a bare relative `.integrations/...`. + #integrationsMountRoot(): string { + return resolve(process.cwd(), '.integrations') + } + + async #slackDispatchThreadFor(record: InFlightIssue): Promise<{ channel: string; threadId: string; mountRoot: string } | undefined> { if (!this.#config.slack) { return undefined } const threadId = await this.#state.getSlackThread(this.#workspaceId, issueKey(record.issue)) - return threadId ? { channel: this.#config.slack.channel, threadId } : undefined + return threadId + ? { channel: this.#config.slack.channel, threadId, mountRoot: this.#integrationsMountRoot() } + : undefined } async #runCompletionMergeGate(issue: LinearIssue): Promise { @@ -4783,6 +4857,12 @@ const triageEscalationReason = (decision: TriageDecision): string | undefined => const slackAnswerInput = (issue: IssueRef, text: string): string => `Slack reply for ${issue.key}:\n${text}\r` +// The human's Slack-thread reply, framed as an the agent is +// told (at spawn) to expect — a recognizable injected event, not an ambiguous +// keystroke. Trailing CR submits it to the agent's PTY. +const slackReplyEvent = (issue: IssueRef, text: string): string => + `\nHuman reply in the Slack thread:\n${text}\n\r` + const isFactoryQuestionTarget = (target: string): boolean => { const normalized = target.trim().replace(/^@/u, '').toLowerCase() return normalized === 'broker' || normalized === 'factory' diff --git a/src/writeback/linear.ts b/src/writeback/linear.ts index 87d649a..cd24b69 100644 --- a/src/writeback/linear.ts +++ b/src/writeback/linear.ts @@ -1,4 +1,4 @@ -import { linearCommentPath, linearIssuePath } from '../constants/linear' +import { linearByIdPath, linearByUuidPath, linearCommentPath, linearIssuePath } from '../constants/linear' import type { MountClient } from '../ports' import type { Logger } from '../ports/system' import { assertInFactoryScope, isInFactoryScope } from '../safety/factory-scope' @@ -70,16 +70,46 @@ const payloadInFactoryScope = ( return isInFactoryScope(scopeIssueFromPayload(payload, 'createIssue payload'), safety) } +const hasGuardFields = (payload: Record): boolean => + typeof payload.title === 'string' || Array.isArray(payload.labels) || asRecord(payload.team) !== undefined + const readIssuePayloadForGuard = async ( mount: MountClient, issue: LinearIssue, ): Promise> => { - const path = issuePath(issue) - try { - return wrappedPayload((await mount.readFile(path)).content) - } catch { - throw new Error(`Refusing Linear writeback for ${issue.key}: unable to read guard fields from ${path}`) + // The primary __.json may be a change-event STUB (no title/labels/ + // team — the sparse-sync case); fall back to the canonical by-id/by-uuid + // records so the factory-scope guard sees the real fields and doesn't refuse a + // legitimately-[factory] issue. + const candidates = [ + issuePath(issue), + ...(issue.key ? [linearByIdPath(issue.key)] : []), + ...(issue.uuid ? [linearByUuidPath(issue.uuid)] : []), + ] + let primaryPayload: Record | undefined + let lastError: unknown + for (const path of candidates) { + try { + const payload = wrappedPayload((await mount.readFile(path)).content) + if (primaryPayload === undefined) { + primaryPayload = payload + } + if (hasGuardFields(payload)) { + return payload + } + } catch (error) { + lastError = error + } } + // No record carried guard fields. Preserve prior behavior: return the primary + // payload (the scope guard then decides) rather than failing the read outright. + if (primaryPayload !== undefined) { + return primaryPayload + } + throw new Error( + `Refusing Linear writeback for ${issue.key}: unable to read guard fields` + + (lastError instanceof Error ? ` (${lastError.message})` : ''), + ) } interface CachedIssuePayload { @@ -100,6 +130,10 @@ const createIssuePath = (payload: LinearCreateIssuePayload): string => { const looksLikeProviderIssueIdentifier = (value: string): boolean => /^[A-Z][A-Z0-9]*-/u.test(value) +const READBACK_CONFIRM_ATTEMPTS = 3 +const READBACK_CONFIRM_DELAY_MS = 250 +const delay = (ms: number): Promise => new Promise((resolve) => setTimeout(resolve, ms)) + const confirmWriteback = async ( mount: MountClient, path: string, @@ -107,13 +141,27 @@ const confirmWriteback = async ( logger: Pick, ): Promise => { await assertWritebackAcked(mount, path) - try { - if (!await verify()) { - logger.warn?.(`[factory-sdk] Linear writeback read-back verification failed for ${path}; treating getOp ack as success`) + // getOp can return a FAKED success on a busy/wedged mount ("workspace write + // path is busy"), so the read-back is the source of truth. Retry to absorb + // eventual-consistency lag; if it never confirms, the write did NOT land — + // throw instead of silently faking success (which previously left issues + // un-advanced while the factory believed they had advanced). + for (let attempt = 0; attempt < READBACK_CONFIRM_ATTEMPTS; attempt += 1) { + let confirmed = false + try { + confirmed = await verify() + } catch { + confirmed = false + } + if (confirmed) { + return + } + if (attempt < READBACK_CONFIRM_ATTEMPTS - 1) { + logger.warn?.(`[factory-sdk] Linear writeback read-back for ${path} not yet confirmed (attempt ${attempt + 1}/${READBACK_CONFIRM_ATTEMPTS}); retrying`) + await delay(READBACK_CONFIRM_DELAY_MS) } - } catch (error) { - logger.warn?.(`[factory-sdk] Linear writeback read-back verification errored for ${path}; treating getOp ack as success`, error) } + throw new Error(`[factory-sdk] Linear writeback for ${path} acked but the read-back never confirmed it landed; the write did not propagate`) } const assertWritebackAcked = async ( diff --git a/src/writeback/writeback.test.ts b/src/writeback/writeback.test.ts index b8e1f73..98feacc 100644 --- a/src/writeback/writeback.test.ts +++ b/src/writeback/writeback.test.ts @@ -245,7 +245,10 @@ describe('MountLinearWriteback', () => { await expect(linear.verify(issue, { stateId: 'implementing-state' })).rejects.toThrow(/not acked/) }) - it('treats stale read-back as advisory after getOp ack for state, comment, and create writebacks', async () => { + it('throws (no faked success) when the read-back never confirms the write landed', async () => { + // A getOp ack can be faked-success by a busy/wedged mount; if the read-back + // never reflects the write, it did NOT land — the writeback must fail loudly + // rather than leave the caller believing it advanced. class StaleMountClient extends FakeMountClient { override async writeFile(path: string, content: unknown): Promise { await super.writeFile(path, content) @@ -262,29 +265,17 @@ describe('MountLinearWriteback', () => { const mount = new StaleMountClient({ [issuePath]: wrappedIssueRecord(), }) - const warnings: unknown[][] = [] - const linear = MountLinearWriteback(mount, { - logger: { - warn: (...args: unknown[]) => warnings.push(args), - }, - }) + const linear = MountLinearWriteback(mount, { logger: { warn: () => {} } }) - await expect(linear.setState(issue, 'implementing-state')).resolves.toBeUndefined() - await expect(linear.postComment(issue, 'Agent dispatched after stale mirror')).resolves.toBeUndefined() + await expect(linear.setState(issue, 'implementing-state')).rejects.toThrow(/read-back never confirmed it landed/u) + await expect(linear.postComment(issue, 'Agent dispatched after stale mirror')).rejects.toThrow(/read-back never confirmed it landed/u) await expect(linear.createIssue({ id: 'uuid-stale-create', identifier: 'AR-STALE', title: '[factory-e2e] synthetic issue with stale mirror', team: { key: 'AR', name: 'Agent Relay' }, stateId: 'ready-state', - })).resolves.toEqual({ path: '/linear/issues/factory-create-uuid-stale-create.json' }) - - expect(warnings).toHaveLength(3) - expect(warnings.map((warning) => warning[0])).toEqual([ - `[factory-sdk] Linear writeback read-back verification failed for ${issuePath}; treating getOp ack as success`, - expect.stringMatching(/^\[factory-sdk\] Linear writeback read-back verification failed for \/linear\/issues\/[^/]+\/comments\/AR-99__factory-/u), - '[factory-sdk] Linear writeback read-back verification failed for /linear/issues/factory-create-uuid-stale-create.json; treating getOp ack as success', - ]) + })).rejects.toThrow(/read-back never confirmed it landed/u) }) it('refuses setState and postComment on an issue without factory-e2e before writing', async () => {