Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 91 additions & 1 deletion src/main/integration-mounts.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ vi.mock('./relayfile-mount-launcher', () => ({
createPearMountLauncher: vi.fn((options) => ({ start: mock.startMount, __options: options }))
}))

import { IntegrationMountManager } from './integration-mounts'
import { IntegrationMountManager, readStalledInjectedRevisions } from './integration-mounts'

describe('IntegrationMountManager', () => {
beforeEach(() => {
Expand Down Expand Up @@ -843,3 +843,93 @@ describe('IntegrationMountManager', () => {
})
})
})

describe('readStalledInjectedRevisions', () => {
const NOW = '2026-06-08T13:00:00.000Z'
const REMOTE_PATH = '/slack/channels/C123__slug/threads'
const REPLY_PATH = '/slack/channels/C123__slug/threads/1780871788_370329/replies/1780921813_531539.json'

const injectingLine = (path: string, timestamp: string): string => JSON.stringify({
timestamp,
message: 'injecting',
metadata: { projectId: 'p', eventId: 'ws:file.created:rev_1', path, recipients: ['slack-comms'] }
})

const stateWith = (overrides: Record<string, unknown> = {}): Record<string, unknown> => ({
status: 'ready',
lastSuccessfulReconcileAt: '2026-06-08T12:55:00.000Z',
files: {},
...overrides
})

beforeEach(() => {
mock.readFile.mockClear()
vi.useFakeTimers()
vi.setSystemTime(new Date(NOW))
})

afterEach(() => {
vi.useRealTimers()
})

it('flags an injected revision that aged past the grace window and never landed in state.files', async () => {
mock.readFile.mockResolvedValueOnce(injectingLine(REPLY_PATH, '2026-06-08T12:40:00.000Z'))

const result = await readStalledInjectedRevisions(REMOTE_PATH, stateWith())

expect(result).toEqual({
missingCount: 1,
oldestInjectedAt: '2026-06-08T12:40:00.000Z',
examples: [REPLY_PATH]
})
})

it('does not flag a revision still within the grace window', async () => {
mock.readFile.mockResolvedValueOnce(injectingLine(REPLY_PATH, '2026-06-08T12:55:30.000Z'))

expect(await readStalledInjectedRevisions(REMOTE_PATH, stateWith())).toBeNull()
})

it('does not flag a revision the mount has already tracked in state.files', async () => {
mock.readFile.mockResolvedValueOnce(injectingLine(REPLY_PATH, '2026-06-08T12:40:00.000Z'))

const result = await readStalledInjectedRevisions(
REMOTE_PATH,
stateWith({ files: { [REPLY_PATH]: { revision: 'rev_1', status: 'ready' } } })
)

expect(result).toBeNull()
})

it('does not restart a mount that has not reconciled since the revision arrived', async () => {
mock.readFile.mockResolvedValueOnce(injectingLine(REPLY_PATH, '2026-06-08T12:40:00.000Z'))

// lastSuccessfulReconcileAt predates the injected revision → no chance to pull it yet.
expect(await readStalledInjectedRevisions(
REMOTE_PATH,
stateWith({ lastSuccessfulReconcileAt: '2026-06-08T12:30:00.000Z' })
)).toBeNull()
})

it('does not restart a mount that has never reconciled (omitted lastSuccessfulReconcileAt)', async () => {
mock.readFile.mockResolvedValueOnce(injectingLine(REPLY_PATH, '2026-06-08T12:40:00.000Z'))

const state = stateWith()
delete state.lastSuccessfulReconcileAt
expect(await readStalledInjectedRevisions(REMOTE_PATH, state)).toBeNull()
})

it('ignores injected revisions for a different mount root', async () => {
mock.readFile.mockResolvedValueOnce(
injectingLine('/slack/channels/C999__other/threads/1/replies/2.json', '2026-06-08T12:40:00.000Z')
)

expect(await readStalledInjectedRevisions(REMOTE_PATH, stateWith())).toBeNull()
})

it('returns null when the events log is unreadable', async () => {
mock.readFile.mockRejectedValueOnce(new Error('ENOENT'))

expect(await readStalledInjectedRevisions(REMOTE_PATH, stateWith())).toBeNull()
})
})
112 changes: 112 additions & 0 deletions src/main/integration-mounts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,24 @@ const MOUNT_AUTH_RESTART_THROTTLE_MS = 60_000
// coalesce into one restart instead of racing it.
const MOUNT_HEALTH_POLL_INTERVAL_MS = 45_000
const MOUNT_SYNC_WEDGE_FAILURES = 3
// Defense-in-depth backstop for a silently-dropped incremental sync event (the
// class of bug fixed server-side by cloud#2010 — an event whose path the mount's
// literal-prefix filter rejected, so the mount cycles status:ready yet never
// tracks the revision). Under normal operation a received revision lands within
// one reconcile (incremental applies in seconds; a ws->poll fallback within
// ~5min), so a revision still absent after this window — while the mount keeps
// reconciling — indicates a real drop, not an in-flight pull. 15min wall-clock
// clears normal latency (incl. the ~5min poll fallback) with wide margin and is
// well under the ~100min periodic full-pull self-heal, so it recovers a drop far
// sooner than a scheduled full pull would. Wall-clock (not cycle-count) stays
// correct if the mount's interval/transport knobs change. Paired with an
// alive-guard (lastSuccessfulReconcileAt > injectedAt) so we only restart a mount
// that has demonstrably reconciled since receipt yet still lacks the revision.
const STALLED_REVISION_GRACE_MS = 15 * 60_000
const STALLED_REVISION_LOG_SCAN_LINES = 500
// Mirrors integration-event-bridge.ts INTEGRATION_EVENT_LOG_PATH. Duplicated to
// avoid a mounts->bridge import cycle; hoist to a shared module if it grows.
const INTEGRATION_EVENT_LOG_PATH = join(homedir(), '.agentworkforce', 'pear', 'integration-events.log')

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Unbounded Log File Growth

The log file integration-events.log is appended to indefinitely without any rotation or truncation mechanism. Over time, this file can grow to hundreds of megabytes or gigabytes, which will eventually cause readFile to fail or exhaust system memory.

Consider implementing a log rotation or truncation strategy (e.g., capping the file size or rotating it periodically) in the event bridge layer to ensure long-term stability.

export const MAX_LOCAL_INTEGRATION_MOUNT_PATHS = 24

type IntegrationMountInput = {
Expand Down Expand Up @@ -328,6 +346,32 @@ export class IntegrationMountManager {
this.handledHealthErrorKeys.set(remotePath, healthErrorKey)
}
}

// Defense-in-depth backstop for a silently-dropped incremental event: the
// mount keeps cycling status:ready with no lastError, so the auth and
// sync-wedge checks above never fire. If the event-bridge injected a
// revision under this mount that the mount's own state.json still does not
// list past the grace window — and the mount has reconciled since it arrived
// — force a clearState restart so the next full pull re-pulls it.
if (state) {
const stalled = await readStalledInjectedRevisions(remotePath, state)
if (stalled) {
const healthErrorKey = ['stalled-events', stalled.oldestInjectedAt, stalled.missingCount].join('|')
if (this.handledHealthErrorKeys.get(remotePath) === healthErrorKey) continue
const queued = this.queueForcedRestart(remotePath, 'stalled events', { clearState: true })
this.handledHealthErrorKeys.set(remotePath, healthErrorKey)
if (queued) {
console.warn(
`[integration-mounts] Mount stalled (injected revisions never materialized) for ${remotePath}; restarting with fresh full pull`,
{
missing: stalled.missingCount,
oldestInjectedAt: stalled.oldestInjectedAt,
examples: stalled.examples
}
)
Comment on lines +362 to +371

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: Stalled-event handling marks the issue as handled even when restart is throttled, so recovery may never be retried.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/main/integration-mounts.ts, line 362:

<comment>Stalled-event handling marks the issue as handled even when restart is throttled, so recovery may never be retried.</comment>

<file context>
@@ -328,6 +346,32 @@ export class IntegrationMountManager {
+          const healthErrorKey = ['stalled-events', stalled.oldestInjectedAt, stalled.missingCount].join('|')
+          if (this.handledHealthErrorKeys.get(remotePath) === healthErrorKey) continue
+          const queued = this.queueForcedRestart(remotePath, 'stalled events', { clearState: true })
+          this.handledHealthErrorKeys.set(remotePath, healthErrorKey)
+          if (queued) {
+            console.warn(
</file context>
Suggested change
this.handledHealthErrorKeys.set(remotePath, healthErrorKey)
if (queued) {
console.warn(
`[integration-mounts] Mount stalled (injected revisions never materialized) for ${remotePath}; restarting with fresh full pull`,
{
missing: stalled.missingCount,
oldestInjectedAt: stalled.oldestInjectedAt,
examples: stalled.examples
}
)
if (queued) {
this.handledHealthErrorKeys.set(remotePath, healthErrorKey)
console.warn(
`[integration-mounts] Mount stalled (injected revisions never materialized) for ${remotePath}; restarting with fresh full pull`,
{
missing: stalled.missingCount,
oldestInjectedAt: stalled.oldestInjectedAt,
examples: stalled.examples
}
)
}

}
}
}
}
}

Expand Down Expand Up @@ -662,6 +706,74 @@ function isMountSyncWedgeOutput(text: string): boolean {
return /context deadline exceeded|i\/o timeout|Client\.Timeout exceeded/i.test(text)
}

type StalledInjectedRevisions = {
missingCount: number
oldestInjectedAt: string
examples: string[]
}

// Detects revisions the event-bridge has injected for this mount (logged as
// "injecting" with the resolved suffixed path) that the mount's own state.json
// still has not tracked — the signature of a silently-dropped incremental event,
// which leaves the mount cycling status:ready with no error. `state.files` is
// keyed by the full normalized (suffixed) remote path, so it compares
// apples-to-apples with the injected path. Fires only when a path is absent for
// longer than the full-pull grace window AND the mount has successfully reconciled
// since the revision arrived (lastSuccessfulReconcileAt > injectedAt) — proving
// the mount is alive and had a chance to pull it, so absence is a real drop rather
// than a slow pull or a never-reconciled/booting mount.
export async function readStalledInjectedRevisions(
remotePath: string,
state: Record<string, unknown>
): Promise<StalledInjectedRevisions | null> {
let logText: string
try {
logText = await readFile(INTEGRATION_EVENT_LOG_PATH, 'utf8')
} catch {
return null
}

const syncedPaths = new Set(Object.keys(asRecord(state.files) ?? {}))
// omitempty in the daemon's publicState: a never-reconciled mount omits this,
// so lastReconcileMs stays null and every candidate is skipped below.
const lastReconcileMs = parseTimestamp(
typeof state.lastSuccessfulReconcileAt === 'string' ? state.lastSuccessfulReconcileAt : null
)
const prefix = `${remotePath}/`
const now = Date.now()
const missing: Array<{ path: string; injectedAt: string }> = []
const seen = new Set<string>()

for (const line of logText.trim().split(/\r?\n/u).slice(-STALLED_REVISION_LOG_SCAN_LINES)) {
if (!line.includes('"injecting"')) continue
let entry: Record<string, unknown> | null
try {
entry = asRecord(JSON.parse(line))
} catch {
continue
}
if (!entry || entry.message !== 'injecting') continue
const meta = asRecord(entry.metadata)
const path = meta && typeof meta.path === 'string' ? meta.path : null
const injectedAt = typeof entry.timestamp === 'string' ? entry.timestamp : null
if (!path || !injectedAt || !path.startsWith(prefix) || seen.has(path)) continue
seen.add(path)
const injectedMs = parseTimestamp(injectedAt)
if (injectedMs === null || now - injectedMs < STALLED_REVISION_GRACE_MS) continue
if (lastReconcileMs === null || lastReconcileMs <= injectedMs) continue
if (syncedPaths.has(path)) continue
missing.push({ path, injectedAt })
}

if (!missing.length) return null
missing.sort((a, b) => a.injectedAt.localeCompare(b.injectedAt))
return {
missingCount: missing.length,
oldestInjectedAt: missing[0].injectedAt,
examples: missing.slice(0, 3).map((entry) => entry.path)
}
}

function mountPathsForIntegrations(integrations: IntegrationMountInput[]): string[] {
const mountPaths = Array.from(new Set(
integrations
Expand Down
Loading