From f6666bea396f2213f13975ab16fe31d07f24c7cb Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Wed, 29 Apr 2026 13:05:41 +0000 Subject: [PATCH 1/3] refactor: extract run completion coordinator --- CONTEXT.md | 15 + src/host/hostMain.ts | 300 +------------- src/host/runCompletionCoordinator.ts | 376 ++++++++++++++++++ .../host/runCompletionCoordinator.test.ts | 296 ++++++++++++++ 4 files changed, 706 insertions(+), 281 deletions(-) create mode 100644 src/host/runCompletionCoordinator.ts create mode 100644 test/unit/host/runCompletionCoordinator.test.ts diff --git a/CONTEXT.md b/CONTEXT.md index 207ca20..576678c 100644 --- a/CONTEXT.md +++ b/CONTEXT.md @@ -23,6 +23,14 @@ _Avoid_: Finished session **Commandable Session**: A **Session** that can still accept user input and control commands. +**Waited Run**: +A run request where the caller asks `agent-tty` to wait until the command's completion signal is observed. +_Avoid_: Blocking run + +**Run Completion**: +The host-observed end point for a **Waited Run**, distinct from **Session** exit and caller timeout. +_Avoid_: Command finish, session completion + **Live Host Eligible Session**: A **Session** where callers should ask the live session host for fresh state. @@ -61,6 +69,10 @@ _Avoid_: Renderer capture - An **Offline Replay Eligible Session** is reconstructed from its persisted **Event Log** and manifest. - A **Snapshot Result** is derived from exactly one **Semantic Snapshot**. - A **Snapshot Artifact** contains exactly the **Snapshot Result** emitted to the caller. +- A **Commandable Session** can accept a **Waited Run**. +- A **Waited Run** may produce one **Run Completion**, time out for its caller, or be interrupted by **Session** exit. +- Caller timeout does not cancel the underlying **Run Completion**; it may still be observed later to keep internal completion bytes out of artifacts. +- After **Session** exit, an unobserved **Run Completion** can no longer arrive. ## Example dialogue @@ -70,6 +82,9 @@ _Avoid_: Renderer capture > **Dev:** "Does **Snapshot Capture** ask the renderer for terminal state?" > **Domain expert:** "No — the renderer first produces a **Semantic Snapshot**; **Snapshot Capture** derives and records the public result from that snapshot." +> **Dev:** "If a **Waited Run** times out, did the command finish?" +> **Domain expert:** "No. The caller stopped waiting, but the **Run Completion** may still arrive later and must still be recognized." + ## Flagged ambiguities - "Active" and "offline replay eligible" are independent classifications: `destroying` is both **Active** and **Offline Replay Eligible**. diff --git a/src/host/hostMain.ts b/src/host/hostMain.ts index 32e9575..9f2da19 100644 --- a/src/host/hostMain.ts +++ b/src/host/hostMain.ts @@ -1,4 +1,3 @@ -import crypto from 'node:crypto'; import { mkdir, rename, rm } from 'node:fs/promises'; import { dirname } from 'node:path'; import process from 'node:process'; @@ -9,13 +8,7 @@ import { EventLog } from './eventLog.js'; import { buildReplayInput } from './replay.js'; import { HostRendererManager } from './renderer.js'; import { RpcServer, type MethodHandler } from './rpcServer.js'; -import { - buildRunCompleteSignalSentinel, - RUN_MARKER_PATTERN, - RunCompletionPostambleEchoSanitizer, - RunCompletionSentinelScanner, - type SentinelPiece, -} from './runCompletionSentinel.js'; +import { RunCompletionCoordinator } from './runCompletionCoordinator.js'; import { SessionState } from './sessionState.js'; import { createPty } from '../pty/createPty.js'; import { encodeKey } from '../pty/keyEncoder.js'; @@ -87,29 +80,6 @@ type WaitOutcome = { timedOut: boolean; }; -interface ActiveRunCompletion { - inputRunSeq?: number; - sentinel: string; -} - -type RunCompletionWaitResult = - | { kind: 'completed'; seq: number } - | { kind: 'exited' }; - -interface RunCompletionWaiter { - reject: (error: unknown) => void; - resolve: (result: RunCompletionWaitResult) => void; -} - -type TimedRunCompletionWaitResult = - | RunCompletionWaitResult - | { kind: 'timeout' }; - -const RUN_COMPLETION_POSTAMBLE_ECHO_PREFIX = String.raw`printf '\033\137`; - -const RUN_COMPLETION_SIGNAL_TOKEN_BYTES = 4; -const MAX_RUN_COMPLETION_POSTAMBLE_ECHO_LENGTH = 64; - function normalizeExitSignal(signal: number | null): string | null { invariant( signal === null || (Number.isInteger(signal) && signal >= 0), @@ -139,65 +109,6 @@ function rethrowAsync(error: unknown): void { }); } -function shellOctalEscapedBytes(value: string): string { - invariant(typeof value === 'string', 'value must be a string'); - - return [...Buffer.from(value, 'utf8')] - .map((byte) => `\\${byte.toString(8).padStart(3, '0')}`) - .join(''); -} - -function generateRunCompleteSignalSentinel(): string { - const token = crypto - .randomBytes(RUN_COMPLETION_SIGNAL_TOKEN_BYTES) - .toString('base64url'); - invariant( - token.length === 6, - 'run-completion signal token must encode to six base64url characters', - ); - - return buildRunCompleteSignalSentinel(token); -} - -function buildRunCompletePostamble(marker: string, sentinel: string): string { - const markerMatch = RUN_MARKER_PATTERN.exec(marker); - invariant(markerMatch !== null, 'run marker must match expected format'); - invariant( - typeof sentinel === 'string' && sentinel.length > 0, - 'sentinel must be non-empty', - ); - - const markerPayload = markerMatch[1]; - invariant( - markerPayload !== undefined && markerPayload.length === 32, - 'run marker payload must be 32 lowercase hex characters', - ); - - const postamble = `printf '${shellOctalEscapedBytes(sentinel)}'`; - invariant( - postamble.length <= MAX_RUN_COMPLETION_POSTAMBLE_ECHO_LENGTH, - 'run-completion postamble echo must stay short enough to avoid terminal wrapping', - ); - invariant( - postamble.startsWith(RUN_COMPLETION_POSTAMBLE_ECHO_PREFIX), - 'run-completion postamble echo prefix must stay in sync with sanitizer', - ); - invariant( - !postamble.includes('agent-tty:run-complete:'), - 'run-completion postamble must not echo the complete sentinel label', - ); - invariant( - !postamble.includes('__AT_MARKER_'), - 'run-completion postamble must not echo the complete marker prefix', - ); - invariant( - !postamble.includes(markerPayload), - 'run-completion postamble must not echo the complete marker payload', - ); - - return `${postamble}\n`; -} - function resolveHostRendererName(input: string | undefined): RendererName { try { return resolveRendererName( @@ -376,10 +287,12 @@ export async function runHost(sessionId: string): Promise { invariant(false, 'PTY exit resolver must be initialized'); }; - const sentinelScanner = new RunCompletionSentinelScanner(); - const postambleEchoSanitizer = new RunCompletionPostambleEchoSanitizer(); - const activeRunCompletions = new Map(); - const runCompletionWaiters = new Map(); + const runCompletion = new RunCompletionCoordinator({ + appendOutput: async (data) => { + await eventLog.append('output', { data }); + }, + appendRunComplete: (payload) => eventLog.append('run_complete', payload), + }); let ptyIngestionQueue: Promise = Promise.resolve(); const ptyExitPromise = new Promise((resolve) => { @@ -408,96 +321,6 @@ export async function runHost(sessionId: string): Promise { ); state.setChildPid(pty.pid); - const resolveRunCompletionWaiter = (marker: string, seq: number): void => { - const waiter = runCompletionWaiters.get(marker); - if (waiter === undefined) { - return; - } - - runCompletionWaiters.delete(marker); - waiter.resolve({ kind: 'completed', seq }); - }; - - const rejectRunCompletionWaiter = (marker: string, error: unknown): void => { - const waiter = runCompletionWaiters.get(marker); - if (waiter === undefined) { - return; - } - - runCompletionWaiters.delete(marker); - waiter.reject(error); - }; - - const resolveRunCompletionWaitersForExit = (): void => { - for (const [marker, waiter] of runCompletionWaiters) { - runCompletionWaiters.delete(marker); - waiter.resolve({ kind: 'exited' }); - } - }; - - const subscribeRunCompletion = ( - marker: string, - ): Promise => { - invariant( - !runCompletionWaiters.has(marker), - 'run completion waiter must be unique per marker', - ); - - const { promise, reject, resolve } = - Promise.withResolvers(); - runCompletionWaiters.set(marker, { reject, resolve }); - return promise; - }; - - const waitForRunCompletion = async ( - marker: string, - completionPromise: Promise, - timeoutMs: number, - ): Promise => { - invariant( - Number.isInteger(timeoutMs) && timeoutMs > 0, - 'timeoutMs must be positive', - ); - - const { promise, reject, resolve } = - Promise.withResolvers(); - let resolved = false; - const timeoutHandle = setTimeout(() => { - if (resolved) { - return; - } - - resolved = true; - // Keep sentinel/postamble registrations active after timeout so the - // eventual internal completion bytes are still hidden from artifacts. - runCompletionWaiters.delete(marker); - resolve({ kind: 'timeout' }); - }, timeoutMs); - - void completionPromise.then( - (result) => { - if (resolved) { - return; - } - - resolved = true; - clearTimeout(timeoutHandle); - resolve(result); - }, - (error: unknown) => { - if (resolved) { - return; - } - - resolved = true; - clearTimeout(timeoutHandle); - reject(error instanceof Error ? error : new Error(String(error))); - }, - ); - - return await promise; - }; - const replayRendererThroughSeq = async (targetSeq: number): Promise => { invariant( Number.isInteger(targetSeq) && targetSeq >= 0, @@ -521,68 +344,6 @@ export async function runHost(sessionId: string): Promise { ); }; - const appendOutput = async (data: string): Promise => { - invariant(typeof data === 'string', 'output data must be a string'); - - const outputData = postambleEchoSanitizer.feed(data); - if (outputData.length > 0) { - await eventLog.append('output', { data: outputData }); - } - }; - - const appendFlushedPostambleEchoOutput = async (): Promise => { - const outputData = postambleEchoSanitizer.flush(); - if (outputData.length > 0) { - await eventLog.append('output', { data: outputData }); - } - }; - - const appendSentinelPieces = async ( - pieces: SentinelPiece[], - ): Promise => { - for (const piece of pieces) { - if (piece.type === 'output') { - await appendOutput(piece.data); - continue; - } - - const activeCompletion = activeRunCompletions.get(piece.marker); - invariant( - activeCompletion !== undefined, - 'run-completion sentinel must correspond to an active run marker', - ); - invariant( - activeCompletion.sentinel.length > 0, - 'active run-completion sentinel must be non-empty', - ); - - try { - const trailingEchoOutput = postambleEchoSanitizer.deregister( - piece.marker, - ); - if (trailingEchoOutput.length > 0) { - await eventLog.append('output', { data: trailingEchoOutput }); - } - - const seq = await eventLog.append('run_complete', { - marker: piece.marker, - ...(activeCompletion.inputRunSeq === undefined - ? {} - : { inputRunSeq: activeCompletion.inputRunSeq }), - }); - const deleted = activeRunCompletions.delete(piece.marker); - invariant( - deleted, - 'active run completion must be deleted after append succeeds', - ); - resolveRunCompletionWaiter(piece.marker, seq); - } catch (error) { - rejectRunCompletionWaiter(piece.marker, error); - throw error; - } - } - }; - const enqueuePtyIngestion = ( operation: () => Promise, ): Promise => { @@ -690,7 +451,7 @@ export async function runHost(sessionId: string): Promise { try { await eventLog.append('exit', { exitCode, exitSignal }); } finally { - resolveRunCompletionWaitersForExit(); + runCompletion.resolvePendingWaitersForExit(); try { await writeManifest(mPath, state.snapshot()); } finally { @@ -923,45 +684,23 @@ export async function runHost(sessionId: string): Promise { } satisfies RunResult; } - const marker = `__AT_MARKER_${crypto.randomUUID().replace(/-/g, '')}__`; - invariant( - RUN_MARKER_PATTERN.test(marker), - 'generated run marker must match expected format', - ); - let sentinel = generateRunCompleteSignalSentinel(); - while ( - [...activeRunCompletions.values()].some( - (completion) => completion.sentinel === sentinel, - ) - ) { - sentinel = generateRunCompleteSignalSentinel(); - } - const postamble = buildRunCompletePostamble(marker, sentinel); + const preparedRun = runCompletion.prepareWaitedRun(); const seq = await eventLog.append('input_run', { command, - marker, + marker: preparedRun.marker, noWait, }); - - invariant( - !activeRunCompletions.has(marker), - 'generated run marker must be unique among active completions', - ); - activeRunCompletions.set(marker, { inputRunSeq: seq, sentinel }); - sentinelScanner.register(marker, sentinel); - postambleEchoSanitizer.register(marker, postamble); - const completionPromise = subscribeRunCompletion(marker); - const injectedText = `${command}\n${postamble}`; + const completion = runCompletion.registerWaitedRun({ + marker: preparedRun.marker, + inputRunSeq: seq, + }); + const injectedText = `${command}\n${completion.postamble}`; const effectiveTimeoutMs = timeoutMs ?? 30_000; const startTime = Date.now(); pty.write(injectedText); lastActivityAt = Date.now(); - const waitResult = await waitForRunCompletion( - marker, - completionPromise, - effectiveTimeoutMs, - ); + const waitResult = await completion.wait(effectiveTimeoutMs); const durationMs = Date.now() - startTime; if (waitResult.kind === 'completed') { @@ -980,7 +719,7 @@ export async function runHost(sessionId: string): Promise { timedOut: waitResult.kind === 'timeout', seq, durationMs, - marker, + marker: preparedRun.marker, } satisfies RunResult; }, sendKeys: async (params: unknown) => { @@ -1417,7 +1156,7 @@ export async function runHost(sessionId: string): Promise { // is "in use" and should not be killed for inactivity. lastActivityAt = lastOutputAt; void enqueuePtyIngestion(async () => { - await appendSentinelPieces(sentinelScanner.feed(data)); + await runCompletion.ingestPtyData(data); }).catch((error: unknown) => { // Run-completion sentinels make serialized PTY ingestion part of the // canonical event-log contract: if appending output/control events fails, @@ -1430,8 +1169,7 @@ export async function runHost(sessionId: string): Promise { let ingestionError: unknown; void enqueuePtyIngestion(async () => { - await appendSentinelPieces(sentinelScanner.flush()); - await appendFlushedPostambleEchoOutput(); + await runCompletion.flushPtyDataOnExit(); }) .catch((error: unknown) => { ingestionError = error; diff --git a/src/host/runCompletionCoordinator.ts b/src/host/runCompletionCoordinator.ts new file mode 100644 index 0000000..1750812 --- /dev/null +++ b/src/host/runCompletionCoordinator.ts @@ -0,0 +1,376 @@ +import crypto from 'node:crypto'; + +import { + buildRunCompleteSignalSentinel, + RUN_MARKER_PATTERN, + RunCompletionPostambleEchoSanitizer, + RunCompletionSentinelScanner, + type SentinelPiece, +} from './runCompletionSentinel.js'; +import { invariant } from '../util/assert.js'; + +const RUN_COMPLETION_POSTAMBLE_ECHO_PREFIX = String.raw`printf '\033\137`; +const RUN_COMPLETION_SIGNAL_TOKEN_BYTES = 4; +const MAX_RUN_COMPLETION_POSTAMBLE_ECHO_LENGTH = 64; + +interface ActiveRunCompletion { + inputRunSeq: number; + sentinel: string; +} + +interface RunCompletionWaiter { + reject: (error: unknown) => void; + resolve: (result: RunCompletionWaitResult) => void; +} + +type RunCompletionWaitResult = + | { kind: 'completed'; seq: number } + | { kind: 'exited' }; + +export type RunCompletionWaitOutcome = + | RunCompletionWaitResult + | { kind: 'timeout' }; + +export interface RunCompletionEventAppender { + appendOutput(data: string): Promise; + appendRunComplete(payload: { + marker: string; + inputRunSeq?: number; + }): Promise; +} + +export interface PreparedWaitedRun { + marker: string; +} + +export interface RegisteredWaitedRunCompletion { + postamble: string; + sentinel: string; + wait(timeoutMs: number): Promise; +} + +function assertRunMarker(marker: string): void { + invariant(typeof marker === 'string', 'run marker must be a string'); + invariant( + RUN_MARKER_PATTERN.test(marker), + 'run marker must match expected format', + ); +} + +function shellOctalEscapedBytes(value: string): string { + invariant(typeof value === 'string', 'value must be a string'); + + return [...Buffer.from(value, 'utf8')] + .map((byte) => `\\${byte.toString(8).padStart(3, '0')}`) + .join(''); +} + +function generateRunCompleteSignalSentinel(): string { + const token = crypto + .randomBytes(RUN_COMPLETION_SIGNAL_TOKEN_BYTES) + .toString('base64url'); + invariant( + token.length === 6, + 'run-completion signal token must encode to six base64url characters', + ); + + return buildRunCompleteSignalSentinel(token); +} + +function buildRunCompletePostamble(marker: string, sentinel: string): string { + const markerMatch = RUN_MARKER_PATTERN.exec(marker); + invariant(markerMatch !== null, 'run marker must match expected format'); + invariant( + typeof sentinel === 'string' && sentinel.length > 0, + 'sentinel must be non-empty', + ); + + const markerPayload = markerMatch[1]; + invariant( + markerPayload !== undefined && markerPayload.length === 32, + 'run marker payload must be 32 lowercase hex characters', + ); + + const postamble = `printf '${shellOctalEscapedBytes(sentinel)}'`; + invariant( + postamble.length <= MAX_RUN_COMPLETION_POSTAMBLE_ECHO_LENGTH, + 'run-completion postamble echo must stay short enough to avoid terminal wrapping', + ); + invariant( + postamble.startsWith(RUN_COMPLETION_POSTAMBLE_ECHO_PREFIX), + 'run-completion postamble echo prefix must stay in sync with sanitizer', + ); + invariant( + !postamble.includes('agent-tty:run-complete:'), + 'run-completion postamble must not echo the complete sentinel label', + ); + invariant( + !postamble.includes('__AT_MARKER_'), + 'run-completion postamble must not echo the complete marker prefix', + ); + invariant( + !postamble.includes(markerPayload), + 'run-completion postamble must not echo the complete marker payload', + ); + + return `${postamble}\n`; +} + +export class RunCompletionCoordinator { + readonly #appender: RunCompletionEventAppender; + #sentinelScanner = new RunCompletionSentinelScanner(); + #postambleEchoSanitizer = new RunCompletionPostambleEchoSanitizer(); + readonly #activeRunCompletions = new Map(); + readonly #runCompletionWaiters = new Map(); + + public constructor(appender: RunCompletionEventAppender) { + invariant( + typeof appender.appendOutput === 'function', + 'run-completion output appender must be a function', + ); + invariant( + typeof appender.appendRunComplete === 'function', + 'run-completion completion appender must be a function', + ); + + this.#appender = appender; + } + + public prepareWaitedRun(): PreparedWaitedRun { + const marker = `__AT_MARKER_${crypto.randomUUID().replace(/-/g, '')}__`; + invariant( + RUN_MARKER_PATTERN.test(marker), + 'generated run marker must match expected format', + ); + + return { marker }; + } + + public registerWaitedRun(params: { + inputRunSeq: number; + marker: string; + }): RegisteredWaitedRunCompletion { + const { inputRunSeq, marker } = params; + assertRunMarker(marker); + invariant( + Number.isInteger(inputRunSeq) && inputRunSeq >= 0, + 'inputRunSeq must be a non-negative integer', + ); + invariant( + !this.#activeRunCompletions.has(marker), + 'generated run marker must be unique among active completions', + ); + + let sentinel = generateRunCompleteSignalSentinel(); + while (this.#hasActiveSentinel(sentinel)) { + sentinel = generateRunCompleteSignalSentinel(); + } + + const postamble = buildRunCompletePostamble(marker, sentinel); + this.#activeRunCompletions.set(marker, { inputRunSeq, sentinel }); + this.#sentinelScanner.register(marker, sentinel); + this.#postambleEchoSanitizer.register(marker, postamble); + const completionPromise = this.#subscribeRunCompletion(marker); + let waitStarted = false; + + return { + postamble, + sentinel, + wait: (timeoutMs: number): Promise => { + invariant( + !waitStarted, + 'run completion wait must only be started once', + ); + waitStarted = true; + return this.#waitForRunCompletion(marker, completionPromise, timeoutMs); + }, + }; + } + + public async ingestPtyData(data: string): Promise { + invariant(typeof data === 'string', 'PTY data must be a string'); + + await this.#appendSentinelPieces(this.#sentinelScanner.feed(data)); + } + + public async flushPtyDataOnExit(): Promise { + await this.#appendSentinelPieces(this.#sentinelScanner.flush()); + await this.#appendFlushedPostambleEchoOutput(); + } + + public resolvePendingWaitersForExit(): void { + const waiters = [...this.#runCompletionWaiters.values()]; + this.#runCompletionWaiters.clear(); + this.#activeRunCompletions.clear(); + this.#sentinelScanner = new RunCompletionSentinelScanner(); + this.#postambleEchoSanitizer = new RunCompletionPostambleEchoSanitizer(); + + for (const waiter of waiters) { + waiter.resolve({ kind: 'exited' }); + } + } + + #hasActiveSentinel(sentinel: string): boolean { + invariant( + typeof sentinel === 'string' && sentinel.length > 0, + 'sentinel must be non-empty', + ); + + return [...this.#activeRunCompletions.values()].some( + (completion) => completion.sentinel === sentinel, + ); + } + + #subscribeRunCompletion(marker: string): Promise { + assertRunMarker(marker); + invariant( + !this.#runCompletionWaiters.has(marker), + 'run completion waiter must be unique per marker', + ); + + const { promise, reject, resolve } = + Promise.withResolvers(); + this.#runCompletionWaiters.set(marker, { reject, resolve }); + return promise; + } + + async #waitForRunCompletion( + marker: string, + completionPromise: Promise, + timeoutMs: number, + ): Promise { + assertRunMarker(marker); + invariant( + Number.isInteger(timeoutMs) && timeoutMs > 0, + 'timeoutMs must be positive', + ); + + const { promise, reject, resolve } = + Promise.withResolvers(); + let resolved = false; + const timeoutHandle = setTimeout(() => { + if (resolved) { + return; + } + + resolved = true; + // Keep sentinel/postamble registrations active after timeout so the + // eventual internal completion bytes are still hidden from artifacts. + this.#runCompletionWaiters.delete(marker); + resolve({ kind: 'timeout' }); + }, timeoutMs); + + void completionPromise.then( + (result) => { + if (resolved) { + return; + } + + resolved = true; + clearTimeout(timeoutHandle); + resolve(result); + }, + (error: unknown) => { + if (resolved) { + return; + } + + resolved = true; + clearTimeout(timeoutHandle); + reject(error instanceof Error ? error : new Error(String(error))); + }, + ); + + return await promise; + } + + async #appendOutput(data: string): Promise { + invariant(typeof data === 'string', 'output data must be a string'); + + const outputData = this.#postambleEchoSanitizer.feed(data); + if (outputData.length > 0) { + await this.#appender.appendOutput(outputData); + } + } + + async #appendFlushedPostambleEchoOutput(): Promise { + const outputData = this.#postambleEchoSanitizer.flush(); + if (outputData.length > 0) { + await this.#appender.appendOutput(outputData); + } + } + + async #appendSentinelPieces(pieces: SentinelPiece[]): Promise { + for (const piece of pieces) { + if (piece.type === 'output') { + await this.#appendOutput(piece.data); + continue; + } + + const activeCompletion = this.#activeRunCompletions.get(piece.marker); + invariant( + activeCompletion !== undefined, + 'run-completion sentinel must correspond to an active run marker', + ); + invariant( + activeCompletion.sentinel.length > 0, + 'active run-completion sentinel must be non-empty', + ); + + try { + const trailingEchoOutput = this.#postambleEchoSanitizer.deregister( + piece.marker, + ); + if (trailingEchoOutput.length > 0) { + await this.#appender.appendOutput(trailingEchoOutput); + } + + const seq = await this.#appender.appendRunComplete({ + marker: piece.marker, + inputRunSeq: activeCompletion.inputRunSeq, + }); + invariant( + Number.isInteger(seq) && seq >= 0, + 'run_complete append sequence must be a non-negative integer', + ); + + const deleted = this.#activeRunCompletions.delete(piece.marker); + invariant( + deleted, + 'active run completion must be deleted after append succeeds', + ); + this.#resolveRunCompletionWaiter(piece.marker, seq); + } catch (error) { + this.#rejectRunCompletionWaiter(piece.marker, error); + throw error; + } + } + } + + #resolveRunCompletionWaiter(marker: string, seq: number): void { + assertRunMarker(marker); + invariant( + Number.isInteger(seq) && seq >= 0, + 'run_complete sequence must be a non-negative integer', + ); + + const waiter = this.#runCompletionWaiters.get(marker); + if (waiter === undefined) { + return; + } + + this.#runCompletionWaiters.delete(marker); + waiter.resolve({ kind: 'completed', seq }); + } + + #rejectRunCompletionWaiter(marker: string, error: unknown): void { + assertRunMarker(marker); + const waiter = this.#runCompletionWaiters.get(marker); + if (waiter === undefined) { + return; + } + + this.#runCompletionWaiters.delete(marker); + waiter.reject(error); + } +} diff --git a/test/unit/host/runCompletionCoordinator.test.ts b/test/unit/host/runCompletionCoordinator.test.ts new file mode 100644 index 0000000..93c8770 --- /dev/null +++ b/test/unit/host/runCompletionCoordinator.test.ts @@ -0,0 +1,296 @@ +import { describe, expect, it, vi } from 'vitest'; + +import { RunCompletionCoordinator } from '../../../src/host/runCompletionCoordinator.js'; +import type { RunCompletionEventAppender } from '../../../src/host/runCompletionCoordinator.js'; + +interface OutputEvent { + type: 'output'; + data: string; +} + +interface RunCompleteEvent { + type: 'run_complete'; + marker: string; + inputRunSeq?: number; + seq: number; +} + +type AppendedEvent = OutputEvent | RunCompleteEvent; + +function createFakeAppender( + options: { + failOutput?: () => boolean; + failRunComplete?: () => boolean; + } = {}, +): { + appender: RunCompletionEventAppender; + events: AppendedEvent[]; +} { + const events: AppendedEvent[] = []; + let nextSeq = 100; + + return { + events, + appender: { + appendOutput: (data: string): Promise => { + if (options.failOutput?.() === true) { + return Promise.reject(new Error('output append failed')); + } + events.push({ type: 'output', data }); + return Promise.resolve(); + }, + appendRunComplete: (payload): Promise => { + if (options.failRunComplete?.() === true) { + return Promise.reject(new Error('run_complete append failed')); + } + const seq = nextSeq; + nextSeq += 1; + events.push({ type: 'run_complete', ...payload, seq }); + return Promise.resolve(seq); + }, + }, + }; +} + +describe('RunCompletionCoordinator', () => { + it('appends output around a run_complete event and resolves the waiter', async () => { + const { appender, events } = createFakeAppender(); + const coordinator = new RunCompletionCoordinator(appender); + const prepared = coordinator.prepareWaitedRun(); + const completion = coordinator.registerWaitedRun({ + marker: prepared.marker, + inputRunSeq: 7, + }); + const waitPromise = completion.wait(1_000); + + await coordinator.ingestPtyData(`before${completion.sentinel}after`); + + await expect(waitPromise).resolves.toEqual({ kind: 'completed', seq: 100 }); + expect(events).toEqual([ + { type: 'output', data: 'before' }, + { + type: 'run_complete', + marker: prepared.marker, + inputRunSeq: 7, + seq: 100, + }, + { type: 'output', data: 'after' }, + ]); + }); + + it('completes multiple active waited runs out of order', async () => { + const { appender, events } = createFakeAppender(); + const coordinator = new RunCompletionCoordinator(appender); + const firstPrepared = coordinator.prepareWaitedRun(); + const firstCompletion = coordinator.registerWaitedRun({ + marker: firstPrepared.marker, + inputRunSeq: 10, + }); + const secondPrepared = coordinator.prepareWaitedRun(); + const secondCompletion = coordinator.registerWaitedRun({ + marker: secondPrepared.marker, + inputRunSeq: 11, + }); + const firstWait = firstCompletion.wait(1_000); + const secondWait = secondCompletion.wait(1_000); + + await coordinator.ingestPtyData(secondCompletion.sentinel); + await coordinator.ingestPtyData(firstCompletion.sentinel); + + await expect(secondWait).resolves.toEqual({ kind: 'completed', seq: 100 }); + await expect(firstWait).resolves.toEqual({ kind: 'completed', seq: 101 }); + expect(events).toEqual([ + { + type: 'run_complete', + marker: secondPrepared.marker, + inputRunSeq: 11, + seq: 100, + }, + { + type: 'run_complete', + marker: firstPrepared.marker, + inputRunSeq: 10, + seq: 101, + }, + ]); + }); + + it('keeps completion bytes hidden and appends run_complete after a waiter times out', async () => { + vi.useFakeTimers(); + try { + const { appender, events } = createFakeAppender(); + const coordinator = new RunCompletionCoordinator(appender); + const prepared = coordinator.prepareWaitedRun(); + const completion = coordinator.registerWaitedRun({ + marker: prepared.marker, + inputRunSeq: 12, + }); + const waitPromise = completion.wait(5); + + await vi.advanceTimersByTimeAsync(5); + + await expect(waitPromise).resolves.toEqual({ kind: 'timeout' }); + await coordinator.ingestPtyData(`before${completion.sentinel}after`); + + expect(events).toEqual([ + { type: 'output', data: 'before' }, + { + type: 'run_complete', + marker: prepared.marker, + inputRunSeq: 12, + seq: 100, + }, + { type: 'output', data: 'after' }, + ]); + } finally { + vi.useRealTimers(); + } + }); + + it('fails ingestion loudly when a timed-out completion later cannot append run_complete', async () => { + vi.useFakeTimers(); + try { + let failRunComplete = false; + const { appender } = createFakeAppender({ + failRunComplete: () => failRunComplete, + }); + const coordinator = new RunCompletionCoordinator(appender); + const prepared = coordinator.prepareWaitedRun(); + const completion = coordinator.registerWaitedRun({ + marker: prepared.marker, + inputRunSeq: 13, + }); + const waitPromise = completion.wait(5); + + await vi.advanceTimersByTimeAsync(5); + await expect(waitPromise).resolves.toEqual({ kind: 'timeout' }); + + failRunComplete = true; + await expect( + coordinator.ingestPtyData(completion.sentinel), + ).rejects.toThrow('run_complete append failed'); + } finally { + vi.useRealTimers(); + } + }); + + it('resolves pending waiters as exited without appending run_complete', async () => { + const { appender, events } = createFakeAppender(); + const coordinator = new RunCompletionCoordinator(appender); + const prepared = coordinator.prepareWaitedRun(); + const completion = coordinator.registerWaitedRun({ + marker: prepared.marker, + inputRunSeq: 14, + }); + const waitPromise = completion.wait(1_000); + + await coordinator.flushPtyDataOnExit(); + coordinator.resolvePendingWaitersForExit(); + + await expect(waitPromise).resolves.toEqual({ kind: 'exited' }); + expect(events).toEqual([]); + }); + + it('keeps exit resolution stable when the original timeout would have fired later', async () => { + vi.useFakeTimers(); + try { + const { appender, events } = createFakeAppender(); + const coordinator = new RunCompletionCoordinator(appender); + const prepared = coordinator.prepareWaitedRun(); + const completion = coordinator.registerWaitedRun({ + marker: prepared.marker, + inputRunSeq: 15, + }); + const waitPromise = completion.wait(100); + + coordinator.resolvePendingWaitersForExit(); + + await expect(waitPromise).resolves.toEqual({ kind: 'exited' }); + await vi.advanceTimersByTimeAsync(100); + expect(events).toEqual([]); + } finally { + vi.useRealTimers(); + } + }); + + it('rejects the active waiter when run_complete append fails', async () => { + const { appender } = createFakeAppender({ + failRunComplete: () => true, + }); + const coordinator = new RunCompletionCoordinator(appender); + const prepared = coordinator.prepareWaitedRun(); + const completion = coordinator.registerWaitedRun({ + marker: prepared.marker, + inputRunSeq: 16, + }); + const waitPromise = completion.wait(1_000); + + await expect( + coordinator.ingestPtyData(completion.sentinel), + ).rejects.toThrow('run_complete append failed'); + await expect(waitPromise).rejects.toThrow('run_complete append failed'); + }); + + it('rejects the active waiter when trailing postamble echo output append fails', async () => { + let failOutput = false; + const { appender } = createFakeAppender({ + failOutput: () => failOutput, + }); + const coordinator = new RunCompletionCoordinator(appender); + const prepared = coordinator.prepareWaitedRun(); + const completion = coordinator.registerWaitedRun({ + marker: prepared.marker, + inputRunSeq: 17, + }); + const waitPromise = completion.wait(1_000); + + await coordinator.ingestPtyData(completion.postamble.slice(0, 8)); + + failOutput = true; + const waitExpectation = expect(waitPromise).rejects.toThrow( + 'output append failed', + ); + await expect( + coordinator.ingestPtyData(completion.sentinel), + ).rejects.toThrow('output append failed'); + await waitExpectation; + }); + + it('does not reject the waiter when ordinary output append fails before completion', async () => { + let failOutput = true; + const { appender } = createFakeAppender({ + failOutput: () => failOutput, + }); + const coordinator = new RunCompletionCoordinator(appender); + const prepared = coordinator.prepareWaitedRun(); + const completion = coordinator.registerWaitedRun({ + marker: prepared.marker, + inputRunSeq: 18, + }); + const waitPromise = completion.wait(1_000); + + await expect(coordinator.ingestPtyData('visible')).rejects.toThrow( + 'output append failed', + ); + + failOutput = false; + await coordinator.ingestPtyData(completion.sentinel); + await expect(waitPromise).resolves.toEqual({ kind: 'completed', seq: 100 }); + }); + + it('flushes pending non-completed sentinel bytes as output on exit', async () => { + const { appender, events } = createFakeAppender(); + const coordinator = new RunCompletionCoordinator(appender); + const prepared = coordinator.prepareWaitedRun(); + const completion = coordinator.registerWaitedRun({ + marker: prepared.marker, + inputRunSeq: 19, + }); + const partialSentinel = completion.sentinel.slice(0, 4); + + await coordinator.ingestPtyData(partialSentinel); + await coordinator.flushPtyDataOnExit(); + + expect(events).toEqual([{ type: 'output', data: partialSentinel }]); + }); +}); From 389a9f68f031daf0b714e651b18c679d726cea22 Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Wed, 29 Apr 2026 14:00:15 +0000 Subject: [PATCH 2/3] review: address run completion feedback --- src/host/hostMain.ts | 2 +- src/host/runCompletionCoordinator.ts | 51 ++++++++++++------- src/host/runCompletionSentinel.ts | 4 +- .../host/runCompletionCoordinator.test.ts | 31 ++++++++++- 4 files changed, 66 insertions(+), 22 deletions(-) diff --git a/src/host/hostMain.ts b/src/host/hostMain.ts index 9f2da19..e9b7c9f 100644 --- a/src/host/hostMain.ts +++ b/src/host/hostMain.ts @@ -451,7 +451,7 @@ export async function runHost(sessionId: string): Promise { try { await eventLog.append('exit', { exitCode, exitSignal }); } finally { - runCompletion.resolvePendingWaitersForExit(); + runCompletion.resetForExit(); try { await writeManifest(mPath, state.snapshot()); } finally { diff --git a/src/host/runCompletionCoordinator.ts b/src/host/runCompletionCoordinator.ts index 1750812..05bfcd4 100644 --- a/src/host/runCompletionCoordinator.ts +++ b/src/host/runCompletionCoordinator.ts @@ -1,6 +1,7 @@ import crypto from 'node:crypto'; import { + assertRunMarker, buildRunCompleteSignalSentinel, RUN_MARKER_PATTERN, RunCompletionPostambleEchoSanitizer, @@ -23,38 +24,35 @@ interface RunCompletionWaiter { resolve: (result: RunCompletionWaitResult) => void; } +/** Wait result delivered by the underlying run-completion observer. */ type RunCompletionWaitResult = | { kind: 'completed'; seq: number } | { kind: 'exited' }; -export type RunCompletionWaitOutcome = +/** Public waited-run wait result after applying the caller timeout. */ +export type TimedRunCompletionWaitResult = | RunCompletionWaitResult | { kind: 'timeout' }; +/** Appends replayable run-completion events in the host's serialized PTY ingestion queue. */ export interface RunCompletionEventAppender { appendOutput(data: string): Promise; appendRunComplete(payload: { marker: string; - inputRunSeq?: number; + inputRunSeq: number; }): Promise; } +/** Marker prepared before `input_run` is appended for a waited run. */ export interface PreparedWaitedRun { marker: string; } +/** Registered completion state returned after `input_run` appends successfully. */ export interface RegisteredWaitedRunCompletion { postamble: string; sentinel: string; - wait(timeoutMs: number): Promise; -} - -function assertRunMarker(marker: string): void { - invariant(typeof marker === 'string', 'run marker must be a string'); - invariant( - RUN_MARKER_PATTERN.test(marker), - 'run marker must match expected format', - ); + wait(timeoutMs: number): Promise; } function shellOctalEscapedBytes(value: string): string { @@ -79,7 +77,10 @@ function generateRunCompleteSignalSentinel(): string { function buildRunCompletePostamble(marker: string, sentinel: string): string { const markerMatch = RUN_MARKER_PATTERN.exec(marker); - invariant(markerMatch !== null, 'run marker must match expected format'); + invariant( + markerMatch !== null, + 'run marker must match expected format (__AT_MARKER_<32hex>__)', + ); invariant( typeof sentinel === 'string' && sentinel.length > 0, 'sentinel must be non-empty', @@ -116,6 +117,14 @@ function buildRunCompletePostamble(marker: string, sentinel: string): string { return `${postamble}\n`; } +/** + * Coordinates waited-run completion markers, hidden sentinels, waiters, and + * `run_complete` appends for a single host. + * + * `ingestPtyData()` and `flushPtyDataOnExit()` must be called serially by the + * host's PTY ingestion queue. Concurrent ingestion would break event-log order + * and can race scanner/sanitizer state. + */ export class RunCompletionCoordinator { readonly #appender: RunCompletionEventAppender; #sentinelScanner = new RunCompletionSentinelScanner(); @@ -136,16 +145,21 @@ export class RunCompletionCoordinator { this.#appender = appender; } + /** Creates the marker that will be recorded on the pending `input_run`. */ public prepareWaitedRun(): PreparedWaitedRun { const marker = `__AT_MARKER_${crypto.randomUUID().replace(/-/g, '')}__`; invariant( RUN_MARKER_PATTERN.test(marker), - 'generated run marker must match expected format', + 'generated run marker must match expected format (__AT_MARKER_<32hex>__)', ); return { marker }; } + /** + * Registers the marker after `input_run` appends and returns the shell + * postamble to write. The returned `wait()` function is single-use. + */ public registerWaitedRun(params: { inputRunSeq: number; marker: string; @@ -176,7 +190,7 @@ export class RunCompletionCoordinator { return { postamble, sentinel, - wait: (timeoutMs: number): Promise => { + wait: (timeoutMs: number): Promise => { invariant( !waitStarted, 'run completion wait must only be started once', @@ -187,18 +201,21 @@ export class RunCompletionCoordinator { }; } + /** Ingests one PTY output chunk; callers must serialize calls. */ public async ingestPtyData(data: string): Promise { invariant(typeof data === 'string', 'PTY data must be a string'); await this.#appendSentinelPieces(this.#sentinelScanner.feed(data)); } + /** Flushes scanner/sanitizer tails on PTY exit; callers must serialize calls. */ public async flushPtyDataOnExit(): Promise { await this.#appendSentinelPieces(this.#sentinelScanner.flush()); await this.#appendFlushedPostambleEchoOutput(); } - public resolvePendingWaitersForExit(): void { + /** Resolves pending waiters as exited and clears no-longer-observable state. */ + public resetForExit(): void { const waiters = [...this.#runCompletionWaiters.values()]; this.#runCompletionWaiters.clear(); this.#activeRunCompletions.clear(); @@ -238,7 +255,7 @@ export class RunCompletionCoordinator { marker: string, completionPromise: Promise, timeoutMs: number, - ): Promise { + ): Promise { assertRunMarker(marker); invariant( Number.isInteger(timeoutMs) && timeoutMs > 0, @@ -246,7 +263,7 @@ export class RunCompletionCoordinator { ); const { promise, reject, resolve } = - Promise.withResolvers(); + Promise.withResolvers(); let resolved = false; const timeoutHandle = setTimeout(() => { if (resolved) { diff --git a/src/host/runCompletionSentinel.ts b/src/host/runCompletionSentinel.ts index d7d8fbd..fad1451 100644 --- a/src/host/runCompletionSentinel.ts +++ b/src/host/runCompletionSentinel.ts @@ -52,11 +52,11 @@ interface ActivePostambleEcho { echoes: readonly string[]; } -function assertRunMarker(marker: string): void { +export function assertRunMarker(marker: string): void { invariant(typeof marker === 'string', 'marker must be a string'); invariant( RUN_MARKER_PATTERN.test(marker), - 'run marker must match expected format', + 'run marker must match expected format (__AT_MARKER_<32hex>__)', ); } diff --git a/test/unit/host/runCompletionCoordinator.test.ts b/test/unit/host/runCompletionCoordinator.test.ts index 93c8770..b304df3 100644 --- a/test/unit/host/runCompletionCoordinator.test.ts +++ b/test/unit/host/runCompletionCoordinator.test.ts @@ -78,6 +78,33 @@ describe('RunCompletionCoordinator', () => { ]); }); + it('strips echoed postamble bytes before appending visible output', async () => { + const { appender, events } = createFakeAppender(); + const coordinator = new RunCompletionCoordinator(appender); + const prepared = coordinator.prepareWaitedRun(); + const completion = coordinator.registerWaitedRun({ + marker: prepared.marker, + inputRunSeq: 8, + }); + const waitPromise = completion.wait(1_000); + const echoedPostamble = completion.postamble.replace(/\n$/u, '\r\n'); + + await coordinator.ingestPtyData( + `prompt${echoedPostamble}visible${completion.sentinel}`, + ); + + await expect(waitPromise).resolves.toEqual({ kind: 'completed', seq: 100 }); + expect(events).toEqual([ + { type: 'output', data: 'promptvisible' }, + { + type: 'run_complete', + marker: prepared.marker, + inputRunSeq: 8, + seq: 100, + }, + ]); + }); + it('completes multiple active waited runs out of order', async () => { const { appender, events } = createFakeAppender(); const coordinator = new RunCompletionCoordinator(appender); @@ -185,7 +212,7 @@ describe('RunCompletionCoordinator', () => { const waitPromise = completion.wait(1_000); await coordinator.flushPtyDataOnExit(); - coordinator.resolvePendingWaitersForExit(); + coordinator.resetForExit(); await expect(waitPromise).resolves.toEqual({ kind: 'exited' }); expect(events).toEqual([]); @@ -203,7 +230,7 @@ describe('RunCompletionCoordinator', () => { }); const waitPromise = completion.wait(100); - coordinator.resolvePendingWaitersForExit(); + coordinator.resetForExit(); await expect(waitPromise).resolves.toEqual({ kind: 'exited' }); await vi.advanceTimersByTimeAsync(100); From 38b64a4a650591626af33d670afdd8f25d7a0a56 Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Wed, 29 Apr 2026 14:39:51 +0000 Subject: [PATCH 3/3] test: cover postamble flush on exit --- src/host/runCompletionCoordinator.ts | 4 ++-- .../unit/host/runCompletionCoordinator.test.ts | 18 ++++++++++++++++++ 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/src/host/runCompletionCoordinator.ts b/src/host/runCompletionCoordinator.ts index 05bfcd4..fb2f30e 100644 --- a/src/host/runCompletionCoordinator.ts +++ b/src/host/runCompletionCoordinator.ts @@ -139,7 +139,7 @@ export class RunCompletionCoordinator { ); invariant( typeof appender.appendRunComplete === 'function', - 'run-completion completion appender must be a function', + 'run_complete event appender must be a function', ); this.#appender = appender; @@ -259,7 +259,7 @@ export class RunCompletionCoordinator { assertRunMarker(marker); invariant( Number.isInteger(timeoutMs) && timeoutMs > 0, - 'timeoutMs must be positive', + 'timeoutMs must be a positive integer', ); const { promise, reject, resolve } = diff --git a/test/unit/host/runCompletionCoordinator.test.ts b/test/unit/host/runCompletionCoordinator.test.ts index b304df3..2b029c8 100644 --- a/test/unit/host/runCompletionCoordinator.test.ts +++ b/test/unit/host/runCompletionCoordinator.test.ts @@ -305,6 +305,24 @@ describe('RunCompletionCoordinator', () => { await expect(waitPromise).resolves.toEqual({ kind: 'completed', seq: 100 }); }); + it('flushes pending partial postamble echo bytes as output on exit', async () => { + const { appender, events } = createFakeAppender(); + const coordinator = new RunCompletionCoordinator(appender); + const prepared = coordinator.prepareWaitedRun(); + const completion = coordinator.registerWaitedRun({ + marker: prepared.marker, + inputRunSeq: 19, + }); + const partialPostamble = completion.postamble.slice(0, 8); + + await coordinator.ingestPtyData(partialPostamble); + expect(events).toEqual([]); + + await coordinator.flushPtyDataOnExit(); + + expect(events).toEqual([{ type: 'output', data: partialPostamble }]); + }); + it('flushes pending non-completed sentinel bytes as output on exit', async () => { const { appender, events } = createFakeAppender(); const coordinator = new RunCompletionCoordinator(appender);