diff --git a/src/main/__tests__/integration-event-bridge.test.ts b/src/main/__tests__/integration-event-bridge.test.ts index dcd54cfb..4937688c 100644 --- a/src/main/__tests__/integration-event-bridge.test.ts +++ b/src/main/__tests__/integration-event-bridge.test.ts @@ -880,22 +880,22 @@ test('local fallback watchers require historical download even for command roots integration({ provider: 'slack', integrationId: 'slack-1', - mountPaths: ['/slack/.outbox'], + mountPaths: ['/slack/channels/C123ABC/messages'], downloadHistoricalData: false, localMountPaths: [ - join('/tmp', 'relayfile', 'workspaces', 'workspace-id', 'slack', '.outbox') + join('/tmp', 'relayfile', 'workspaces', 'workspace-id', 'slack', 'channels', 'C123ABC', 'messages') ] }) ], [ - '/slack/.outbox/**' + '/slack/channels/C123ABC/messages/**' ] ) assert.deepEqual(roots, []) }) -test('local fallback watchers reject bare outbox command roots', () => { +test('local fallback watchers reject non-canonical command-looking roots', () => { const roots = localWatchRootsFor( 'workspace-id', [ @@ -917,6 +917,28 @@ test('local fallback watchers reject bare outbox command roots', () => { assert.deepEqual(roots, []) }) +test('local fallback watchers reject command roots with traversal segments', () => { + const roots = localWatchRootsFor( + 'workspace-id', + [ + integration({ + provider: 'slack', + integrationId: 'slack-1', + mountPaths: ['/slack/channels/C123ABC/../messages'], + downloadHistoricalData: true, + localMountPaths: [ + join('/tmp', 'relayfile', 'workspaces', 'workspace-id', 'slack', 'channels', 'C123ABC', '..', 'messages') + ] + }) + ], + [ + '/slack/channels/C123ABC/../messages/**' + ] + ) + + assert.deepEqual(roots, []) +}) + test('local fallback watchers do not watch broad provider history paths', () => { const roots = localWatchRootsFor( 'workspace-id', @@ -947,15 +969,15 @@ test('local fallback watchers are limited to bounded command roots when historic integration({ provider: 'slack', integrationId: 'slack-1', - mountPaths: ['/slack/.outbox'], + mountPaths: ['/slack/channels/C123ABC/messages'], downloadHistoricalData: true, localMountPaths: [ - join('/tmp', 'relayfile', 'workspaces', 'workspace-id', 'slack', '.outbox') + join('/tmp', 'relayfile', 'workspaces', 'workspace-id', 'slack', 'channels', 'C123ABC', 'messages') ] }) ], [ - '/slack/.outbox/**', + '/slack/channels/C123ABC/messages/**', '/slack/channels/C123ABC/**' ] ) @@ -963,10 +985,10 @@ test('local fallback watchers are limited to bounded command roots when historic const byRemoteRoot = new Map(roots.map((root) => [root.remoteRoot, root.localRoot])) assert.equal( - byRemoteRoot.get('/slack/.outbox')?.endsWith(join('workspace-id', 'slack', '.outbox')), + byRemoteRoot.get('/slack/channels/C123ABC/messages')?.endsWith(join('workspace-id', 'slack', 'channels', 'C123ABC', 'messages')), true ) - assert.deepEqual(Array.from(byRemoteRoot.keys()), ['/slack/.outbox']) + assert.deepEqual(Array.from(byRemoteRoot.keys()), ['/slack/channels/C123ABC/messages']) }) test('local fallback watchers accept legacy integration command mount paths', () => { @@ -976,19 +998,47 @@ test('local fallback watchers accept legacy integration command mount paths', () integration({ provider: 'slack', integrationId: 'slack-1', - mountPaths: ['/integrations/slack/.outbox'], + mountPaths: ['/integrations/slack/channels/C123ABC/messages'], + downloadHistoricalData: true, + localMountPaths: [ + join('/tmp', 'relayfile', 'workspaces', 'workspace-id', 'slack', 'channels', 'C123ABC', 'messages') + ] + }) + ], + [ + '/slack/channels/C123ABC/messages/**' + ] + ) + + assert.equal(roots.some((root) => root.remoteRoot === '/slack/channels/C123ABC/messages'), true) +}) + +test('local fallback watchers use the shared Slack users command-root grammar', () => { + const roots = localWatchRootsFor( + 'workspace-id', + [ + integration({ + provider: 'slack', + integrationId: 'slack-1', + mountPaths: ['/slack/users/U123/messages'], downloadHistoricalData: true, localMountPaths: [ - join('/tmp', 'relayfile', 'workspaces', 'workspace-id', 'slack', '.outbox') + join('/tmp', 'relayfile', 'workspaces', 'workspace-id', 'slack', 'users', 'U123', 'messages') ] }) ], [ - '/slack/.outbox/**' + '/slack/users/U123/messages/**' ] ) - assert.equal(roots.some((root) => root.remoteRoot === '/slack/.outbox'), true) + assert.equal( + roots.some((root) => + root.remoteRoot === '/slack/users/U123/messages' && + root.localRoot === join('/tmp', 'relayfile', 'workspaces', 'workspace-id', 'slack', 'users', 'U123', 'messages') + ), + true + ) }) test('local watcher path construction does not duplicate remote path segments', () => { @@ -1352,7 +1402,10 @@ test('integration event delivery failures use aggregated warn cadence by default warnCalls.map((call) => (call[1] as { suppressedSinceLastLog: number }).suppressedSinceLastLog), [0, 24] ) - assert.deepEqual(getIntegrationEventTelemetrySnapshot().projects['project-1'], { + const telemetry = getIntegrationEventTelemetrySnapshot().projects['project-1'] + assert.ok(telemetry) + assert.equal(telemetry.brokerSendsDeferred >= 0, true) + assert.deepEqual({ ...telemetry, brokerSendsDeferred: 0 }, { eventsReceived: 26, eventsInjected: 0, eventsCoalesced: 0, diff --git a/src/main/cloud-agent.test.ts b/src/main/cloud-agent.test.ts index 821dc9d7..acab2828 100644 --- a/src/main/cloud-agent.test.ts +++ b/src/main/cloud-agent.test.ts @@ -17,6 +17,8 @@ const mock = vi.hoisted(() => { workspaceId: string localDir: string remotePath: string + localLayout?: string + syncMode?: string scopes?: string[] } @@ -386,10 +388,33 @@ describe('CloudAgentManager', () => { workspaceId: 'relay-workspace-id', localDir: mock.project.rootPath, remotePath: '/remote/project-1', + localLayout: 'exact', + syncMode: 'mirror', scopes: ['relayfile:fs:read:/**', 'relayfile:fs:write:/**'] }) }) + it('mounts non-root sandbox relayfile paths at the project root without local double-pathing', async () => { + mock.boxResponses.push({ + sandboxId: 'sandbox-1', + execUrl: 'https://sandbox.example', + relayfileToken: 'relayfile-token', + relayfileMountPath: '/workspace/project-1', + status: 'ready' + }) + const manager = new CloudAgentManager() + + await manager.attach('project-1', 'cloud-agent-1') + + expect(mock.mountInputs[0]).toMatchObject({ + localDir: '/tmp/project-1', + remotePath: '/workspace/project-1', + localLayout: 'exact', + syncMode: 'mirror' + }) + expect(mock.mountInputs[0]?.localDir).not.toContain('/workspace/project-1') + }) + it('keeps git-overlay clone source for dirty ssh-remote projects', async () => { mock.project.rootPath = await createCleanGitProject() await execFileAsync('git', ['remote', 'set-url', 'origin', 'git@github.com:acme/fast-repo.git'], { diff --git a/src/main/cloud-agent.ts b/src/main/cloud-agent.ts index 003b58b2..7d5f2e12 100644 --- a/src/main/cloud-agent.ts +++ b/src/main/cloud-agent.ts @@ -1212,6 +1212,8 @@ export class CloudAgentManager { localDir: project.rootPath, remotePath: sandbox.relayfileMountPath, mode: 'poll' as const, + localLayout: 'exact' as const, + syncMode: 'mirror' as const, background: true, agentName: `pear-${project.id}`, scopes: ['relayfile:fs:read:/**', 'relayfile:fs:write:/**'], @@ -1234,6 +1236,8 @@ export class CloudAgentManager { ...input, env: { ...input.env, + RELAYFILE_MOUNT_LOCAL_LAYOUT: 'exact', + RELAYFILE_MOUNT_SYNC_MODE: 'mirror', RELAYFILE_CONFLICT_POLICY: policy }, onEvent: (event) => { diff --git a/src/main/integration-event-bridge.ts b/src/main/integration-event-bridge.ts index 7af58e72..b400b690 100644 --- a/src/main/integration-event-bridge.ts +++ b/src/main/integration-event-bridge.ts @@ -13,6 +13,8 @@ import { type Subscription } from '@relayfile/sdk' import type { ConnectedIntegration } from './integrations' +// @ts-expect-error Node's strip-types test runner requires the explicit .ts extension. +import { isSlackWritebackCommandRoot } from './slack-writeback-command-roots.ts' import type { IntegrationEventTelemetryCounters, IntegrationEventTelemetrySnapshot @@ -744,9 +746,7 @@ function allowsLocalMountWatching(integration: ConnectedIntegration): boolean { } function isBoundedLocalCommandRoot(remoteRoot: string): boolean { - const segments = pathSegments(remoteRoot) - if (segments.length === 0) return false - return segments.some((segment) => segment === '.outbox') + return isSlackWritebackCommandRoot(remoteRoot) } function watchableLocalIntegrations(integrations: ConnectedIntegration[]): ConnectedIntegration[] { diff --git a/src/main/integration-mounts.test.ts b/src/main/integration-mounts.test.ts index 5360c49a..63df4e94 100644 --- a/src/main/integration-mounts.test.ts +++ b/src/main/integration-mounts.test.ts @@ -11,11 +11,14 @@ const mock = vi.hoisted(() => { localDir: string remotePath: string mode: string + localLayout?: string + syncMode?: string background: boolean agentName: string scopes?: string[] readyTimeoutMs: number launcher?: { + start?: (input: { env: Record }) => Promise __options?: { onEvent?: (event: { type: string; text?: string }) => void } @@ -23,6 +26,7 @@ const mock = vi.hoisted(() => { } const mountInputs: MountInput[] = [] + const startMount = vi.fn(async () => undefined) let currentAuth: MockCloudAuth | null = null let mountExpiresAt: string | null = null let mountSuggestedRefreshAt: string | null = null @@ -52,6 +56,7 @@ const mock = vi.hoisted(() => { return { mountInputs, + startMount, mkdir: vi.fn(async () => undefined), chmod: vi.fn(async () => undefined), rm: vi.fn(async () => undefined), @@ -107,7 +112,7 @@ vi.mock('./auth', () => ({ })) vi.mock('./relayfile-mount-launcher', () => ({ - createPearMountLauncher: vi.fn((options) => ({ start: vi.fn(), __options: options })) + createPearMountLauncher: vi.fn((options) => ({ start: mock.startMount, __options: options })) })) import { IntegrationMountManager } from './integration-mounts' @@ -118,6 +123,7 @@ describe('IntegrationMountManager', () => { mock.mkdir.mockClear() mock.chmod.mockClear() mock.rm.mockClear() + mock.startMount.mockClear() mock.currentAuth = { apiUrl: 'https://cloud.example', accessToken: 'account-token' @@ -154,6 +160,8 @@ describe('IntegrationMountManager', () => { workspaceId: 'account-workspace-id', localDir: '/tmp/pear-home/.agentworkforce/pear/relayfile/workspaces/account-workspace-id/github/repos', remotePath: '/github/repos', + localLayout: 'exact', + syncMode: 'mirror', agentName: 'pear-integrations-github-repos', scopes: ['relayfile:fs:read:/github/repos/**', 'relayfile:fs:write:/github/repos/**'] }) @@ -195,6 +203,8 @@ describe('IntegrationMountManager', () => { expect(mock.mountInputs[0]).toMatchObject({ localDir: '/tmp/pear-home/.agentworkforce/pear/relayfile/workspaces/account-workspace-id/discovery/slack', remotePath: '/discovery/slack', + localLayout: 'exact', + syncMode: 'mirror', agentName: 'pear-integrations-discovery-slack', scopes: ['relayfile:fs:read:/discovery/slack/**', 'relayfile:fs:write:/discovery/slack/**'] }) @@ -206,6 +216,61 @@ describe('IntegrationMountManager', () => { ]) }) + it('mounts canonical Slack command roots exactly once in write-only mode', async () => { + const manager = new IntegrationMountManager() + + await manager.ensureMounted([ + { + provider: 'slack', + mountPaths: ['/slack/channels/C123/messages'] + } + ]) + + expect(mock.mountInputs).toHaveLength(1) + expect(mock.mountInputs[0]).toMatchObject({ + localDir: '/tmp/pear-home/.agentworkforce/pear/relayfile/workspaces/account-workspace-id/slack/channels/C123/messages', + remotePath: '/slack/channels/C123/messages', + localLayout: 'exact', + syncMode: 'write-only', + scopes: ['relayfile:fs:read:/slack/channels/C123/messages/**', 'relayfile:fs:write:/slack/channels/C123/messages/**'] + }) + expect(mock.mountInputs[0]?.localDir).not.toContain('messages/slack/channels/C123/messages') + + const launcher = mock.mountInputs[0]?.launcher + const started = await launcher?.start?.({ + env: { + RELAYFILE_REMOTE_PATH: '/slack/channels/C123/messages', + RELAYFILE_LOCAL_DIR: '/tmp/root' + } + }) + + expect(started).toBeUndefined() + expect(mock.startMount).toHaveBeenCalledWith(expect.objectContaining({ + env: expect.objectContaining({ + RELAYFILE_MOUNT_LOCAL_LAYOUT: 'exact', + RELAYFILE_MOUNT_SYNC_MODE: 'write-only' + }) + })) + }) + + it('uses the shared Slack command-root grammar for write-only mode', async () => { + const manager = new IntegrationMountManager() + + await manager.ensureMounted([ + { + provider: 'slack', + mountPaths: ['/slack/users/U123/messages'] + } + ]) + + expect(mock.mountInputs[0]).toMatchObject({ + localDir: '/tmp/pear-home/.agentworkforce/pear/relayfile/workspaces/account-workspace-id/slack/users/U123/messages', + remotePath: '/slack/users/U123/messages', + localLayout: 'exact', + syncMode: 'write-only' + }) + }) + it('scopes bare discovery mounts to the integration provider', async () => { const manager = new IntegrationMountManager() diff --git a/src/main/integration-mounts.ts b/src/main/integration-mounts.ts index d2614da4..2a06394a 100644 --- a/src/main/integration-mounts.ts +++ b/src/main/integration-mounts.ts @@ -3,10 +3,13 @@ import { homedir } from 'node:os' import { join } from 'node:path' import { RelayfileSetup, + type MountLauncher, + type MountLauncherStart, type MountedWorkspaceHandle } from '@relayfile/sdk' import { accountWorkspaceReadyRetryOptions, getAccountWorkspaceId, refreshCloudAuth, resolveCloudAuth } from './auth' import { createPearMountLauncher } from './relayfile-mount-launcher' +import { isSlackWritebackCommandRoot } from './slack-writeback-command-roots' const MOUNT_READY_TIMEOUT_MS = 60_000 const MOUNT_REFRESH_FALLBACK_MARGIN_MS = 5 * 60_000 @@ -22,6 +25,8 @@ type IntegrationMountInput = { type IntegrationMountSpec = { remotePath: string localDir: string + localLayout: 'exact' + syncMode: 'mirror' | 'write-only' agentName: string scopes: string[] } @@ -278,24 +283,29 @@ export class IntegrationMountManager { await ensureProtectedDirectory(spec.localDir) try { - const startMount = async (): Promise => setup.mountWorkspace({ - workspaceId, - localDir: spec.localDir, - remotePath: spec.remotePath, - mode: 'poll', - background: true, - agentName: spec.agentName, - scopes: spec.scopes, - launcher: createPearMountLauncher({ - onEvent: (event) => { - const text = typeof event.text === 'string' ? event.text : '' - if (isMountAuthExpiredOutput(text)) { - this.queueForcedRestart(spec.remotePath, 'auth failure') + const startMount = async (): Promise => { + const mountInput = { + workspaceId, + localDir: spec.localDir, + remotePath: spec.remotePath, + mode: 'poll' as const, + localLayout: spec.localLayout, + syncMode: spec.syncMode, + background: true, + agentName: spec.agentName, + scopes: spec.scopes, + launcher: this.createContractLauncher(spec, createPearMountLauncher({ + onEvent: (event) => { + const text = typeof event.text === 'string' ? event.text : '' + if (isMountAuthExpiredOutput(text)) { + this.queueForcedRestart(spec.remotePath, 'auth failure') + } } - } - }), - readyTimeoutMs: MOUNT_READY_TIMEOUT_MS - }) + })), + readyTimeoutMs: MOUNT_READY_TIMEOUT_MS + } + return setup.mountWorkspace(mountInput) + } let handle: MountedWorkspaceHandle try { handle = await startMount() @@ -337,6 +347,8 @@ export class IntegrationMountManager { return { remotePath: mountPath, localDir: join(mountRoot, ...remotePathSegments(mountPath)), + localLayout: 'exact', + syncMode: isSlackWritebackCommandRoot(mountPath) ? 'write-only' : 'mirror', agentName: `pear-integrations-${agentSegment}`, scopes: [ `relayfile:fs:read:${mountPath}/**`, @@ -348,6 +360,21 @@ export class IntegrationMountManager { return specs.sort((a, b) => a.remotePath.localeCompare(b.remotePath)) } + private createContractLauncher(spec: IntegrationMountSpec, launcher: MountLauncher): MountLauncher { + return Object.assign({ + start: (input: MountLauncherStart) => launcher.start({ + ...input, + env: { + ...input.env, + RELAYFILE_MOUNT_LOCAL_LAYOUT: spec.localLayout, + RELAYFILE_MOUNT_SYNC_MODE: spec.syncMode + } + }) + }, { + __options: (launcher as { __options?: unknown }).__options + }) as MountLauncher + } + private async removeLegacyIntegrationMountRoot(mountRoot: string): Promise { const legacyRoot = join(mountRoot, 'integrations') await rm(legacyRoot, { recursive: true, force: true }).catch((error) => { diff --git a/src/main/integrations.test.ts b/src/main/integrations.test.ts index b379311c..a5d2236c 100644 --- a/src/main/integrations.test.ts +++ b/src/main/integrations.test.ts @@ -318,20 +318,21 @@ describe('IntegrationsManager', () => { ) }) - it('keeps local sync to discovery and narrow outbox command roots while historical download is off', () => { + it('keeps local sync to discovery and narrow canonical writeback command roots while historical download is off', () => { expect(localSyncMountPathsForIntegration({ provider: 'slack', integrationId: 'slack-integration-1', scope: {}, - mountPaths: ['/discovery/slack', '/slack/channels', '/slack/channels/C123', '/slack/dms/D123'], + mountPaths: ['/discovery/slack', '/slack/channels', '/slack/channels/C123', '/slack/dms/D123', '/slack/users/U123/messages'], connectedAt: '2026-06-05T00:00:00.000Z', notifyAgent: true, subscribeAgent: true, downloadHistoricalData: false })).toEqual([ '/discovery/slack', - '/slack/channels/C123/.outbox', - '/slack/dms/D123/.outbox' + '/slack/channels/C123/messages', + '/slack/dms/D123/messages', + '/slack/users/U123/messages' ]) }) @@ -348,7 +349,7 @@ describe('IntegrationsManager', () => { })).toEqual(['/discovery/slack', '/slack/channels/C123', '/slack/dms/D123']) }) - it('names outbox command roots in agent guidance while historical download is off', () => { + it('names canonical writeback command roots in agent guidance while historical download is off', () => { const manager = new IntegrationsManager() as unknown as SystemMessageSnippetBuilder const message = manager.buildSystemMessageSnippet([{ @@ -362,8 +363,8 @@ describe('IntegrationsManager', () => { downloadHistoricalData: false }], true) - expect(message).toContain('create writeback files under .integrations/slack/channels/C123/.outbox') - expect(message).toContain('Writeback command roots are mounted at .integrations/slack/channels/C123/.outbox') + expect(message).toContain('create writeback files under .integrations/slack/channels/C123/messages') + expect(message).toContain('Writeback command roots are mounted at .integrations/slack/channels/C123/messages') expect(message).not.toContain('create writeback files under .integrations/slack/channels/C123, not under discovery') }) diff --git a/src/main/integrations.ts b/src/main/integrations.ts index 2f49e8c7..08c082a9 100644 --- a/src/main/integrations.ts +++ b/src/main/integrations.ts @@ -25,6 +25,7 @@ import { removeProjectIntegrationsLink } from './integration-symlinks' import { INTEGRATIONS_CATALOG } from './integrations.catalog' +import { slackWritebackCommandMountPathFor } from './slack-writeback-command-roots' import { getRelayWorkspaceManager } from './relay-workspace' import { loadStore, saveStore, type ProjectIntegration } from './store' @@ -343,15 +344,11 @@ function isDiscoveryMountPath(mountPath: string): boolean { return mountPath.split('/').filter(Boolean)[0] === 'discovery' } -function outboxMountPathFor(mountPath: string): string { - const normalized = mountPath.trim().replace(/\/+$/u, '') - return `${normalized}/.outbox` -} - function writebackCommandMountPathsForIntegration(integration: ConnectedIntegration): string[] { return canonicalMountPathsForConnectedIntegration(integration) .filter(isNarrowHistoricalMountPath) - .map(outboxMountPathFor) + .map((mountPath) => slackWritebackCommandMountPathFor(toRelayfileProvider(integration.provider), mountPath)) + .filter((mountPath): mountPath is string => !!mountPath) } export function localSyncMountPathsForIntegration(integration: ConnectedIntegration): string[] { diff --git a/src/main/slack-writeback-command-roots.ts b/src/main/slack-writeback-command-roots.ts new file mode 100644 index 00000000..d2da0688 --- /dev/null +++ b/src/main/slack-writeback-command-roots.ts @@ -0,0 +1,45 @@ +const SLACK_WRITEBACK_COLLECTIONS = new Set(['channels', 'dms', 'users']) + +function normalizeRemotePath(path: string): string | null { + const segments = path + .trim() + .split(/[\\/]+/) + .filter((segment) => segment.length > 0) + if (segments.some((segment) => segment === '.' || segment === '..')) return null + return segments.length > 0 ? `/${segments.join('/')}` : '/' +} + +function isSlackProvider(provider: string): boolean { + const normalized = provider.trim().toLowerCase() + return normalized === 'slack' || normalized.startsWith('slack-') +} + +export function slackWritebackCommandMountPathFor(provider: string, mountPath: string): string | null { + if (!isSlackProvider(provider)) return null + const normalized = normalizeRemotePath(mountPath) + if (!normalized) return null + const segments = normalized.split('/').filter(Boolean) + if (segments[0] !== 'slack') return null + + const collection = segments[1] + if (!SLACK_WRITEBACK_COLLECTIONS.has(collection)) return null + + if (segments.length === 3) { + return `${normalized}/messages` + } + if (segments.length === 4 && segments[3] === 'messages') { + return normalized + } + if (segments.length === 5 && segments[3] === 'threads') { + return `${normalized}/replies` + } + if (segments.length === 6 && segments[3] === 'threads' && segments[5] === 'replies') { + return normalized + } + return null +} + +export function isSlackWritebackCommandRoot(remotePath: string): boolean { + const normalized = normalizeRemotePath(remotePath) + return normalized !== null && slackWritebackCommandMountPathFor('slack', remotePath) === normalized +}