diff --git a/src/main/__tests__/integration-event-bridge.test.ts b/src/main/__tests__/integration-event-bridge.test.ts index 9bda09b8..28504b26 100644 --- a/src/main/__tests__/integration-event-bridge.test.ts +++ b/src/main/__tests__/integration-event-bridge.test.ts @@ -1324,6 +1324,53 @@ test('slack injected confirmation retries exhausted releases dedupe key', async assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice', 'alice', 'alice', 'alice']) }) +test('agent_not_found injected confirmation is suppressed without retry', async () => { + process.env.PEAR_INTEGRATION_EVENT_INJECTED_RETRY_DELAYS_MS = '5,5' + const harness = makeHarness(['alice'], { manualInjectedConfirmations: true }) + const warnCalls: unknown[][] = [] + const originalWarn = console.warn + console.warn = (...args: unknown[]) => { + warnCalls.push(args) + } + + try { + await withMockedNow('2026-06-05T14:00:00.000Z', async () => { + await harness.bridge.reconcile('project-1', [ + integration({ + provider: 'slack', + integrationId: 'slack-1', + mountPaths: ['/slack/channels/C123ABC__proj-cloud'], + scope: { notifyAgents: ['alice'] } + }) + ]) + }) + + const path = '/slack/channels/C123ABC__proj-cloud/messages/1780668000_000000/meta.json' + await harness.emit(changeEvent(path, 'slack')) + await waitForSent(harness, 1) + // A recipient the broker reports as nonexistent is a permanent failure. + harness.pendingInjectedConfirmations[0].reject( + new Error('relaycast send_dm failed: API error (agent_not_found): Agent "alice" not found') + ) + + await waitUntil(() => + warnCalls.some((call) => call[0] === '[integration-events] delivery recipient no longer registered; suppressing') + ) + // No retry: exactly one send, no further injected-confirmation attempts, and + // crucially no "retries exhausted" storm against the vanished agent. 
+ await new Promise((resolve) => setTimeout(resolve, 50)) + assert.equal(harness.sent.length, 1) + assert.equal(harness.injectedConfirmationCalls.length, 1) + assert.equal(harness.pendingInjectedConfirmations.length, 1) + assert.equal( + warnCalls.some((call) => call[0] === '[integration-events] delivery injected retries exhausted'), + false + ) + } finally { + console.warn = originalWarn + } +}) + test('slack injected confirmation retry resends only failed recipient', async () => { process.env.PEAR_INTEGRATION_EVENT_INJECTED_RETRY_DELAYS_MS = '5,5' const harness = makeHarness(['alice', 'bob'], { manualInjectedConfirmations: true }) diff --git a/src/main/integration-event-bridge.ts b/src/main/integration-event-bridge.ts index 9731ba38..cbd0d3c7 100644 --- a/src/main/integration-event-bridge.ts +++ b/src/main/integration-event-bridge.ts @@ -1645,6 +1645,14 @@ function logIntegrationEvent(message: string, metadata: Record) void appendIntegrationEventLog(message, metadata) } +// A recipient that the broker reports as nonexistent (rotated out, never +// registered) is a PERMANENT delivery failure, not a transient one: retrying +// the inject — and the Relaycast fallback — just spams the same vanished agent +// every few seconds for every event. Mirrors broker.ts isMissingAgentError. 
+function isPermanentRecipientError(error: unknown): boolean { + return /agent_not_found|no worker named|not found/i.test(toErrorMessage(error)) +} + function warnIntegrationEventAggregated(key: string, message: string, metadata: Record): void { if (!aggregatedWarnings.has(key) && aggregatedWarnings.size >= MAX_AGGREGATED_WARNING_KEYS) { const oldestKey = aggregatedWarnings.keys().next().value @@ -3256,13 +3264,35 @@ export class IntegrationEventBridge { const aborted = failed.find((result) => isAbortError(result.error)) if (aborted) throw aborted.error - for (const { confirmation, error } of failed) { + // A recipient that no longer exists is a permanent failure — retrying just + // re-spams the vanished agent. Drop those from the retry set and warn once + // (aggregated). Treat them as resolved so the dedupe key commits and a + // duplicate of this event doesn't re-attempt delivery to the ghost. + const permanent = failed.filter(({ error }) => isPermanentRecipientError(error)) + const retryable = failed.filter(({ error }) => !isPermanentRecipientError(error)) + for (const { confirmation, error } of permanent) { + warnIntegrationEventAggregated( + `delivery recipient gone:${projectId}`, + 'delivery recipient no longer registered; suppressing', + { + projectId, + eventId: event.id, + path: event.resource.path, + recipient: confirmation.recipient, + duplicateKey: deliveryClaim.key, + error: toErrorMessage(error) + } + ) + } + if (retryable.length === 0) return + + for (const { confirmation, error } of retryable) { this.warnDeliveryInjectedConfirmationFailed(projectId, event, deliveryClaim, confirmation, error) } const delayMs = retryDelays[retryIndex] if (delayMs === undefined) { - for (const { confirmation, error } of failed) { + for (const { confirmation, error } of retryable) { console.warn('[integration-events] delivery injected retries exhausted', { projectId, eventId: event.id, @@ -3273,10 +3303,10 @@ export class IntegrationEventBridge { error: 
toErrorMessage(error) }) } - throw failed[0].error + throw retryable[0].error } - for (const { confirmation, error } of failed) { + for (const { confirmation, error } of retryable) { warnIntegrationEventAggregated( `delivery injected confirmation retrying:${projectId}`, 'delivery injected confirmation retrying', @@ -3296,7 +3326,7 @@ export class IntegrationEventBridge { await delay(delayMs, signal) const retryConfirmations: InjectedConfirmation[] = [] - for (const { confirmation } of failed) { + for (const { confirmation } of retryable) { const nextAttempt = (attemptsByRecipient.get(confirmation.recipient) ?? 1) + 1 attemptsByRecipient.set(confirmation.recipient, nextAttempt) try { @@ -3378,6 +3408,11 @@ export class IntegrationEventBridge { const onlineExplicitAgents = projectAgents ? targets.agents.filter((agent) => projectAgents.includes(agent)) : [] + // An EMPTY live roster is treated as a transient broker-startup race, not + // proof the agent is gone — fall back to the configured targets so a real + // agent that simply hasn't been listed yet still gets the event. If that + // optimism is wrong (agent truly vanished), delivery fails permanently and + // confirmInjectedDeliveryWithRetry suppresses it instead of retry-storming. const explicitAgents = projectAgents && projectAgents.length === 0 && targets.agents.length > 0 ? 
targets.agents : onlineExplicitAgents diff --git a/src/main/integration-mounts.ts b/src/main/integration-mounts.ts index 9c8a87ae..60e6fd38 100644 --- a/src/main/integration-mounts.ts +++ b/src/main/integration-mounts.ts @@ -10,6 +10,7 @@ import { } from '@relayfile/sdk' import { accountWorkspaceReadyRetryOptions, getAccountWorkspaceId, refreshCloudAuth, resolveCloudAuth } from './auth' import { createPearMountLauncher } from './relayfile-mount-launcher' +import { forgetMountPid, killMountPid, reapOrphanedMountPids, recordMountPid } from './relayfile-mount-pids' const MOUNT_READY_TIMEOUT_MS = 60_000 const MOUNT_SYNC_TIMEOUT = '180s' @@ -202,6 +203,11 @@ export class IntegrationMountManager { // One mount per configured Relayfile path. Mounting provider roots makes // large integrations mirror far more data than the project selected. private handles = new Map() + // Last-known OS pid per mounted path, captured from handle.status(). Used to + // hard-kill a detached mount daemon the SDK's stop() may not reap, and to + // distinguish this instance's live mounts from reapable orphans at boot. + private mountPids = new Map() + private reapedOrphanMounts = false private refreshTimers = new Map>() private authRestartedAt = new Map() private restartAttempts = new Map() @@ -309,6 +315,8 @@ export class IntegrationMountManager { // from the desired set (see mount()). const handle = this.handles.get(providerRoot) this.handles.delete(providerRoot) + const pid = this.mountPids.get(providerRoot) + this.mountPids.delete(providerRoot) if (handle) { await handle.stop().catch((error) => { console.warn( @@ -317,6 +325,14 @@ export class IntegrationMountManager { ) }) } + // Backstop: stop() does not reliably reap a detached (`background: true`) + // daemon, so kill the captured pid if it is still a live relayfile-mount. + // Without this, a wedged mount's old process survives every forced restart + // and they pile up (the respawn-storm orphans). 
+ if (killMountPid(pid)) { + console.warn(`[integration-mounts] Hard-killed lingering mount process ${pid} for ${providerRoot}`) + } + await forgetMountPid(pid) } currentWorkspaceId(): string | null { @@ -615,6 +631,19 @@ export class IntegrationMountManager { } this.workspaceId = workspaceId + // Once per app run, now that we're committed to mounting, reap + // relayfile-mount daemons a prior instance left orphaned (crash / kill -9 / + // dev hot-reload bypassing before-quit). Only PIDs we recorded and that + // still resolve to a live relayfile-mount are killed; our own live mounts + // are excluded. + if (!this.reapedOrphanMounts) { + this.reapedOrphanMounts = true + const killed = await reapOrphanedMountPids(new Set(this.mountPids.values())) + if (killed.length > 0) { + console.warn(`[integration-mounts] Reaped ${killed.length} orphaned mount process(es) from a prior run`, { pids: killed }) + } + } + const mountRoot = integrationMountRootForWorkspace(workspaceId) await ensureProtectedDirectory(join(homedir(), '.agentworkforce')) await ensureProtectedDirectory(join(homedir(), '.agentworkforce', 'pear')) @@ -695,6 +724,14 @@ export class IntegrationMountManager { handle = await startMount() } this.handles.set(spec.remotePath, handle) + // Capture the daemon pid so stopHandle can hard-kill it if stop() leaves + // it detached, and so the boot-time orphan reaper can tell our live + // mounts apart from strays. Best-effort: a missing pid just skips both. 
+ const pid = await handle.status().then((s) => s.pid).catch(() => undefined) + if (typeof pid === 'number') { + this.mountPids.set(spec.remotePath, pid) + await recordMountPid(pid, spec.localDir) + } this.scheduleRefresh(spec.remotePath, handle) } catch (error) { if (isUnauthorizedError(error)) { diff --git a/src/main/integrations.test.ts b/src/main/integrations.test.ts index fb5381bd..7acfcde7 100644 --- a/src/main/integrations.test.ts +++ b/src/main/integrations.test.ts @@ -716,6 +716,24 @@ describe('IntegrationsManager', () => { }) }) + it('surfaces a restart-cap-exceeded mount as user-actionable auth recovery', () => { + const manager = new IntegrationsManager() + const observer = mock.integrationMountManager.setHealthObserver.mock.calls.at(-1)?.[0] + + observer?.({ + type: 'restart-cap-exceeded', + remotePath: '/slack/channels/C0BBTBC1RCM__epic/messages', + attempts: 5, + reason: 'reconcile loop stalled' + }) + + expect(manager.getAuthRecoveryState()).toMatchObject({ + reason: 'cloud-auth-required', + failureClass: 'mount-recovery-exhausted' + }) + expect(manager.getAuthRecoveryState()?.message).toMatch(/stopped recovering after 5 restarts/) + }) + it('clears sticky auth recovery state after the all-dead recovery retry respawns mounts', async () => { vi.useFakeTimers() const manager = new IntegrationsManager() diff --git a/src/main/integrations.ts b/src/main/integrations.ts index e05c2181..0f551e03 100644 --- a/src/main/integrations.ts +++ b/src/main/integrations.ts @@ -928,6 +928,19 @@ export class IntegrationsManager { this.setAuthRecoveryState(alert.reason, undefined, alert.message) return } + // A mount that exhausted its restart cap is paused, not recovering — five + // consecutive restarts failed, so this is not a transient blip the loop + // will fix on its own. Surface it to the user (the documented remedy for a + // wedged integration mount is re-auth) instead of letting the alert drop + // silently and the mount cycle every reset window forever. 
+ if (alert.type === 'restart-cap-exceeded') { + this.setAuthRecoveryState( + 'cloud-auth-required', + 'mount-recovery-exhausted', + `Integration sync for ${alert.remotePath} stopped recovering after ${alert.attempts} restarts (${alert.reason}). Re-authenticate to resume writebacks.` + ) + return + } // Only auth-stall alerts carry pendingWriteback/message and map to a // mount-auth-stall event. reconcile-stalled alerts have neither field, so // emitting one here previously produced a malformed event with undefined diff --git a/src/main/relayfile-mount-pids.test.ts b/src/main/relayfile-mount-pids.test.ts new file mode 100644 index 00000000..3a196ea6 --- /dev/null +++ b/src/main/relayfile-mount-pids.test.ts @@ -0,0 +1,122 @@ +import { mkdtemp, rm, readFile } from 'node:fs/promises' +import { tmpdir } from 'node:os' +import { join } from 'node:path' +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' + +const mockHome = vi.hoisted(() => ({ dir: '' })) +const mockChild = vi.hoisted(() => ({ execFileSync: vi.fn() })) + +vi.mock('node:os', async (importOriginal) => { + const actual = await importOriginal() + return { ...actual, homedir: () => mockHome.dir } +}) + +vi.mock('node:child_process', () => ({ + execFileSync: mockChild.execFileSync +})) + +import { forgetMountPid, killMountPid, reapOrphanedMountPids, recordMountPid } from './relayfile-mount-pids' + +function registryFile(): string { + return join(mockHome.dir, '.agentworkforce', 'pear', 'relayfile', 'mount-pids.json') +} + +async function readPids(): Promise { + const raw = await readFile(registryFile(), 'utf8') + return (JSON.parse(raw).pids as Array<{ pid: number }>).map((entry) => entry.pid) +} + +describe('relayfile-mount-pids', () => { + let killSpy: ReturnType + + beforeEach(async () => { + mockHome.dir = await mkdtemp(join(tmpdir(), 'mount-pids-')) + mockChild.execFileSync.mockReset() + // Default: any probed pid resolves to a relayfile-mount executable. 
+ mockChild.execFileSync.mockReturnValue('relayfile-mount\n') + }) + + afterEach(async () => { + killSpy?.mockRestore() + await rm(mockHome.dir, { recursive: true, force: true }) + }) + + it('records and forgets pids', async () => { + await recordMountPid(4242, '/local/a') + await recordMountPid(4243, '/local/b') + expect((await readPids()).sort()).toEqual([4242, 4243]) + + // Re-recording the same pid does not duplicate it. + await recordMountPid(4242, '/local/a') + expect((await readPids()).filter((pid) => pid === 4242)).toHaveLength(1) + + await forgetMountPid(4242) + expect(await readPids()).toEqual([4243]) + }) + + it('ignores invalid pids', async () => { + await recordMountPid(undefined, '/x') + await recordMountPid(0, '/x') + await recordMountPid(1, '/x') + await expect(readFile(registryFile(), 'utf8')).rejects.toThrow() + }) + + it('reaps an orphaned live relayfile-mount and clears the registry', async () => { + killSpy = vi.spyOn(process, 'kill').mockImplementation(() => true) + await recordMountPid(50_000, '/local/a') + + const killed = await reapOrphanedMountPids(new Set()) + + expect(killed).toEqual([50_000]) + expect(killSpy).toHaveBeenCalledWith(50_000, 'SIGKILL') + expect(await readPids()).toEqual([]) + }) + + it('does not reap a pid this instance still manages', async () => { + killSpy = vi.spyOn(process, 'kill').mockImplementation(() => true) + await recordMountPid(50_001, '/local/a') + + const killed = await reapOrphanedMountPids(new Set([50_001])) + + expect(killed).toEqual([]) + expect(killSpy).not.toHaveBeenCalledWith(50_001, 'SIGKILL') + // The still-managed pid is retained for a future reap. 
+ expect(await readPids()).toEqual([50_001]) + }) + + it('does not reap a recycled pid whose executable is not relayfile-mount', async () => { + killSpy = vi.spyOn(process, 'kill').mockImplementation(() => true) + mockChild.execFileSync.mockReturnValue('some-other-process\n') + await recordMountPid(50_002, '/local/a') + + const killed = await reapOrphanedMountPids(new Set()) + + expect(killed).toEqual([]) + expect(killSpy).not.toHaveBeenCalledWith(50_002, 'SIGKILL') + }) + + it('does not reap a dead pid', async () => { + killSpy = vi.spyOn(process, 'kill').mockImplementation((_pid, signal) => { + if (signal === 0) throw new Error('ESRCH') + return true + }) + await recordMountPid(50_003, '/local/a') + + const killed = await reapOrphanedMountPids(new Set()) + + expect(killed).toEqual([]) + expect(killSpy).not.toHaveBeenCalledWith(50_003, 'SIGKILL') + }) + + it('killMountPid kills a live relayfile-mount and ignores a dead pid', async () => { + killSpy = vi.spyOn(process, 'kill').mockImplementation(() => true) + expect(killMountPid(50_004)).toBe(true) + expect(killSpy).toHaveBeenCalledWith(50_004, 'SIGKILL') + + killSpy.mockImplementation((_pid: number, signal?: string | number) => { + if (signal === 0) throw new Error('ESRCH') + return true + }) + expect(killMountPid(50_005)).toBe(false) + }) +}) diff --git a/src/main/relayfile-mount-pids.ts b/src/main/relayfile-mount-pids.ts new file mode 100644 index 00000000..3ef2c7c5 --- /dev/null +++ b/src/main/relayfile-mount-pids.ts @@ -0,0 +1,133 @@ +import { execFileSync } from 'node:child_process' +import { mkdir, readFile, rename, writeFile } from 'node:fs/promises' +import { homedir } from 'node:os' +import { join } from 'node:path' +import { isRecord } from './guards' +import { toErrorMessage } from './errors' + +// A persisted registry of relayfile-mount child PIDs this app has spawned. 
//
// Mounts run as detached (`background: true`) daemons, so a hard kill of the
// Electron main process — a crash, a `kill -9`, a dev hot-reload — leaves them
// orphaned and reparented to launchd/init. They never get reaped, accumulate
// across restarts (observed: 6 stale relayfile-mount procs outliving their
// parent), and the wedged ones keep spinning CPU. before-quit cleanup only
// covers the graceful path; this registry covers the rest by letting the next
// boot reap any mount PID a prior instance recorded but never stopped.
//
// Safety: we ONLY ever kill a PID we recorded AND that still resolves to a
// relayfile-mount executable (guards against PID reuse), so a user's manually
// `relayfile start`-ed mount — a different process tree — is never touched.
// See isLiveRelayfileMount for how that verification is performed.
//
// On-disk shape: `{ "pids": MountPidEntry[] }`.

// One recorded mount daemon.
interface MountPidEntry {
  // OS process id of the mount daemon.
  pid: number
  // Local directory supplied by the caller when the pid was recorded.
  localDir: string
  // ISO-8601 timestamp of when the entry was written ('' if a legacy or
  // hand-edited entry carried none).
  recordedAt: string
}

// Directory holding the registry file. Resolved on every call (rather than
// cached) so a changed home directory is always honoured.
function registryDir(): string {
  return join(homedir(), '.agentworkforce', 'pear', 'relayfile')
}

// Absolute path of the persisted PID registry.
function registryPath(): string {
  return join(registryDir(), 'mount-pids.json')
}

// Load the registry, tolerating every failure mode: a missing or unreadable
// file, malformed JSON, or an unexpected shape all yield an empty list, and
// individual malformed entries are skipped. Only entries with an integer pid
// greater than 1 and a string localDir are returned, so every returned value
// genuinely satisfies MountPidEntry and can never name pid 0/1 or a fraction.
async function readRegistry(): Promise<MountPidEntry[]> {
  let raw: string
  try {
    raw = await readFile(registryPath(), 'utf8')
  } catch {
    // No registry yet (or it is unreadable): nothing has been recorded.
    return []
  }

  let parsed: unknown
  try {
    parsed = JSON.parse(raw)
  } catch {
    // Corrupt file: treat as empty instead of failing the caller.
    return []
  }
  if (!isRecord(parsed) || !Array.isArray(parsed.pids)) return []

  const entries: MountPidEntry[] = []
  for (const candidate of parsed.pids as unknown[]) {
    if (!isRecord(candidate)) continue
    const { pid, localDir, recordedAt } = candidate
    if (typeof pid !== 'number' || !Number.isInteger(pid) || pid <= 1) continue
    if (typeof localDir !== 'string') continue
    entries.push({
      pid,
      localDir,
      recordedAt: typeof recordedAt === 'string' ? recordedAt : ''
    })
  }
  return entries
}

// Persist the registry atomically: write a sibling temp file, then rename it
// over the real path so a reader never observes a half-written file.
// Best-effort by design — a failure is logged, never thrown, because losing
// the registry only costs a future orphan reap, not correctness of the mounts.
// NOTE(review): the temp path is shared, so overlapping writes from concurrent
// callers can collide; callers should not interleave read-modify-write cycles.
async function writeRegistry(entries: MountPidEntry[]): Promise<void> {
  const path = registryPath()
  try {
    await mkdir(registryDir(), { recursive: true })
    const tmp = `${path}.tmp`
    await writeFile(tmp, JSON.stringify({ pids: entries }), 'utf8')
    await rename(tmp, path)
  } catch (error) {
    console.warn('[relayfile-mount-pids] Failed to persist mount PID registry:', toErrorMessage(error))
  }
}

// True only if the PID is alive AND its executable is a relayfile-mount binary.
// The comm check defends against PID reuse: a recorded PID that has since been
// recycled for an unrelated process must never be killed. A PID we are not
// allowed to signal (the probe throws) is treated as "not ours", and any
// failure to read the process name fails closed — when in doubt, do not kill.
function isLiveRelayfileMount(pid: number): boolean {
  if (!isPlausiblePid(pid)) return false
  try {
    // Signal 0 only tests that the process exists and is signalable by us.
    process.kill(pid, 0)
  } catch {
    return false
  }
  try {
    return resolvesToRelayfileMount(pid)
  } catch {
    // Probe failed or timed out / process vanished between the liveness probe
    // and here.
    return false
  }
}

// A PID worth considering at all: an integer above 1 (never 0, init, or a
// negative process-group id).
function isPlausiblePid(pid: number): boolean {
  return Number.isInteger(pid) && pid > 1
}

// Upper bound for a single synchronous process-name probe, so a hung `ps` /
// `tasklist` cannot stall the main process indefinitely.
const PROCESS_PROBE_TIMEOUT_MS = 2_000

// Run a short-lived process-listing command and return its stdout. stderr is
// discarded so probe noise does not leak into the app's own stderr.
function runProcessProbe(command: string, args: string[]): string {
  return execFileSync(command, args, {
    encoding: 'utf8',
    timeout: PROCESS_PROBE_TIMEOUT_MS,
    stdio: ['ignore', 'pipe', 'ignore'],
    windowsHide: true
  })
}

// Ask the OS what executable the PID currently belongs to. Throws if the probe
// command fails; the caller treats that as "not a relayfile-mount".
// NOTE(review): the Windows branch assumes the daemon's image name contains
// "relayfile-mount" (e.g. relayfile-mount.exe) — confirm against the launcher.
// If it does not, Windows simply never hard-kills, which is the safe failure.
function resolvesToRelayfileMount(pid: number): boolean {
  if (process.platform === 'win32') {
    const row = runProcessProbe('tasklist', ['/FI', `PID eq ${pid}`, '/FO', 'CSV', '/NH'])
    return /relayfile-mount/i.test(row)
  }
  const comm = runProcessProbe('ps', ['-p', String(pid), '-o', 'comm='])
  return /relayfile-mount/.test(comm)
}

// In-process serialization of registry read-modify-write cycles. Without it,
// two overlapping record/forget calls can both read the same snapshot and the
// later write silently discards the earlier one. The queue itself never
// rejects, so one failed task cannot block those behind it.
let registryQueue: Promise<void> = Promise.resolve()

function withRegistryLock<T>(task: () => Promise<T>): Promise<T> {
  const run = registryQueue.then(task)
  registryQueue = run.then(
    () => undefined,
    () => undefined
  )
  return run
}

// Record (or refresh) a mount daemon PID in the persisted registry. Invalid
// PIDs (undefined, non-integer, <= 1) are ignored. Re-recording an existing
// PID replaces its entry rather than duplicating it.
export async function recordMountPid(pid: number | undefined, localDir: string): Promise<void> {
  if (typeof pid !== 'number' || !isPlausiblePid(pid)) return
  const recordedPid: number = pid
  await withRegistryLock(async () => {
    const others = (await readRegistry()).filter((entry) => entry.pid !== recordedPid)
    others.push({ pid: recordedPid, localDir, recordedAt: new Date().toISOString() })
    await writeRegistry(others)
  })
}

// Remove a PID from the persisted registry. A no-op (and no write) when the
// PID is undefined or was never recorded.
export async function forgetMountPid(pid: number | undefined): Promise<void> {
  if (typeof pid !== 'number') return
  await withRegistryLock(async () => {
    const entries = await readRegistry()
    const remaining = entries.filter((entry) => entry.pid !== pid)
    if (remaining.length !== entries.length) await writeRegistry(remaining)
  })
}

// Kill any recorded mount PID that is still a live relayfile-mount — these are
// orphans from a prior app instance that never shut down cleanly. PIDs in
// `livePids` are mounts this instance currently manages and are never touched.
// Returns the PIDs actually killed. Afterwards the registry is rewritten to
// contain ONLY the entries in `livePids`: dead, recycled, and reaped PIDs are
// all dropped, since none of them can be a reapable orphan any longer.
export async function reapOrphanedMountPids(livePids: ReadonlySet<number>): Promise<number[]> {
  return withRegistryLock(async (): Promise<number[]> => {
    const entries = await readRegistry()
    if (entries.length === 0) return []
    const killed: number[] = []
    for (const entry of entries) {
      // A PID this instance is actively managing is not an orphan.
      if (livePids.has(entry.pid)) continue
      if (!isLiveRelayfileMount(entry.pid)) continue
      try {
        process.kill(entry.pid, 'SIGKILL')
        killed.push(entry.pid)
      } catch (error) {
        console.warn(`[relayfile-mount-pids] Failed to reap orphan mount pid ${entry.pid}:`, toErrorMessage(error))
      }
    }
    // Drop everything we recorded; the live handles re-record their own PIDs as
    // they (re)mount, and a still-live PID we manage is in livePids.
    await writeRegistry(entries.filter((entry) => livePids.has(entry.pid)))
    return killed
  })
}

// Best-effort synchronous SIGKILL of a single PID, used as a backstop after the
// SDK's async stop() so a detached daemon that ignored stop() is still reaped.
// Returns true only when a verified live relayfile-mount was signalled; an
// undefined, dead, or recycled PID returns false without signalling anything.
export function killMountPid(pid: number | undefined): boolean {
  if (typeof pid !== 'number' || !isLiveRelayfileMount(pid)) return false
  try {
    process.kill(pid, 'SIGKILL')
    return true
  } catch (error) {
    console.warn(`[relayfile-mount-pids] Failed to kill mount pid ${pid}:`, toErrorMessage(error))
    return false
  }
}