Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions src/main/__tests__/integration-event-bridge.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1324,6 +1324,53 @@ test('slack injected confirmation retries exhausted releases dedupe key', async
assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice', 'alice', 'alice', 'alice'])
})

// Verifies that an injected-confirmation failure carrying an `agent_not_found`
// error is reported as suppressed and never enters the retry path.
test('agent_not_found injected confirmation is suppressed without retry', async () => {
  const suppressedWarning = '[integration-events] delivery recipient no longer registered; suppressing'
  const exhaustedWarning = '[integration-events] delivery injected retries exhausted'

  process.env.PEAR_INTEGRATION_EVENT_INJECTED_RETRY_DELAYS_MS = '5,5'
  const harness = makeHarness(['alice'], { manualInjectedConfirmations: true })

  // Capture console.warn arguments instead of printing them; the real
  // implementation is put back in the `finally` below.
  const capturedWarnings: unknown[][] = []
  const sawWarning = (message: string): boolean =>
    capturedWarnings.some((args) => args[0] === message)
  const restoreWarn = console.warn
  console.warn = (...args: unknown[]) => {
    capturedWarnings.push(args)
  }

  try {
    // Register a single slack integration that notifies only "alice".
    await withMockedNow('2026-06-05T14:00:00.000Z', async () => {
      await harness.bridge.reconcile('project-1', [
        integration({
          provider: 'slack',
          integrationId: 'slack-1',
          mountPaths: ['/slack/channels/C123ABC__proj-cloud'],
          scope: { notifyAgents: ['alice'] }
        })
      ])
    })

    const metaPath = '/slack/channels/C123ABC__proj-cloud/messages/1780668000_000000/meta.json'
    await harness.emit(changeEvent(metaPath, 'slack'))
    await waitForSent(harness, 1)

    // A recipient the broker reports as nonexistent is a permanent failure.
    const notFound = new Error(
      'relaycast send_dm failed: API error (agent_not_found): Agent "alice" not found'
    )
    harness.pendingInjectedConfirmations[0].reject(notFound)

    await waitUntil(() => sawWarning(suppressedWarning))

    // Wait longer than the configured 5ms retry delays so a retry, had one been
    // scheduled, would already have fired before the assertions run.
    await new Promise((resolve) => setTimeout(resolve, 50))

    // No retry: exactly one send, no further injected-confirmation attempts, and
    // crucially no "retries exhausted" storm against the vanished agent.
    assert.equal(harness.sent.length, 1)
    assert.equal(harness.injectedConfirmationCalls.length, 1)
    assert.equal(harness.pendingInjectedConfirmations.length, 1)
    assert.equal(sawWarning(exhaustedWarning), false)
  } finally {
    console.warn = restoreWarn
  }
})

test('slack injected confirmation retry resends only failed recipient', async () => {
process.env.PEAR_INTEGRATION_EVENT_INJECTED_RETRY_DELAYS_MS = '5,5'
const harness = makeHarness(['alice', 'bob'], { manualInjectedConfirmations: true })
Expand Down
45 changes: 40 additions & 5 deletions src/main/integration-event-bridge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1645,6 +1645,14 @@ function logIntegrationEvent(message: string, metadata: Record<string, unknown>)
void appendIntegrationEventLog(message, metadata)
}

/**
 * Reports whether a delivery error means the recipient does not exist on the
 * broker (rotated out, never registered).
 *
 * Such a failure is PERMANENT rather than transient: retrying the inject — and
 * the Relaycast fallback — just spams the same vanished agent every few
 * seconds for every event. Mirrors broker.ts isMissingAgentError.
 *
 * NOTE(review): the bare `not found` alternative also matches any unrelated
 * error whose message contains that phrase — confirm this breadth is intended.
 *
 * @param error - The value thrown or rejected by a delivery attempt.
 * @returns `true` when the error message matches a missing-recipient pattern.
 */
function isPermanentRecipientError(error: unknown): boolean {
  const missingRecipientPattern = /agent_not_found|no worker named|not found/i
  const message = toErrorMessage(error)
  return missingRecipientPattern.test(message)
}

function warnIntegrationEventAggregated(key: string, message: string, metadata: Record<string, unknown>): void {
if (!aggregatedWarnings.has(key) && aggregatedWarnings.size >= MAX_AGGREGATED_WARNING_KEYS) {
const oldestKey = aggregatedWarnings.keys().next().value
Expand Down Expand Up @@ -3256,13 +3264,35 @@ export class IntegrationEventBridge {
const aborted = failed.find((result) => isAbortError(result.error))
if (aborted) throw aborted.error

for (const { confirmation, error } of failed) {
// A recipient that no longer exists is a permanent failure — retrying just
// re-spams the vanished agent. Drop those from the retry set and warn once
// (aggregated). Treat them as resolved so the dedupe key commits and a
// duplicate of this event doesn't re-attempt delivery to the ghost.
const permanent = failed.filter(({ error }) => isPermanentRecipientError(error))
const retryable = failed.filter(({ error }) => !isPermanentRecipientError(error))
for (const { confirmation, error } of permanent) {
warnIntegrationEventAggregated(
`delivery recipient gone:${projectId}`,
'delivery recipient no longer registered; suppressing',
{
projectId,
eventId: event.id,
path: event.resource.path,
recipient: confirmation.recipient,
duplicateKey: deliveryClaim.key,
error: toErrorMessage(error)
}
)
}
if (retryable.length === 0) return

for (const { confirmation, error } of retryable) {
this.warnDeliveryInjectedConfirmationFailed(projectId, event, deliveryClaim, confirmation, error)
}

const delayMs = retryDelays[retryIndex]
if (delayMs === undefined) {
for (const { confirmation, error } of failed) {
for (const { confirmation, error } of retryable) {
console.warn('[integration-events] delivery injected retries exhausted', {
projectId,
eventId: event.id,
Expand All @@ -3273,10 +3303,10 @@ export class IntegrationEventBridge {
error: toErrorMessage(error)
})
}
throw failed[0].error
throw retryable[0].error
}

for (const { confirmation, error } of failed) {
for (const { confirmation, error } of retryable) {
warnIntegrationEventAggregated(
`delivery injected confirmation retrying:${projectId}`,
'delivery injected confirmation retrying',
Expand All @@ -3296,7 +3326,7 @@ export class IntegrationEventBridge {

await delay(delayMs, signal)
const retryConfirmations: InjectedConfirmation[] = []
for (const { confirmation } of failed) {
for (const { confirmation } of retryable) {
const nextAttempt = (attemptsByRecipient.get(confirmation.recipient) ?? 1) + 1
attemptsByRecipient.set(confirmation.recipient, nextAttempt)
try {
Expand Down Expand Up @@ -3378,6 +3408,11 @@ export class IntegrationEventBridge {
const onlineExplicitAgents = projectAgents
? targets.agents.filter((agent) => projectAgents.includes(agent))
: []
// An EMPTY live roster is treated as a transient broker-startup race, not
// proof the agent is gone — fall back to the configured targets so a real
// agent that simply hasn't been listed yet still gets the event. If that
// optimism is wrong (agent truly vanished), delivery fails permanently and
// confirmInjectedDeliveryWithRetry suppresses it instead of retry-storming.
const explicitAgents = projectAgents && projectAgents.length === 0 && targets.agents.length > 0
? targets.agents
: onlineExplicitAgents
Expand Down
37 changes: 37 additions & 0 deletions src/main/integration-mounts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
} from '@relayfile/sdk'
import { accountWorkspaceReadyRetryOptions, getAccountWorkspaceId, refreshCloudAuth, resolveCloudAuth } from './auth'
import { createPearMountLauncher } from './relayfile-mount-launcher'
import { forgetMountPid, killMountPid, reapOrphanedMountPids, recordMountPid } from './relayfile-mount-pids'

const MOUNT_READY_TIMEOUT_MS = 60_000
const MOUNT_SYNC_TIMEOUT = '180s'
Expand Down Expand Up @@ -202,6 +203,11 @@ export class IntegrationMountManager {
// One mount per configured Relayfile path. Mounting provider roots makes
// large integrations mirror far more data than the project selected.
private handles = new Map<string, MountedWorkspaceHandle>()
// Last-known OS pid per mounted path, captured from handle.status(). Used to
// hard-kill a detached mount daemon the SDK's stop() may not reap, and to
// distinguish this instance's live mounts from reapable orphans at boot.
private mountPids = new Map<string, number>()
private reapedOrphanMounts = false
private refreshTimers = new Map<string, ReturnType<typeof setTimeout>>()
private authRestartedAt = new Map<string, number>()
private restartAttempts = new Map<string, number>()
Expand Down Expand Up @@ -309,6 +315,8 @@ export class IntegrationMountManager {
// from the desired set (see mount()).
const handle = this.handles.get(providerRoot)
this.handles.delete(providerRoot)
const pid = this.mountPids.get(providerRoot)
this.mountPids.delete(providerRoot)
if (handle) {
await handle.stop().catch((error) => {
console.warn(
Expand All @@ -317,6 +325,14 @@ export class IntegrationMountManager {
)
})
}
// Backstop: stop() does not reliably reap a detached (`background: true`)
// daemon, so kill the captured pid if it is still a live relayfile-mount.
// Without this, a wedged mount's old process survives every forced restart
// and they pile up (the respawn-storm orphans).
if (killMountPid(pid)) {
console.warn(`[integration-mounts] Hard-killed lingering mount process ${pid} for ${providerRoot}`)
}
await forgetMountPid(pid)
}

currentWorkspaceId(): string | null {
Expand Down Expand Up @@ -615,6 +631,19 @@ export class IntegrationMountManager {
}
this.workspaceId = workspaceId

// Once per app run, now that we're committed to mounting, reap
// relayfile-mount daemons a prior instance left orphaned (crash / kill -9 /
// dev hot-reload bypassing before-quit). Only PIDs we recorded and that
// still resolve to a live relayfile-mount are killed; our own live mounts
// are excluded.
if (!this.reapedOrphanMounts) {
this.reapedOrphanMounts = true
const killed = await reapOrphanedMountPids(new Set(this.mountPids.values()))
Comment on lines +639 to +641

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: Reap orphans before auth-gated exits

Because this reaper runs after resolveCloudAuth() and getAccountWorkspaceId(), a restart with missing/expired cloud auth or a workspace lookup failure exits mount() before any recorded orphan PIDs are checked. That leaves the exact crash/hot-reload relayfile-mount processes this change is meant to clean up running during the user-action-required state, so a wedged mount can keep burning CPU until re-auth succeeds. Move the recorded-PID reap to a boot/auth-independent path or before these auth/workspace gates.

Useful? React with 👍 / 👎.

if (killed.length > 0) {
console.warn(`[integration-mounts] Reaped ${killed.length} orphaned mount process(es) from a prior run`, { pids: killed })
}
}

const mountRoot = integrationMountRootForWorkspace(workspaceId)
await ensureProtectedDirectory(join(homedir(), '.agentworkforce'))
await ensureProtectedDirectory(join(homedir(), '.agentworkforce', 'pear'))
Expand Down Expand Up @@ -695,6 +724,14 @@ export class IntegrationMountManager {
handle = await startMount()
}
this.handles.set(spec.remotePath, handle)
// Capture the daemon pid so stopHandle can hard-kill it if stop() leaves
// it detached, and so the boot-time orphan reaper can tell our live
// mounts apart from strays. Best-effort: a missing pid just skips both.
const pid = await handle.status().then((s) => s.pid).catch(() => undefined)
if (typeof pid === 'number') {
this.mountPids.set(spec.remotePath, pid)
await recordMountPid(pid, spec.localDir)
}
this.scheduleRefresh(spec.remotePath, handle)
} catch (error) {
if (isUnauthorizedError(error)) {
Expand Down
18 changes: 18 additions & 0 deletions src/main/integrations.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,24 @@ describe('IntegrationsManager', () => {
})
})

// A `restart-cap-exceeded` health alert must land in the auth-recovery state
// so the user is told to act, rather than the alert being dropped.
it('surfaces a restart-cap-exceeded mount as user-actionable auth recovery', () => {
  const manager = new IntegrationsManager()

  // Take the observer passed to the most recent setHealthObserver call on the
  // mocked mount manager (presumably registered while constructing `manager`).
  const registrations = mock.integrationMountManager.setHealthObserver.mock.calls
  const healthObserver = registrations.at(-1)?.[0]

  healthObserver?.({
    type: 'restart-cap-exceeded',
    remotePath: '/slack/channels/C0BBTBC1RCM__epic/messages',
    attempts: 5,
    reason: 'reconcile loop stalled'
  })

  const expectedState = {
    reason: 'cloud-auth-required',
    failureClass: 'mount-recovery-exhausted'
  }
  expect(manager.getAuthRecoveryState()).toMatchObject(expectedState)
  // The user-facing message must mention the number of failed restarts.
  expect(manager.getAuthRecoveryState()?.message).toMatch(/stopped recovering after 5 restarts/)
})

it('clears sticky auth recovery state after the all-dead recovery retry respawns mounts', async () => {
vi.useFakeTimers()
const manager = new IntegrationsManager()
Expand Down
13 changes: 13 additions & 0 deletions src/main/integrations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,19 @@ export class IntegrationsManager {
this.setAuthRecoveryState(alert.reason, undefined, alert.message)
return
}
// A mount that exhausted its restart cap is paused, not recovering — five
// consecutive restarts failed, so this is not a transient blip the loop
// will fix on its own. Surface it to the user (the documented remedy for a
// wedged integration mount is re-auth) instead of letting the alert drop
// silently and the mount cycle every reset window forever.
if (alert.type === 'restart-cap-exceeded') {
this.setAuthRecoveryState(
'cloud-auth-required',
'mount-recovery-exhausted',
`Integration sync for ${alert.remotePath} stopped recovering after ${alert.attempts} restarts (${alert.reason}). Re-authenticate to resume writebacks.`
)
return
}
// Only auth-stall alerts carry pendingWriteback/message and map to a
// mount-auth-stall event. reconcile-stalled alerts have neither field, so
// emitting one here previously produced a malformed event with undefined
Expand Down
122 changes: 122 additions & 0 deletions src/main/relayfile-mount-pids.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import { mkdtemp, rm, readFile } from 'node:fs/promises'
import { tmpdir } from 'node:os'
import { join } from 'node:path'
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'

// Mutable holders created via vi.hoisted so the vi.mock factories below can
// reference them. `mockHome.dir` is the fake home directory (assigned per test
// in beforeEach); `mockChild.execFileSync` is the stub used by the
// node:child_process mock.
const mockHome = vi.hoisted(() => ({ dir: '' }))
const mockChild = vi.hoisted(() => ({ execFileSync: vi.fn() }))

// Keep the real node:os module but make homedir() return the per-test
// directory, so the module under test resolves its paths under that directory.
vi.mock('node:os', async (importOriginal) => {
const actual = await importOriginal<typeof import('node:os')>()
return { ...actual, homedir: () => mockHome.dir }
})

// Replace node:child_process with a module exposing only the execFileSync stub.
vi.mock('node:child_process', () => ({
execFileSync: mockChild.execFileSync
}))

import { forgetMountPid, killMountPid, reapOrphanedMountPids, recordMountPid } from './relayfile-mount-pids'

/** Returns the path of the mount-pid registry file under the mocked home directory. */
function registryFile(): string {
  const registryDir = join(mockHome.dir, '.agentworkforce', 'pear', 'relayfile')
  return join(registryDir, 'mount-pids.json')
}

/**
 * Reads the registry file and returns the pid of every recorded entry.
 * Rejects if the file cannot be read (for example when nothing was recorded).
 */
async function readPids(): Promise<number[]> {
  const contents = await readFile(registryFile(), 'utf8')
  const registry = JSON.parse(contents) as { pids: Array<{ pid: number }> }
  return registry.pids.map(({ pid }) => pid)
}

// Suite for the mount-pid registry helpers: recordMountPid, forgetMountPid,
// reapOrphanedMountPids and killMountPid. Each test runs against a fresh
// temporary home directory and a stubbed execFileSync / process.kill.
describe('relayfile-mount-pids', () => {
// Spy on process.kill, installed by the tests that need it and restored in
// afterEach.
let killSpy: ReturnType<typeof vi.spyOn>

beforeEach(async () => {
// Point the mocked homedir() at a new temp directory for this test.
mockHome.dir = await mkdtemp(join(tmpdir(), 'mount-pids-'))
mockChild.execFileSync.mockReset()
// Default: any probed pid resolves to a relayfile-mount executable.
mockChild.execFileSync.mockReturnValue('relayfile-mount\n')
})

afterEach(async () => {
// killSpy may be unset when a test never installed it, hence the `?.`.
killSpy?.mockRestore()
await rm(mockHome.dir, { recursive: true, force: true })
})

it('records and forgets pids', async () => {
await recordMountPid(4242, '/local/a')
await recordMountPid(4243, '/local/b')
expect((await readPids()).sort()).toEqual([4242, 4243])

// Re-recording the same pid does not duplicate it.
await recordMountPid(4242, '/local/a')
expect((await readPids()).filter((pid) => pid === 4242)).toHaveLength(1)

await forgetMountPid(4242)
expect(await readPids()).toEqual([4243])
})

it('ignores invalid pids', async () => {
await recordMountPid(undefined, '/x')
await recordMountPid(0, '/x')
await recordMountPid(1, '/x')
// None of the calls above should have created the registry file.
await expect(readFile(registryFile(), 'utf8')).rejects.toThrow()
})

it('reaps an orphaned live relayfile-mount and clears the registry', async () => {
// process.kill always succeeds, so the recorded pid looks alive.
killSpy = vi.spyOn(process, 'kill').mockImplementation(() => true)
await recordMountPid(50_000, '/local/a')

// Empty set: this instance manages no live mounts.
const killed = await reapOrphanedMountPids(new Set())

expect(killed).toEqual([50_000])
expect(killSpy).toHaveBeenCalledWith(50_000, 'SIGKILL')
expect(await readPids()).toEqual([])
})

it('does not reap a pid this instance still manages', async () => {
killSpy = vi.spyOn(process, 'kill').mockImplementation(() => true)
await recordMountPid(50_001, '/local/a')

// The recorded pid is passed in as one this instance still owns.
const killed = await reapOrphanedMountPids(new Set([50_001]))

expect(killed).toEqual([])
expect(killSpy).not.toHaveBeenCalledWith(50_001, 'SIGKILL')
// The still-managed pid is retained for a future reap.
expect(await readPids()).toEqual([50_001])
})

it('does not reap a recycled pid whose executable is not relayfile-mount', async () => {
killSpy = vi.spyOn(process, 'kill').mockImplementation(() => true)
// The executable probe reports a different process name for this pid.
mockChild.execFileSync.mockReturnValue('some-other-process\n')
await recordMountPid(50_002, '/local/a')

const killed = await reapOrphanedMountPids(new Set())

expect(killed).toEqual([])
expect(killSpy).not.toHaveBeenCalledWith(50_002, 'SIGKILL')
})

it('does not reap a dead pid', async () => {
// Throwing on signal 0 makes the pid look dead (presumably the liveness
// probe used by the module under test); other signals succeed.
killSpy = vi.spyOn(process, 'kill').mockImplementation((_pid, signal) => {
if (signal === 0) throw new Error('ESRCH')
return true
})
await recordMountPid(50_003, '/local/a')

const killed = await reapOrphanedMountPids(new Set())

expect(killed).toEqual([])
expect(killSpy).not.toHaveBeenCalledWith(50_003, 'SIGKILL')
})

it('killMountPid kills a live relayfile-mount and ignores a dead pid', async () => {
// First half: every process.kill call succeeds, so the pid looks alive.
killSpy = vi.spyOn(process, 'kill').mockImplementation(() => true)
expect(killMountPid(50_004)).toBe(true)
expect(killSpy).toHaveBeenCalledWith(50_004, 'SIGKILL')

// Second half: signal 0 now throws, so the next pid looks dead.
killSpy.mockImplementation((_pid: number, signal?: string | number) => {
if (signal === 0) throw new Error('ESRCH')
return true
})
expect(killMountPid(50_005)).toBe(false)
})
})
Loading
Loading