From ce2d065c3ace48a1ac76259d84eb80c0483518ae Mon Sep 17 00:00:00 2001 From: Khaliq Date: Mon, 8 Jun 2026 15:08:35 +0200 Subject: [PATCH] feat(integration-mounts): stalled-revision mount-recovery backstop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a defense-in-depth detector to the mount health poll that catches a mount which is cycling status:ready with no error yet has silently stopped materializing new revisions — the failure mode behind the Slack threads read-down outage (cloud emitted bare channel-id event paths that the mount's literal-prefix isUnderRemoteRoot filter dropped). The durable fix for that specific bug shipped server-side in cloud#2010 (live-verified in prod); this is the general client-side backstop so any future silent incremental-sync drop self-recovers instead of staying invisible until the ~100min full pull. Detector (readStalledInjectedRevisions + a check in checkMountHealth, which previously restarted ONLY on auth failure or deadline wedge): if the event-bridge injected a revision under a mount (logged as "injecting" in integration-events.log) that the mount's own state.json still doesn't list after a 15min wall-clock grace AND the mount has successfully reconciled since the revision arrived (lastSuccessfulReconcileAt > injectedAt), force a queueForcedRestart(..., { clearState: true }) so the next full pull re-pulls it. Reuses the existing 60s restart throttle + handledHealthErrorKeys dedup, so one drop = one restart. General across mounts; only mounts with injecting entries under their remoteRoot can match. Correctness anchors (verified against the v0.8.19 daemon source): state.files is keyed by the full normalized suffixed remote path, so the comparison is apples-to-apples; lastSuccessfulReconcileAt is UTC RFC3339Nano + omitempty, so a never-reconciled mount is skipped. Grace is wall-clock (not cycle-count) to survive interval/transport retuning. Adds 7 unit tests for the detector. (Originally also carried a threads-only RELAYFILE_MOUNT_WEBSOCKET=false staleness-floor stopgap; dropped after cloud#2010 restored correct incremental read-down with websocket on.) Co-Authored-By: Claude Opus 4.8 --- src/main/integration-mounts.test.ts | 92 ++++++++++++++++++++++- src/main/integration-mounts.ts | 112 ++++++++++++++++++++++++++++ 2 files changed, 203 insertions(+), 1 deletion(-) diff --git a/src/main/integration-mounts.test.ts b/src/main/integration-mounts.test.ts index be096c42..d0127e57 100644 --- a/src/main/integration-mounts.test.ts +++ b/src/main/integration-mounts.test.ts @@ -118,7 +118,7 @@ vi.mock('./relayfile-mount-launcher', () => ({ createPearMountLauncher: vi.fn((options) => ({ start: mock.startMount, __options: options })) })) -import { IntegrationMountManager } from './integration-mounts' +import { IntegrationMountManager, readStalledInjectedRevisions } from './integration-mounts' describe('IntegrationMountManager', () => { beforeEach(() => { @@ -843,3 +843,93 @@ describe('IntegrationMountManager', () => { }) }) }) + +describe('readStalledInjectedRevisions', () => { + const NOW = '2026-06-08T13:00:00.000Z' + const REMOTE_PATH = '/slack/channels/C123__slug/threads' + const REPLY_PATH = '/slack/channels/C123__slug/threads/1780871788_370329/replies/1780921813_531539.json' + + const injectingLine = (path: string, timestamp: string): string => JSON.stringify({ + timestamp, + message: 'injecting', + metadata: { projectId: 'p', eventId: 'ws:file.created:rev_1', path, recipients: ['slack-comms'] } + }) + + const stateWith = (overrides: Record = {}): Record => ({ + status: 'ready', + lastSuccessfulReconcileAt: '2026-06-08T12:55:00.000Z', + files: {}, + ...overrides + }) + + beforeEach(() => { + mock.readFile.mockClear() + vi.useFakeTimers() + vi.setSystemTime(new Date(NOW)) + }) + + afterEach(() => { + vi.useRealTimers() + }) + + it('flags an injected revision that aged past the grace window and never landed in state.files', async () => { + mock.readFile.mockResolvedValueOnce(injectingLine(REPLY_PATH, '2026-06-08T12:40:00.000Z')) + + const result = await readStalledInjectedRevisions(REMOTE_PATH, stateWith()) + + expect(result).toEqual({ + missingCount: 1, + oldestInjectedAt: '2026-06-08T12:40:00.000Z', + examples: [REPLY_PATH] + }) + }) + + it('does not flag a revision still within the grace window', async () => { + mock.readFile.mockResolvedValueOnce(injectingLine(REPLY_PATH, '2026-06-08T12:55:30.000Z')) + + expect(await readStalledInjectedRevisions(REMOTE_PATH, stateWith())).toBeNull() + }) + + it('does not flag a revision the mount has already tracked in state.files', async () => { + mock.readFile.mockResolvedValueOnce(injectingLine(REPLY_PATH, '2026-06-08T12:40:00.000Z')) + + const result = await readStalledInjectedRevisions( + REMOTE_PATH, + stateWith({ files: { [REPLY_PATH]: { revision: 'rev_1', status: 'ready' } } }) + ) + + expect(result).toBeNull() + }) + + it('does not restart a mount that has not reconciled since the revision arrived', async () => { + mock.readFile.mockResolvedValueOnce(injectingLine(REPLY_PATH, '2026-06-08T12:40:00.000Z')) + + // lastSuccessfulReconcileAt predates the injected revision → no chance to pull it yet. + expect(await readStalledInjectedRevisions( + REMOTE_PATH, + stateWith({ lastSuccessfulReconcileAt: '2026-06-08T12:30:00.000Z' }) + )).toBeNull() + }) + + it('does not restart a mount that has never reconciled (omitted lastSuccessfulReconcileAt)', async () => { + mock.readFile.mockResolvedValueOnce(injectingLine(REPLY_PATH, '2026-06-08T12:40:00.000Z')) + + const state = stateWith() + delete state.lastSuccessfulReconcileAt + expect(await readStalledInjectedRevisions(REMOTE_PATH, state)).toBeNull() + }) + + it('ignores injected revisions for a different mount root', async () => { + mock.readFile.mockResolvedValueOnce( + injectingLine('/slack/channels/C999__other/threads/1/replies/2.json', '2026-06-08T12:40:00.000Z') + ) + + expect(await readStalledInjectedRevisions(REMOTE_PATH, stateWith())).toBeNull() + }) + + it('returns null when the events log is unreadable', async () => { + mock.readFile.mockRejectedValueOnce(new Error('ENOENT')) + + expect(await readStalledInjectedRevisions(REMOTE_PATH, stateWith())).toBeNull() + }) +}) diff --git a/src/main/integration-mounts.ts b/src/main/integration-mounts.ts index 85eaeed4..58ec2f4a 100644 --- a/src/main/integration-mounts.ts +++ b/src/main/integration-mounts.ts @@ -20,6 +20,24 @@ const MOUNT_AUTH_RESTART_THROTTLE_MS = 60_000 // coalesce into one restart instead of racing it. const MOUNT_HEALTH_POLL_INTERVAL_MS = 45_000 const MOUNT_SYNC_WEDGE_FAILURES = 3 +// Defense-in-depth backstop for a silently-dropped incremental sync event (the +// class of bug fixed server-side by cloud#2010 — an event whose path the mount's +// literal-prefix filter rejected, so the mount cycles status:ready yet never +// tracks the revision). Under normal operation a received revision lands within +// one reconcile (incremental applies in seconds; a ws->poll fallback within +// ~5min), so a revision still absent after this window — while the mount keeps +// reconciling — indicates a real drop, not an in-flight pull. 15min wall-clock +// clears normal latency (incl. the ~5min poll fallback) with wide margin and is +// well under the ~100min periodic full-pull self-heal, so it recovers a drop far +// sooner than a scheduled full pull would. Wall-clock (not cycle-count) stays +// correct if the mount's interval/transport knobs change. Paired with an +// alive-guard (lastSuccessfulReconcileAt > injectedAt) so we only restart a mount +// that has demonstrably reconciled since receipt yet still lacks the revision. +const STALLED_REVISION_GRACE_MS = 15 * 60_000 +const STALLED_REVISION_LOG_SCAN_LINES = 500 +// Mirrors integration-event-bridge.ts INTEGRATION_EVENT_LOG_PATH. Duplicated to +// avoid a mounts->bridge import cycle; hoist to a shared module if it grows. +const INTEGRATION_EVENT_LOG_PATH = join(homedir(), '.agentworkforce', 'pear', 'integration-events.log') export const MAX_LOCAL_INTEGRATION_MOUNT_PATHS = 24 type IntegrationMountInput = { @@ -328,6 +346,32 @@ export class IntegrationMountManager { this.handledHealthErrorKeys.set(remotePath, healthErrorKey) } } + + // Defense-in-depth backstop for a silently-dropped incremental event: the + // mount keeps cycling status:ready with no lastError, so the auth and + // sync-wedge checks above never fire. If the event-bridge injected a + // revision under this mount that the mount's own state.json still does not + // list past the grace window — and the mount has reconciled since it arrived + // — force a clearState restart so the next full pull re-pulls it. + if (state) { + const stalled = await readStalledInjectedRevisions(remotePath, state) + if (stalled) { + const healthErrorKey = ['stalled-events', stalled.oldestInjectedAt, stalled.missingCount].join('|') + if (this.handledHealthErrorKeys.get(remotePath) === healthErrorKey) continue + const queued = this.queueForcedRestart(remotePath, 'stalled events', { clearState: true }) + this.handledHealthErrorKeys.set(remotePath, healthErrorKey) + if (queued) { + console.warn( + `[integration-mounts] Mount stalled (injected revisions never materialized) for ${remotePath}; restarting with fresh full pull`, + { + missing: stalled.missingCount, + oldestInjectedAt: stalled.oldestInjectedAt, + examples: stalled.examples + } + ) + } + } + } } } @@ -662,6 +706,74 @@ function isMountSyncWedgeOutput(text: string): boolean { return /context deadline exceeded|i\/o timeout|Client\.Timeout exceeded/i.test(text) } +type StalledInjectedRevisions = { + missingCount: number + oldestInjectedAt: string + examples: string[] +} + +// Detects revisions the event-bridge has injected for this mount (logged as +// "injecting" with the resolved suffixed path) that the mount's own state.json +// still has not tracked — the signature of a silently-dropped incremental event, +// which leaves the mount cycling status:ready with no error. `state.files` is +// keyed by the full normalized (suffixed) remote path, so it compares +// apples-to-apples with the injected path. Fires only when a path is absent for +// longer than the full-pull grace window AND the mount has successfully reconciled +// since the revision arrived (lastSuccessfulReconcileAt > injectedAt) — proving +// the mount is alive and had a chance to pull it, so absence is a real drop rather +// than a slow pull or a never-reconciled/booting mount. +export async function readStalledInjectedRevisions( + remotePath: string, + state: Record +): Promise { + let logText: string + try { + logText = await readFile(INTEGRATION_EVENT_LOG_PATH, 'utf8') + } catch { + return null + } + + const syncedPaths = new Set(Object.keys(asRecord(state.files) ?? {})) + // omitempty in the daemon's publicState: a never-reconciled mount omits this, + // so lastReconcileMs stays null and every candidate is skipped below. + const lastReconcileMs = parseTimestamp( + typeof state.lastSuccessfulReconcileAt === 'string' ? state.lastSuccessfulReconcileAt : null + ) + const prefix = `${remotePath}/` + const now = Date.now() + const missing: Array<{ path: string; injectedAt: string }> = [] + const seen = new Set() + + for (const line of logText.trim().split(/\r?\n/u).slice(-STALLED_REVISION_LOG_SCAN_LINES)) { + if (!line.includes('"injecting"')) continue + let entry: Record | null + try { + entry = asRecord(JSON.parse(line)) + } catch { + continue + } + if (!entry || entry.message !== 'injecting') continue + const meta = asRecord(entry.metadata) + const path = meta && typeof meta.path === 'string' ? meta.path : null + const injectedAt = typeof entry.timestamp === 'string' ? entry.timestamp : null + if (!path || !injectedAt || !path.startsWith(prefix) || seen.has(path)) continue + seen.add(path) + const injectedMs = parseTimestamp(injectedAt) + if (injectedMs === null || now - injectedMs < STALLED_REVISION_GRACE_MS) continue + if (lastReconcileMs === null || lastReconcileMs <= injectedMs) continue + if (syncedPaths.has(path)) continue + missing.push({ path, injectedAt }) + } + + if (!missing.length) return null + missing.sort((a, b) => a.injectedAt.localeCompare(b.injectedAt)) + return { + missingCount: missing.length, + oldestInjectedAt: missing[0].injectedAt, + examples: missing.slice(0, 3).map((entry) => entry.path) + } +} + function mountPathsForIntegrations(integrations: IntegrationMountInput[]): string[] { const mountPaths = Array.from(new Set( integrations