Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/cli/fleet.ts
Original file line number Diff line number Diff line change
Expand Up @@ -490,8 +490,8 @@ async function buildFleet(globals: GlobalOptions, loaded: LoadedConfig | undefin
if (globals.backend === 'internal') {
const stderr = deps.stderr ?? process.stderr
const logger = streamLogger(stderr)
const { client, started } = await (deps.ensureRelayBroker ?? ensureRelayBroker)({ cwd, connectionPath, logger })
return createFleet({ backend: 'internal', cwd, connectionPath }, { harnessClient: client, ownsBroker: started })
const { client, started, workspaceKey } = await (deps.ensureRelayBroker ?? ensureRelayBroker)({ cwd, connectionPath, logger })
return createFleet({ backend: 'internal', cwd, connectionPath }, { harnessClient: client, ownsBroker: started, workspaceKey })
}

return createFleet({ backend: globals.backend, cwd, connectionPath })
Expand Down
2 changes: 2 additions & 0 deletions src/fleet/create-fleet.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export interface CreateFleetDeps {
// True when harnessClient owns a broker we spawned, so the fleet shuts it down
// on dispose instead of leaving it running.
ownsBroker?: boolean
workspaceKey?: string
}

export function createFleet(options: CreateFleetOptions = {}, deps: CreateFleetDeps = {}) {
Expand All @@ -26,6 +27,7 @@ export function createFleet(options: CreateFleetOptions = {}, deps: CreateFleetD
return new InternalFleetClient({
client: deps.harnessClient,
ownsBroker: deps.ownsBroker,
workspaceKey: deps.workspaceKey,
cwd: options.cwd,
connectionPath: options.connectionPath,
})
Expand Down
40 changes: 37 additions & 3 deletions src/fleet/ensure-relay-broker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import { describe, expect, it, vi } from 'vitest'
import { ensureRelayBroker } from './ensure-relay-broker'
import type { HarnessDriverClientLike } from './internal-fleet-client'

const noStoredWorkspaceKey = () => undefined

const fakeClient = (tag: string): HarnessDriverClientLike => ({
// tag lets a test assert which client (connected vs spawned) came back.
brokerPid: tag === 'connected' ? 111 : 222,
Expand Down Expand Up @@ -42,10 +44,11 @@ describe('ensureRelayBroker', () => {
})
const spawn = vi.fn(async () => spawned)

const handle = await ensureRelayBroker({ cwd: '/work', connect, spawn, env: {} })
const handle = await ensureRelayBroker({ cwd: '/work', connect, spawn, env: {}, resolveWorkspaceKey: noStoredWorkspaceKey })

expect(handle.client).toBe(spawned)
expect(handle.started).toBe(true)
expect(handle.workspaceKey).toBeUndefined()
expect(spawn).toHaveBeenCalledWith({ cwd: '/work', workspaceKey: undefined })
})

Expand All @@ -72,25 +75,56 @@ describe('ensureRelayBroker', () => {
spawn: async () => fakeClient('spawned'),
logger: { info },
env: {},
resolveWorkspaceKey: noStoredWorkspaceKey,
})
expect(info).toHaveBeenCalledWith('[factory] no relay broker running; starting one', { reason: 'boom', joiningWorkspace: false })
})

it('threads a workspace key (env or option) into spawn so the broker JOINS', async () => {
const spawn = vi.fn(async () => fakeClient('spawned'))
await ensureRelayBroker({ connect: () => { throw new Error('no broker') }, spawn, env: { RELAY_WORKSPACE_KEY: 'rk_live_test' } })
const envHandle = await ensureRelayBroker({ connect: () => { throw new Error('no broker') }, spawn, env: { RELAY_WORKSPACE_KEY: 'rk_live_test' } })
expect(envHandle.workspaceKey).toBe('rk_live_test')
expect(spawn).toHaveBeenCalledWith(expect.objectContaining({ workspaceKey: 'rk_live_test' }))

spawn.mockClear()
await ensureRelayBroker({ connect: () => { throw new Error('no broker') }, spawn, workspaceKey: 'rk_live_explicit', env: {} })
const explicitHandle = await ensureRelayBroker({ connect: () => { throw new Error('no broker') }, spawn, workspaceKey: 'rk_live_explicit', env: {} })
expect(explicitHandle.workspaceKey).toBe('rk_live_explicit')
expect(spawn).toHaveBeenCalledWith(expect.objectContaining({ workspaceKey: 'rk_live_explicit' }))
})

it('uses the active Agent Relay workspace key when env is empty', async () => {
const spawn = vi.fn(async () => fakeClient('spawned'))
const handle = await ensureRelayBroker({
connect: () => { throw new Error('no broker') },
spawn,
env: {},
resolveWorkspaceKey: () => 'rk_live_stored',
})

expect(handle.workspaceKey).toBe('rk_live_stored')
expect(spawn).toHaveBeenCalledWith(expect.objectContaining({ workspaceKey: 'rk_live_stored' }))
})

it('returns the resolved workspace key when reusing an existing broker', async () => {
const connected = fakeClient('connected')
const handle = await ensureRelayBroker({
connect: () => connected,
spawn: async () => fakeClient('spawned'),
env: {},
resolveWorkspaceKey: () => 'rk_live_reused',
})

expect(handle.client).toBe(connected)
expect(handle.started).toBe(false)
expect(handle.workspaceKey).toBe('rk_live_reused')
})

it('fails with actionable guidance when there is no broker and no workspace key', async () => {
await expect(ensureRelayBroker({
connect: () => { throw new Error('no broker') },
spawn: async () => { throw new Error('insert into workspaces failed') },
env: {},
resolveWorkspaceKey: noStoredWorkspaceKey,
})).rejects.toThrow(/RELAY_WORKSPACE_KEY/u)
})
})
22 changes: 10 additions & 12 deletions src/fleet/ensure-relay-broker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { HarnessDriverClient } from '@agent-relay/harness-driver'

import type { Logger } from '../ports/system'
import type { HarnessDriverClientLike } from './internal-fleet-client'
import { resolveRelayWorkspaceKey } from './relay-workspace-key'

export interface EnsureRelayBrokerOptions {
cwd?: string
Expand All @@ -18,6 +19,7 @@ export interface EnsureRelayBrokerOptions {
connect?: (options: { cwd?: string; connectionPath?: string }) => HarnessDriverClientLike
spawn?: (options: { cwd?: string; workspaceKey?: string }) => Promise<HarnessDriverClientLike>
env?: NodeJS.ProcessEnv
resolveWorkspaceKey?: (env: NodeJS.ProcessEnv) => string | undefined
}

export interface RelayBrokerHandle {
Expand All @@ -26,6 +28,7 @@ export interface RelayBrokerHandle {
// dispose). False when we reused a broker that was already running — that one
// belongs to the operator and must never be killed.
started: boolean
workspaceKey?: string
}

// Resolve the relay broker for the internal fleet backend: reuse the broker that
Expand All @@ -39,11 +42,16 @@ export async function ensureRelayBroker(options: EnsureRelayBrokerOptions = {}):
const connect = options.connect ?? ((opts) => HarnessDriverClient.connect(opts))
const spawn = options.spawn ?? ((opts) => HarnessDriverClient.spawn(opts))
const env = options.env ?? process.env
const workspaceKey = resolveRelayWorkspaceKey({
workspaceKey: options.workspaceKey,
env,
activeWorkspaceKey: options.resolveWorkspaceKey,
})

try {
const client = connect({ cwd: options.cwd, connectionPath: options.connectionPath })
options.logger?.info?.('[factory] reusing the relay broker that is already running')
return { client, started: false }
return { client, started: false, workspaceKey }
} catch (error) {
if (options.autoStart === false) {
throw error
Expand All @@ -53,17 +61,13 @@ export async function ensureRelayBroker(options: EnsureRelayBrokerOptions = {}):
// initialize relaycast session: insert into workspaces"). The workspace key
// (rk_live_…) makes the broker join. Pear injects it at spawn; standalone the
// operator supplies it via RELAY_WORKSPACE_KEY.
const workspaceKey = nonEmpty(options.workspaceKey)
?? nonEmpty(env.RELAY_WORKSPACE_KEY)
?? nonEmpty(env.AGENT_RELAY_WORKSPACE_KEY)
?? nonEmpty(env.RELAY_API_KEY)
options.logger?.info?.('[factory] no relay broker running; starting one', {
reason: error instanceof Error ? error.message : String(error),
joiningWorkspace: Boolean(workspaceKey),
})
try {
const client = await spawn({ cwd: options.cwd, workspaceKey })
return { client, started: true }
return { client, started: true, workspaceKey }
} catch (spawnError) {
if (!workspaceKey) {
throw new Error(
Expand All @@ -78,9 +82,3 @@ export async function ensureRelayBroker(options: EnsureRelayBrokerOptions = {}):
}
}
}

const nonEmpty = (value: string | undefined): string | undefined => {
if (typeof value !== 'string') return undefined
const trimmed = value.trim()
return trimmed.length > 0 ? trimmed : undefined
}
26 changes: 26 additions & 0 deletions src/fleet/internal-fleet-client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ describe('InternalFleetClient', () => {
cwd: '/worktree',
restartPolicy: { max_restarts: 2 },
continueFrom: 'previous-session',
spawnMode: 'task_exit',
exitAfterTask: true,
harnessConfig: expect.objectContaining({
runtime: 'pty',
command: 'codex',
Expand Down Expand Up @@ -215,6 +217,28 @@ describe('InternalFleetClient', () => {
})
})

it('threads a resolved workspace key into spawned agent MCP env', async () => {
const harness = new FakeHarnessDriverClient()
const fleet = new InternalFleetClient({
client: harness,
workspaceKey: 'rk_live_from_broker',
resolveAgentRelayMcpCommand: () => ({ command: '/usr/local/bin/node', args: ['/repo/node_modules/agent-relay/dist/cli/index.js', 'mcp'] }),
})

await fleet.spawn({
name: 'ar-1-impl',
capability: 'spawn:codex',
task: 'do work',
})

const env = harness.spawned[0]?.harnessConfig?.env
expect(env).toEqual(expect.objectContaining({
RELAY_WORKSPACE_KEY: 'rk_live_from_broker',
RELAY_API_KEY: 'rk_live_from_broker',
}))
expect(harness.spawned[0]?.harnessConfig?.args.join('\n')).toContain('"RELAY_WORKSPACE_KEY" = "rk_live_from_broker"')
})

it('falls back to ordinary spawn when agent-relay MCP cannot be resolved', async () => {
const harness = new FakeHarnessDriverClient()
const logger = { warn: vi.fn() }
Expand All @@ -240,6 +264,8 @@ describe('InternalFleetClient', () => {
cwd: '/worktree',
restartPolicy: undefined,
continueFrom: undefined,
spawnMode: 'task_exit',
exitAfterTask: true,
})
expect(logger.warn).toHaveBeenCalledWith(
'[factory-sdk] agent-relay MCP command not found; spawning without MCP injection',
Expand Down
33 changes: 18 additions & 15 deletions src/fleet/internal-fleet-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import type { BrokerEvent, ListAgent, SendMessageInput, SpawnPtyInput } from '@a

import type { AgentMessage, AgentPidResolution, Capability, FleetClient, RosterEntry, SendInput, SpawnInput, SpawnResult } from '../ports/fleet'
import type { Logger } from '../ports/system'
import { resolveRelayWorkspaceKey } from './relay-workspace-key'

const requireForResolve = createRequire(import.meta.url)

Expand Down Expand Up @@ -41,6 +42,7 @@ export interface InternalFleetClientOptions {
ownsBroker?: boolean
cwd?: string
connectionPath?: string
workspaceKey?: string
resumeCapability?: Capability
logger?: Logger
resolveAgentRelayMcpCommand?: () => AgentRelayMcpCommand | undefined
Expand Down Expand Up @@ -75,6 +77,7 @@ export class InternalFleetClient implements FleetClient {
readonly #ownsBroker: boolean
readonly #cwd?: string
readonly #connectionPath?: string
readonly #workspaceKey?: string
readonly #resumeCapability: Capability
readonly #logger?: Logger
readonly #resolveAgentRelayMcpCommand: () => AgentRelayMcpCommand | undefined
Expand All @@ -99,6 +102,7 @@ export class InternalFleetClient implements FleetClient {
constructor(options: InternalFleetClientOptions = {}) {
this.#cwd = options.cwd
this.#connectionPath = options.connectionPath
this.#workspaceKey = options.workspaceKey
this.#resumeCapability = options.resumeCapability ?? 'spawn:codex'
this.#logger = options.logger
this.#resolveAgentRelayMcpCommand = options.resolveAgentRelayMcpCommand ?? resolveAgentRelayMcpCommand
Expand All @@ -118,6 +122,8 @@ export class InternalFleetClient implements FleetClient {
cwd: input.cwd ?? this.#cwd,
restartPolicy: input.restartPolicy,
continueFrom: input.sessionRef,
spawnMode: 'task_exit',
exitAfterTask: true,
})
const handle = await this.#client.spawnPty(spawnInput)

Expand Down Expand Up @@ -159,7 +165,7 @@ export class InternalFleetClient implements FleetClient {

return {
...input,
harnessConfig: buildRelayMcpHarnessConfig(input, command),
harnessConfig: buildRelayMcpHarnessConfig(input, command, this.#workspaceKey),
}
}

Expand Down Expand Up @@ -555,8 +561,12 @@ export function resolveAgentRelayMcpCommand(): AgentRelayMcpCommand | undefined
}
}

export function buildRelayMcpHarnessConfig(input: SpawnPtyInput, command: AgentRelayMcpCommand): NonNullable<SpawnPtyInput['harnessConfig']> {
const relayEnv = relayMcpEnv(input.name, input.agentToken)
export function buildRelayMcpHarnessConfig(
input: SpawnPtyInput,
command: AgentRelayMcpCommand,
workspaceKey?: string,
): NonNullable<SpawnPtyInput['harnessConfig']> {
const relayEnv = relayMcpEnv(input.name, input.agentToken, workspaceKey)
return {
runtime: 'pty',
command: input.cli,
Expand Down Expand Up @@ -612,18 +622,16 @@ function stdioMcpServer(command: AgentRelayMcpCommand, relayEnv: Record<string,
}
}

function relayMcpEnv(agentName: string, agentToken?: string): Record<string, string> {
function relayMcpEnv(agentName: string, agentToken?: string, workspaceKey?: string): Record<string, string> {
const env: Record<string, string> = {
RELAY_AGENT_NAME: agentName,
RELAY_AGENT_TYPE: 'agent',
RELAY_STRICT_AGENT_NAME: '1',
}
const workspaceKey = nonEmpty(process.env.RELAY_WORKSPACE_KEY) ??
nonEmpty(process.env.AGENT_RELAY_WORKSPACE_KEY) ??
relayWorkspaceKeyFromApiKey(process.env.RELAY_API_KEY)
if (workspaceKey) {
env.RELAY_WORKSPACE_KEY = workspaceKey
env.RELAY_API_KEY = workspaceKey
const resolvedWorkspaceKey = resolveRelayWorkspaceKey({ workspaceKey })
if (resolvedWorkspaceKey) {
env.RELAY_WORKSPACE_KEY = resolvedWorkspaceKey
env.RELAY_API_KEY = resolvedWorkspaceKey
} else {
// No workspace key in the daemon env: the spawned agent's agent-relay MCP
// will boot WITHOUT credentials, so it joins a bare relaycast workspace and
Expand Down Expand Up @@ -680,11 +688,6 @@ function nonEmpty(value: string | undefined): string | undefined {
return trimmed || undefined
}

function relayWorkspaceKeyFromApiKey(value: string | undefined): string | undefined {
const trimmed = nonEmpty(value)
return trimmed?.startsWith('rk_live_') ? trimmed : undefined
}

function messageInputFrom(input: SendInput): SendMessageInput {
return {
to: input.to,
Expand Down
26 changes: 26 additions & 0 deletions src/fleet/relay-workspace-key.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { activeWorkspaceKey } from '@agent-relay/cloud'

export interface ResolveRelayWorkspaceKeyOptions {
workspaceKey?: string
env?: NodeJS.ProcessEnv
activeWorkspaceKey?: (env: NodeJS.ProcessEnv) => string | undefined
}

export function resolveRelayWorkspaceKey(options: ResolveRelayWorkspaceKeyOptions = {}): string | undefined {
const env = options.env ?? process.env
return nonEmpty(options.workspaceKey)
?? nonEmpty(env.RELAY_WORKSPACE_KEY)
?? nonEmpty(env.AGENT_RELAY_WORKSPACE_KEY)
?? relayWorkspaceKeyFromApiKey(env.RELAY_API_KEY)
?? nonEmpty((options.activeWorkspaceKey ?? activeWorkspaceKey)(env))
}

export function relayWorkspaceKeyFromApiKey(value: string | undefined): string | undefined {
const trimmed = nonEmpty(value)
return trimmed?.startsWith('rk_live_') ? trimmed : undefined
}

function nonEmpty(value: string | undefined): string | undefined {
const trimmed = value?.trim()
return trimmed || undefined
}
2 changes: 1 addition & 1 deletion src/orchestrator/factory.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4467,7 +4467,7 @@ describe('FactoryLoop', () => {
const mirrorIssue = realIssueFile(724, ready, {
labels: [],
title: '[factory] GitHub mirror without synced labels',
description: 'Issue body\n\nSource: https://github.com/AgentWorkforce/relayfile-adapters/issues/224',
description: 'Issue body\n\nSource: [https://github.com/AgentWorkforce/relayfile-adapters/issues/224](<https://github.com/AgentWorkforce/relayfile-adapters/issues/224>)',
})
const mount = new FakeMountClient({ [issuePath(724)]: mirrorIssue })
const fleet = new FakeFleetClient()
Expand Down
22 changes: 20 additions & 2 deletions src/orchestrator/factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4139,8 +4139,26 @@ function githubMirrorRepoForIssue(issue: LinearIssue): string | undefined {
}

function githubRepoFromUrl(url: string | undefined): string | undefined {
const match = url?.match(/^https:\/\/github\.com\/([^/]+)\/([^/]+)\/issues\/\d+(?:[/?#].*)?$/iu)
return match?.[1] && match[2] ? `${match[1]}/${match[2]}` : undefined
for (const candidate of githubUrlCandidates(url)) {
const match = candidate.match(/^https:\/\/github\.com\/([^/]+)\/([^/]+)\/issues\/\d+(?:[/?#].*)?$/iu)
if (match?.[1] && match[2]) {
return `${match[1]}/${match[2]}`
}
}
return undefined
}

function githubUrlCandidates(value: string | undefined): string[] {
if (!value) {
return []
}
const candidates = new Set<string>()
const trimmed = value.trim()
candidates.add(trimmed.replace(/^<|>$/gu, ''))
for (const match of trimmed.matchAll(/https:\/\/github\.com\/[^\s<>)\]]+/giu)) {
candidates.add(match[0].replace(/[),.;]+$/gu, ''))
}
return [...candidates]
}

function labelRoutesForIssue(
Expand Down
Loading