Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion packages/factory-sdk/src/cli/fleet.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ export async function runFleetCli(argv: string[], deps: FleetCliDeps = {}): Prom
if (!loaded) throw new Error('factory command requires config')
const mount = await buildMount(loaded, deps)
const factory = createFactory(loaded.config, { mount, fleet })
return await runFactoryCommand(command, factory, mount, loaded.config, globals, out)
return await runFactoryCommand(command, factory, mount, fleet, loaded.config, globals, out)
}
}
} catch (error) {
Expand Down Expand Up @@ -185,6 +185,7 @@ async function runFactoryCommand(
command: Extract<ParsedCommand, { kind: 'factory' | 'factory-triage' | 'factory-dispatch' }>,
factory: Factory,
mount: MountClient,
fleet: FleetClient,
config: FactoryConfig,
globals: GlobalOptions,
out: Pick<NodeJS.WriteStream, 'write'>,
Expand Down Expand Up @@ -217,6 +218,7 @@ async function runFactoryCommand(
heartbeatPath: config.loop.heartbeatPath,
registryPath: config.loop.registryPath,
staleMs: config.loop.heartbeatStaleMs,
fleet,
}))
return 0
}
Expand Down
67 changes: 63 additions & 4 deletions packages/factory-sdk/src/fleet/internal-fleet-client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import type { BrokerEvent, SendMessageInput, SpawnPtyInput } from '@agent-relay/
import { InternalFleetClient, type HarnessDriverClientLike } from './internal-fleet-client'

class FakeHarnessDriverClient implements HarnessDriverClientLike {
brokerPid: number | undefined
readonly spawned: SpawnPtyInput[] = []
readonly released: Array<{ name: string; reason?: string }> = []
readonly sent: SendMessageInput[] = []
Expand All @@ -14,12 +15,13 @@ class FakeHarnessDriverClient implements HarnessDriverClientLike {
readonly exitListeners = new Set<(agent: { name: string; sessionId?: string }) => void>()
connectEventsCalls = 0

agents: Array<{ name: string }> = []
agents: Array<{ name: string; pid?: number }> = []
nextSessionRef = 'session-1'
nextPid: number | undefined

async spawnPty(input: SpawnPtyInput): Promise<{ name: string; session_ref: string }> {
async spawnPty(input: SpawnPtyInput): Promise<{ name: string; session_ref: string; pid?: number }> {
this.spawned.push(input)
this.agents.push({ name: input.name })
this.agents.push({ name: input.name, pid: this.nextPid })
return { name: input.name, session_ref: this.nextSessionRef }
}

Expand All @@ -28,7 +30,7 @@ class FakeHarnessDriverClient implements HarnessDriverClientLike {
return { name }
}

async listAgents(): Promise<Array<{ name: string }>> {
async listAgents(): Promise<Array<{ name: string; pid?: number }>> {
return this.agents
}

Expand Down Expand Up @@ -119,6 +121,63 @@ describe('InternalFleetClient', () => {
])
})

it('resolves an agent PID from the broker roster', async () => {
const harness = new FakeHarnessDriverClient()
harness.agents = [{ name: 'ar-1-impl', pid: 901969 }]
const fleet = new InternalFleetClient({ client: harness, cwd: '/worktree' })

await expect(fleet.resolveAgentPid('ar-1-impl')).resolves.toEqual({ status: 'found', pid: 901969 })
})

it('retries roster PID lookup when broker spawned-list registration lags spawn ack', async () => {
vi.useFakeTimers()
try {
const harness = new FakeHarnessDriverClient()
let listCalls = 0
harness.listAgents = async () => {
listCalls += 1
return listCalls === 1 ? [{ name: 'ar-1-impl' }] : [{ name: 'ar-1-impl', pid: 901969 }]
}
const fleet = new InternalFleetClient({ client: harness, cwd: '/worktree' })

const resolved = fleet.resolveAgentPid('ar-1-impl')
await vi.advanceTimersByTimeAsync(75)

await expect(resolved).resolves.toEqual({ status: 'found', pid: 901969 })
expect(listCalls).toBe(2)
} finally {
vi.useRealTimers()
}
})

it('distinguishes absent agents from live agents with no PID', async () => {
vi.useFakeTimers()
try {
const absentHarness = new FakeHarnessDriverClient()
const absentFleet = new InternalFleetClient({ client: absentHarness, cwd: '/worktree' })
const absent = absentFleet.resolveAgentPid('ar-1-impl')
await vi.advanceTimersByTimeAsync(150)
await expect(absent).resolves.toEqual({ status: 'missing' })

const unresolvedHarness = new FakeHarnessDriverClient()
unresolvedHarness.agents = [{ name: 'ar-1-impl' }]
const unresolvedFleet = new InternalFleetClient({ client: unresolvedHarness, cwd: '/worktree' })
const unresolved = unresolvedFleet.resolveAgentPid('ar-1-impl')
await vi.advanceTimersByTimeAsync(150)
await expect(unresolved).resolves.toEqual({ status: 'unresolved' })
} finally {
vi.useRealTimers()
}
})

it('surfaces the broker pid as protected process state', async () => {
const harness = new FakeHarnessDriverClient()
harness.brokerPid = 68009
const fleet = new InternalFleetClient({ client: harness, cwd: '/worktree' })

await expect(fleet.protectedPids()).resolves.toEqual([68009])
})

it('maps claude capability and per-spawn cwd', async () => {
const harness = new FakeHarnessDriverClient()
const fleet = new InternalFleetClient({ client: harness, cwd: '/default' })
Expand Down
71 changes: 66 additions & 5 deletions packages/factory-sdk/src/fleet/internal-fleet-client.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
import { HarnessDriverClient } from '@agent-relay/harness-driver'
import { readFile } from 'node:fs/promises'
import { join } from 'node:path'

import type { BrokerEvent, ListAgent, SendMessageInput, SpawnPtyInput } from '@agent-relay/harness-driver'

import type { Capability, FleetClient, RosterEntry, SendInput, SpawnInput, SpawnResult } from '../ports/fleet'
import type { AgentPidResolution, Capability, FleetClient, RosterEntry, SendInput, SpawnInput, SpawnResult } from '../ports/fleet'
import type { Logger } from '../ports/system'

type SpawnedHandleLike = { name: string; sessionId?: string; session_ref?: string; sessionRef?: string; pid?: number }
type HarnessEventListener = (event: BrokerEvent) => void
type DriverAgentLike = { name: string; sessionId?: string }
type DriverAgentLike = { name: string; sessionId?: string; pid?: number }
type DriverDeliveryEventLike = BrokerEvent

export interface HarnessDriverClientLike {
readonly brokerPid?: number
spawnPty(input: SpawnPtyInput): Promise<SpawnedHandleLike>
release(name: string, reason?: string): Promise<{ name: string }>
listAgents(): Promise<Array<Pick<ListAgent, 'name'>>>
listAgents(): Promise<Array<Pick<ListAgent, 'name'> & { pid?: number }>>
sendMessage(input: SendMessageInput): Promise<{ event_id: string; targets?: string[] }>
sendInput(name: string, data: string): Promise<unknown>
connectEvents?(sinceSeq?: number): void
Expand Down Expand Up @@ -49,10 +52,13 @@ const selfNode: RosterEntry['nodes'][number] = {
capabilities: ['spawn:claude', 'spawn:codex'],
live: true,
}
const PID_RESOLVE_ATTEMPTS = 3
const PID_RESOLVE_BACKOFF_MS = 75

export class InternalFleetClient implements FleetClient {
readonly #client: HarnessDriverClientLike
readonly #cwd?: string
readonly #connectionPath?: string
readonly #resumeCapability: Capability
readonly #logger?: Logger
readonly #agentExitListeners = new Set<AgentExitListener>()
Expand All @@ -72,6 +78,7 @@ export class InternalFleetClient implements FleetClient {

constructor(options: InternalFleetClientOptions = {}) {
this.#cwd = options.cwd
this.#connectionPath = options.connectionPath
this.#resumeCapability = options.resumeCapability ?? 'spawn:codex'
this.#logger = options.logger
this.#client = options.client ?? HarnessDriverClient.connect({ cwd: options.cwd, connectionPath: options.connectionPath })
Expand Down Expand Up @@ -124,6 +131,52 @@ export class InternalFleetClient implements FleetClient {
}
}

async protectedPids(): Promise<number[]> {
const pids = new Set<number>()
if (Number.isInteger(this.#client.brokerPid) && this.#client.brokerPid! > 0) {
pids.add(this.#client.brokerPid!)
}
const connectionPid = await this.#connectionFilePid()
if (connectionPid) {
pids.add(connectionPid)
}
return [...pids].sort((a, b) => a - b)
}

async resolveAgentPid(name: string): Promise<AgentPidResolution> {
try {
let sawAgent = false
for (let attempt = 1; attempt <= PID_RESOLVE_ATTEMPTS; attempt += 1) {
const agent = (await this.#client.listAgents()).find((candidate) => candidate.name === name)
if (agent) {
sawAgent = true
}
if (typeof agent?.pid === 'number') {
return { status: 'found', pid: agent.pid }
}
if (attempt < PID_RESOLVE_ATTEMPTS) {
await sleep(PID_RESOLVE_BACKOFF_MS)
}
}
return sawAgent ? { status: 'unresolved' } : { status: 'missing' }
} catch (error) {
this.#logger?.warn?.('[factory-sdk] unable to resolve spawned agent pid from roster', error)
return { status: 'unresolved' }
}
}

async #connectionFilePid(): Promise<number | undefined> {
const path = this.#connectionPath ?? connectionPathForCwd(this.#cwd)
if (!path) return undefined
try {
const parsed = JSON.parse(await readFile(path, 'utf8')) as { pid?: unknown }
const pid = parsed.pid
return typeof pid === 'number' && Number.isInteger(pid) && pid > 0 ? pid : undefined
} catch {
return undefined
}
}

async sendMessage(input: SendInput): Promise<void> {
await this.#client.sendMessage(messageInputFrom(input))
}
Expand Down Expand Up @@ -351,6 +404,14 @@ export class InternalFleetClient implements FleetClient {
}
}

function connectionPathForCwd(cwd: string | undefined): string | undefined {
const stateDir = process.env.AGENT_RELAY_STATE_DIR
if (stateDir) return join(stateDir, 'connection.json')
return cwd ? join(cwd, '.agentworkforce', 'relay', 'connection.json') : undefined
}

const sleep = (ms: number): Promise<void> => new Promise((resolve) => setTimeout(resolve, ms))

function assertSelfNode(node: SpawnInput['node']): void {
if (node && node !== 'self') {
throw new Error(`InternalFleetClient only supports node 'self' tonight; received ${node}`)
Expand All @@ -361,11 +422,11 @@ function sessionRefFrom(handle: SpawnedHandleLike): string | undefined {
return handle.session_ref ?? handle.sessionRef ?? handle.sessionId
}

function spawnResultFrom(handle: SpawnedHandleLike): SpawnResult {
function spawnResultFrom(handle: SpawnedHandleLike, resolvedPid = handle.pid): SpawnResult {
const result: SpawnResult = { name: handle.name }
const sessionRef = sessionRefFrom(handle)
if (sessionRef) result.sessionRef = sessionRef
if (typeof handle.pid === 'number') result.pid = handle.pid
if (typeof resolvedPid === 'number') result.pid = resolvedPid
return result
}

Expand Down
Loading
Loading