diff --git a/CONTEXT.md b/CONTEXT.md index 207ca20..576678c 100644 --- a/CONTEXT.md +++ b/CONTEXT.md @@ -23,6 +23,14 @@ _Avoid_: Finished session **Commandable Session**: A **Session** that can still accept user input and control commands. +**Waited Run**: +A run request where the caller asks `agent-tty` to wait until the command's completion signal is observed. +_Avoid_: Blocking run + +**Run Completion**: +The host-observed end point for a **Waited Run**, distinct from **Session** exit and caller timeout. +_Avoid_: Command finish, session completion + **Live Host Eligible Session**: A **Session** where callers should ask the live session host for fresh state. @@ -61,6 +69,10 @@ _Avoid_: Renderer capture - An **Offline Replay Eligible Session** is reconstructed from its persisted **Event Log** and manifest. - A **Snapshot Result** is derived from exactly one **Semantic Snapshot**. - A **Snapshot Artifact** contains exactly the **Snapshot Result** emitted to the caller. +- A **Commandable Session** can accept a **Waited Run**. +- A **Waited Run** may produce one **Run Completion**, time out for its caller, or be interrupted by **Session** exit. +- Caller timeout does not cancel the underlying **Run Completion**; it may still be observed later to keep internal completion bytes out of artifacts. +- After **Session** exit, an unobserved **Run Completion** can no longer arrive. ## Example dialogue @@ -70,6 +82,9 @@ _Avoid_: Renderer capture > **Dev:** "Does **Snapshot Capture** ask the renderer for terminal state?" > **Domain expert:** "No — the renderer first produces a **Semantic Snapshot**; **Snapshot Capture** derives and records the public result from that snapshot." +> **Dev:** "If a **Waited Run** times out, did the command finish?" +> **Domain expert:** "No. The caller stopped waiting, but the **Run Completion** may still arrive later and must still be recognized." + ## Flagged ambiguities - "Active" and "offline replay eligible" are independent classifications: `destroying` is both **Active** and **Offline Replay Eligible**. diff --git a/src/host/hostMain.ts b/src/host/hostMain.ts index 32e9575..e9b7c9f 100644 --- a/src/host/hostMain.ts +++ b/src/host/hostMain.ts @@ -1,4 +1,3 @@ -import crypto from 'node:crypto'; import { mkdir, rename, rm } from 'node:fs/promises'; import { dirname } from 'node:path'; import process from 'node:process'; @@ -9,13 +8,7 @@ import { EventLog } from './eventLog.js'; import { buildReplayInput } from './replay.js'; import { HostRendererManager } from './renderer.js'; import { RpcServer, type MethodHandler } from './rpcServer.js'; -import { - buildRunCompleteSignalSentinel, - RUN_MARKER_PATTERN, - RunCompletionPostambleEchoSanitizer, - RunCompletionSentinelScanner, - type SentinelPiece, -} from './runCompletionSentinel.js'; +import { RunCompletionCoordinator } from './runCompletionCoordinator.js'; import { SessionState } from './sessionState.js'; import { createPty } from '../pty/createPty.js'; import { encodeKey } from '../pty/keyEncoder.js'; @@ -87,29 +80,6 @@ type WaitOutcome = { timedOut: boolean; }; -interface ActiveRunCompletion { - inputRunSeq?: number; - sentinel: string; -} - -type RunCompletionWaitResult = - | { kind: 'completed'; seq: number } - | { kind: 'exited' }; - -interface RunCompletionWaiter { - reject: (error: unknown) => void; - resolve: (result: RunCompletionWaitResult) => void; -} - -type TimedRunCompletionWaitResult = - | RunCompletionWaitResult - | { kind: 'timeout' }; - -const RUN_COMPLETION_POSTAMBLE_ECHO_PREFIX = String.raw`printf '\033\137`; - -const RUN_COMPLETION_SIGNAL_TOKEN_BYTES = 4; -const MAX_RUN_COMPLETION_POSTAMBLE_ECHO_LENGTH = 64; - function normalizeExitSignal(signal: number | null): string | null { invariant( signal === null || (Number.isInteger(signal) && signal >= 0), @@ -139,65 +109,6 @@ function rethrowAsync(error: unknown): void { }); } -function shellOctalEscapedBytes(value: string): string { - invariant(typeof value === 'string', 'value must be a string'); - - return [...Buffer.from(value, 'utf8')] - .map((byte) => `\\${byte.toString(8).padStart(3, '0')}`) - .join(''); -} - -function generateRunCompleteSignalSentinel(): string { - const token = crypto - .randomBytes(RUN_COMPLETION_SIGNAL_TOKEN_BYTES) - .toString('base64url'); - invariant( - token.length === 6, - 'run-completion signal token must encode to six base64url characters', - ); - - return buildRunCompleteSignalSentinel(token); -} - -function buildRunCompletePostamble(marker: string, sentinel: string): string { - const markerMatch = RUN_MARKER_PATTERN.exec(marker); - invariant(markerMatch !== null, 'run marker must match expected format'); - invariant( - typeof sentinel === 'string' && sentinel.length > 0, - 'sentinel must be non-empty', - ); - - const markerPayload = markerMatch[1]; - invariant( - markerPayload !== undefined && markerPayload.length === 32, - 'run marker payload must be 32 lowercase hex characters', - ); - - const postamble = `printf '${shellOctalEscapedBytes(sentinel)}'`; - invariant( - postamble.length <= MAX_RUN_COMPLETION_POSTAMBLE_ECHO_LENGTH, - 'run-completion postamble echo must stay short enough to avoid terminal wrapping', - ); - invariant( - postamble.startsWith(RUN_COMPLETION_POSTAMBLE_ECHO_PREFIX), - 'run-completion postamble echo prefix must stay in sync with sanitizer', - ); - invariant( - !postamble.includes('agent-tty:run-complete:'), - 'run-completion postamble must not echo the complete sentinel label', - ); - invariant( - !postamble.includes('__AT_MARKER_'), - 'run-completion postamble must not echo the complete marker prefix', - ); - invariant( - !postamble.includes(markerPayload), - 'run-completion postamble must not echo the complete marker payload', - ); - - return `${postamble}\n`; -} - function resolveHostRendererName(input: string | undefined): RendererName { try { return resolveRendererName( @@ -376,10 +287,12 @@ export async function runHost(sessionId: string): Promise { invariant(false, 'PTY exit resolver must be initialized'); }; - const sentinelScanner = new RunCompletionSentinelScanner(); - const postambleEchoSanitizer = new RunCompletionPostambleEchoSanitizer(); - const activeRunCompletions = new Map(); - const runCompletionWaiters = new Map(); + const runCompletion = new RunCompletionCoordinator({ + appendOutput: async (data) => { + await eventLog.append('output', { data }); + }, + appendRunComplete: (payload) => eventLog.append('run_complete', payload), + }); let ptyIngestionQueue: Promise = Promise.resolve(); const ptyExitPromise = new Promise((resolve) => { @@ -408,96 +321,6 @@ export async function runHost(sessionId: string): Promise { ); state.setChildPid(pty.pid); - const resolveRunCompletionWaiter = (marker: string, seq: number): void => { - const waiter = runCompletionWaiters.get(marker); - if (waiter === undefined) { - return; - } - - runCompletionWaiters.delete(marker); - waiter.resolve({ kind: 'completed', seq }); - }; - - const rejectRunCompletionWaiter = (marker: string, error: unknown): void => { - const waiter = runCompletionWaiters.get(marker); - if (waiter === undefined) { - return; - } - - runCompletionWaiters.delete(marker); - waiter.reject(error); - }; - - const resolveRunCompletionWaitersForExit = (): void => { - for (const [marker, waiter] of runCompletionWaiters) { - runCompletionWaiters.delete(marker); - waiter.resolve({ kind: 'exited' }); - } - }; - - const subscribeRunCompletion = ( - marker: string, - ): Promise => { - invariant( - !runCompletionWaiters.has(marker), - 'run completion waiter must be unique per marker', - ); - - const { promise, reject, resolve } = - Promise.withResolvers(); - runCompletionWaiters.set(marker, { reject, resolve }); - return promise; - }; - - const waitForRunCompletion = async ( - marker: string, - completionPromise: Promise, - timeoutMs: number, - ): Promise => { - invariant( - Number.isInteger(timeoutMs) && timeoutMs > 0, - 'timeoutMs must be positive', - ); - - const { promise, reject, resolve } = - Promise.withResolvers(); - let resolved = false; - const timeoutHandle = setTimeout(() => { - if (resolved) { - return; - } - - resolved = true; - // Keep sentinel/postamble registrations active after timeout so the - // eventual internal completion bytes are still hidden from artifacts. - runCompletionWaiters.delete(marker); - resolve({ kind: 'timeout' }); - }, timeoutMs); - - void completionPromise.then( - (result) => { - if (resolved) { - return; - } - - resolved = true; - clearTimeout(timeoutHandle); - resolve(result); - }, - (error: unknown) => { - if (resolved) { - return; - } - - resolved = true; - clearTimeout(timeoutHandle); - reject(error instanceof Error ? error : new Error(String(error))); - }, - ); - - return await promise; - }; - const replayRendererThroughSeq = async (targetSeq: number): Promise => { invariant( Number.isInteger(targetSeq) && targetSeq >= 0, @@ -521,68 +344,6 @@ export async function runHost(sessionId: string): Promise { ); }; - const appendOutput = async (data: string): Promise => { - invariant(typeof data === 'string', 'output data must be a string'); - - const outputData = postambleEchoSanitizer.feed(data); - if (outputData.length > 0) { - await eventLog.append('output', { data: outputData }); - } - }; - - const appendFlushedPostambleEchoOutput = async (): Promise => { - const outputData = postambleEchoSanitizer.flush(); - if (outputData.length > 0) { - await eventLog.append('output', { data: outputData }); - } - }; - - const appendSentinelPieces = async ( - pieces: SentinelPiece[], - ): Promise => { - for (const piece of pieces) { - if (piece.type === 'output') { - await appendOutput(piece.data); - continue; - } - - const activeCompletion = activeRunCompletions.get(piece.marker); - invariant( - activeCompletion !== undefined, - 'run-completion sentinel must correspond to an active run marker', - ); - invariant( - activeCompletion.sentinel.length > 0, - 'active run-completion sentinel must be non-empty', - ); - - try { - const trailingEchoOutput = postambleEchoSanitizer.deregister( - piece.marker, - ); - if (trailingEchoOutput.length > 0) { - await eventLog.append('output', { data: trailingEchoOutput }); - } - - const seq = await eventLog.append('run_complete', { - marker: piece.marker, - ...(activeCompletion.inputRunSeq === undefined - ? {} - : { inputRunSeq: activeCompletion.inputRunSeq }), - }); - const deleted = activeRunCompletions.delete(piece.marker); - invariant( - deleted, - 'active run completion must be deleted after append succeeds', - ); - resolveRunCompletionWaiter(piece.marker, seq); - } catch (error) { - rejectRunCompletionWaiter(piece.marker, error); - throw error; - } - } - }; - const enqueuePtyIngestion = ( operation: () => Promise, ): Promise => { @@ -690,7 +451,7 @@ export async function runHost(sessionId: string): Promise { try { await eventLog.append('exit', { exitCode, exitSignal }); } finally { - resolveRunCompletionWaitersForExit(); + runCompletion.resetForExit(); try { await writeManifest(mPath, state.snapshot()); } finally { @@ -923,45 +684,23 @@ export async function runHost(sessionId: string): Promise { } satisfies RunResult; } - const marker = `__AT_MARKER_${crypto.randomUUID().replace(/-/g, '')}__`; - invariant( - RUN_MARKER_PATTERN.test(marker), - 'generated run marker must match expected format', - ); - let sentinel = generateRunCompleteSignalSentinel(); - while ( - [...activeRunCompletions.values()].some( - (completion) => completion.sentinel === sentinel, - ) - ) { - sentinel = generateRunCompleteSignalSentinel(); - } - const postamble = buildRunCompletePostamble(marker, sentinel); + const preparedRun = runCompletion.prepareWaitedRun(); const seq = await eventLog.append('input_run', { command, - marker, + marker: preparedRun.marker, noWait, }); - - invariant( - !activeRunCompletions.has(marker), - 'generated run marker must be unique among active completions', - ); - activeRunCompletions.set(marker, { inputRunSeq: seq, sentinel }); - sentinelScanner.register(marker, sentinel); - postambleEchoSanitizer.register(marker, postamble); - const completionPromise = subscribeRunCompletion(marker); - const injectedText = `${command}\n${postamble}`; + const completion = runCompletion.registerWaitedRun({ + marker: preparedRun.marker, + inputRunSeq: seq, + }); + const injectedText = `${command}\n${completion.postamble}`; const effectiveTimeoutMs = timeoutMs ?? 30_000; const startTime = Date.now(); pty.write(injectedText); lastActivityAt = Date.now(); - const waitResult = await waitForRunCompletion( - marker, - completionPromise, - effectiveTimeoutMs, - ); + const waitResult = await completion.wait(effectiveTimeoutMs); const durationMs = Date.now() - startTime; if (waitResult.kind === 'completed') { @@ -980,7 +719,7 @@ export async function runHost(sessionId: string): Promise { timedOut: waitResult.kind === 'timeout', seq, durationMs, - marker, + marker: preparedRun.marker, } satisfies RunResult; }, sendKeys: async (params: unknown) => { @@ -1417,7 +1156,7 @@ export async function runHost(sessionId: string): Promise { // is "in use" and should not be killed for inactivity. lastActivityAt = lastOutputAt; void enqueuePtyIngestion(async () => { - await appendSentinelPieces(sentinelScanner.feed(data)); + await runCompletion.ingestPtyData(data); }).catch((error: unknown) => { // Run-completion sentinels make serialized PTY ingestion part of the // canonical event-log contract: if appending output/control events fails, @@ -1430,8 +1169,7 @@ export async function runHost(sessionId: string): Promise { let ingestionError: unknown; void enqueuePtyIngestion(async () => { - await appendSentinelPieces(sentinelScanner.flush()); - await appendFlushedPostambleEchoOutput(); + await runCompletion.flushPtyDataOnExit(); }) .catch((error: unknown) => { ingestionError = error; diff --git a/src/host/runCompletionCoordinator.ts b/src/host/runCompletionCoordinator.ts new file mode 100644 index 0000000..fb2f30e --- /dev/null +++ b/src/host/runCompletionCoordinator.ts @@ -0,0 +1,393 @@ +import crypto from 'node:crypto'; + +import { + assertRunMarker, + buildRunCompleteSignalSentinel, + RUN_MARKER_PATTERN, + RunCompletionPostambleEchoSanitizer, + RunCompletionSentinelScanner, + type SentinelPiece, +} from './runCompletionSentinel.js'; +import { invariant } from '../util/assert.js'; + +const RUN_COMPLETION_POSTAMBLE_ECHO_PREFIX = String.raw`printf '\033\137`; +const RUN_COMPLETION_SIGNAL_TOKEN_BYTES = 4; +const MAX_RUN_COMPLETION_POSTAMBLE_ECHO_LENGTH = 64; + +interface ActiveRunCompletion { + inputRunSeq: number; + sentinel: string; +} + +interface RunCompletionWaiter { + reject: (error: unknown) => void; + resolve: (result: RunCompletionWaitResult) => void; +} + +/** Wait result delivered by the underlying run-completion observer. */ +type RunCompletionWaitResult = + | { kind: 'completed'; seq: number } + | { kind: 'exited' }; + +/** Public waited-run wait result after applying the caller timeout. */ +export type TimedRunCompletionWaitResult = + | RunCompletionWaitResult + | { kind: 'timeout' }; + +/** Appends replayable run-completion events in the host's serialized PTY ingestion queue. */ +export interface RunCompletionEventAppender { + appendOutput(data: string): Promise; + appendRunComplete(payload: { + marker: string; + inputRunSeq: number; + }): Promise; +} + +/** Marker prepared before `input_run` is appended for a waited run. */ +export interface PreparedWaitedRun { + marker: string; +} + +/** Registered completion state returned after `input_run` appends successfully. */ +export interface RegisteredWaitedRunCompletion { + postamble: string; + sentinel: string; + wait(timeoutMs: number): Promise; +} + +function shellOctalEscapedBytes(value: string): string { + invariant(typeof value === 'string', 'value must be a string'); + + return [...Buffer.from(value, 'utf8')] + .map((byte) => `\\${byte.toString(8).padStart(3, '0')}`) + .join(''); +} + +function generateRunCompleteSignalSentinel(): string { + const token = crypto + .randomBytes(RUN_COMPLETION_SIGNAL_TOKEN_BYTES) + .toString('base64url'); + invariant( + token.length === 6, + 'run-completion signal token must encode to six base64url characters', + ); + + return buildRunCompleteSignalSentinel(token); +} + +function buildRunCompletePostamble(marker: string, sentinel: string): string { + const markerMatch = RUN_MARKER_PATTERN.exec(marker); + invariant( + markerMatch !== null, + 'run marker must match expected format (__AT_MARKER_<32hex>__)', + ); + invariant( + typeof sentinel === 'string' && sentinel.length > 0, + 'sentinel must be non-empty', + ); + + const markerPayload = markerMatch[1]; + invariant( + markerPayload !== undefined && markerPayload.length === 32, + 'run marker payload must be 32 lowercase hex characters', + ); + + const postamble = `printf '${shellOctalEscapedBytes(sentinel)}'`; + invariant( + postamble.length <= MAX_RUN_COMPLETION_POSTAMBLE_ECHO_LENGTH, + 'run-completion postamble echo must stay short enough to avoid terminal wrapping', + ); + invariant( + postamble.startsWith(RUN_COMPLETION_POSTAMBLE_ECHO_PREFIX), + 'run-completion postamble echo prefix must stay in sync with sanitizer', + ); + invariant( + !postamble.includes('agent-tty:run-complete:'), + 'run-completion postamble must not echo the complete sentinel label', + ); + invariant( + !postamble.includes('__AT_MARKER_'), + 'run-completion postamble must not echo the complete marker prefix', + ); + invariant( + !postamble.includes(markerPayload), + 'run-completion postamble must not echo the complete marker payload', + ); + + return `${postamble}\n`; +} + +/** + * Coordinates waited-run completion markers, hidden sentinels, waiters, and + * `run_complete` appends for a single host. + * + * `ingestPtyData()` and `flushPtyDataOnExit()` must be called serially by the + * host's PTY ingestion queue. Concurrent ingestion would break event-log order + * and can race scanner/sanitizer state. + */ +export class RunCompletionCoordinator { + readonly #appender: RunCompletionEventAppender; + #sentinelScanner = new RunCompletionSentinelScanner(); + #postambleEchoSanitizer = new RunCompletionPostambleEchoSanitizer(); + readonly #activeRunCompletions = new Map(); + readonly #runCompletionWaiters = new Map(); + + public constructor(appender: RunCompletionEventAppender) { + invariant( + typeof appender.appendOutput === 'function', + 'run-completion output appender must be a function', + ); + invariant( + typeof appender.appendRunComplete === 'function', + 'run_complete event appender must be a function', + ); + + this.#appender = appender; + } + + /** Creates the marker that will be recorded on the pending `input_run`. */ + public prepareWaitedRun(): PreparedWaitedRun { + const marker = `__AT_MARKER_${crypto.randomUUID().replace(/-/g, '')}__`; + invariant( + RUN_MARKER_PATTERN.test(marker), + 'generated run marker must match expected format (__AT_MARKER_<32hex>__)', + ); + + return { marker }; + } + + /** + * Registers the marker after `input_run` appends and returns the shell + * postamble to write. The returned `wait()` function is single-use. + */ + public registerWaitedRun(params: { + inputRunSeq: number; + marker: string; + }): RegisteredWaitedRunCompletion { + const { inputRunSeq, marker } = params; + assertRunMarker(marker); + invariant( + Number.isInteger(inputRunSeq) && inputRunSeq >= 0, + 'inputRunSeq must be a non-negative integer', + ); + invariant( + !this.#activeRunCompletions.has(marker), + 'generated run marker must be unique among active completions', + ); + + let sentinel = generateRunCompleteSignalSentinel(); + while (this.#hasActiveSentinel(sentinel)) { + sentinel = generateRunCompleteSignalSentinel(); + } + + const postamble = buildRunCompletePostamble(marker, sentinel); + this.#activeRunCompletions.set(marker, { inputRunSeq, sentinel }); + this.#sentinelScanner.register(marker, sentinel); + this.#postambleEchoSanitizer.register(marker, postamble); + const completionPromise = this.#subscribeRunCompletion(marker); + let waitStarted = false; + + return { + postamble, + sentinel, + wait: (timeoutMs: number): Promise => { + invariant( + !waitStarted, + 'run completion wait must only be started once', + ); + waitStarted = true; + return this.#waitForRunCompletion(marker, completionPromise, timeoutMs); + }, + }; + } + + /** Ingests one PTY output chunk; callers must serialize calls. */ + public async ingestPtyData(data: string): Promise { + invariant(typeof data === 'string', 'PTY data must be a string'); + + await this.#appendSentinelPieces(this.#sentinelScanner.feed(data)); + } + + /** Flushes scanner/sanitizer tails on PTY exit; callers must serialize calls. */ + public async flushPtyDataOnExit(): Promise { + await this.#appendSentinelPieces(this.#sentinelScanner.flush()); + await this.#appendFlushedPostambleEchoOutput(); + } + + /** Resolves pending waiters as exited and clears no-longer-observable state. */ + public resetForExit(): void { + const waiters = [...this.#runCompletionWaiters.values()]; + this.#runCompletionWaiters.clear(); + this.#activeRunCompletions.clear(); + this.#sentinelScanner = new RunCompletionSentinelScanner(); + this.#postambleEchoSanitizer = new RunCompletionPostambleEchoSanitizer(); + + for (const waiter of waiters) { + waiter.resolve({ kind: 'exited' }); + } + } + + #hasActiveSentinel(sentinel: string): boolean { + invariant( + typeof sentinel === 'string' && sentinel.length > 0, + 'sentinel must be non-empty', + ); + + return [...this.#activeRunCompletions.values()].some( + (completion) => completion.sentinel === sentinel, + ); + } + + #subscribeRunCompletion(marker: string): Promise { + assertRunMarker(marker); + invariant( + !this.#runCompletionWaiters.has(marker), + 'run completion waiter must be unique per marker', + ); + + const { promise, reject, resolve } = + Promise.withResolvers(); + this.#runCompletionWaiters.set(marker, { reject, resolve }); + return promise; + } + + async #waitForRunCompletion( + marker: string, + completionPromise: Promise, + timeoutMs: number, + ): Promise { + assertRunMarker(marker); + invariant( + Number.isInteger(timeoutMs) && timeoutMs > 0, + 'timeoutMs must be a positive integer', + ); + + const { promise, reject, resolve } = + Promise.withResolvers(); + let resolved = false; + const timeoutHandle = setTimeout(() => { + if (resolved) { + return; + } + + resolved = true; + // Keep sentinel/postamble registrations active after timeout so the + // eventual internal completion bytes are still hidden from artifacts. + this.#runCompletionWaiters.delete(marker); + resolve({ kind: 'timeout' }); + }, timeoutMs); + + void completionPromise.then( + (result) => { + if (resolved) { + return; + } + + resolved = true; + clearTimeout(timeoutHandle); + resolve(result); + }, + (error: unknown) => { + if (resolved) { + return; + } + + resolved = true; + clearTimeout(timeoutHandle); + reject(error instanceof Error ? error : new Error(String(error))); + }, + ); + + return await promise; + } + + async #appendOutput(data: string): Promise { + invariant(typeof data === 'string', 'output data must be a string'); + + const outputData = this.#postambleEchoSanitizer.feed(data); + if (outputData.length > 0) { + await this.#appender.appendOutput(outputData); + } + } + + async #appendFlushedPostambleEchoOutput(): Promise { + const outputData = this.#postambleEchoSanitizer.flush(); + if (outputData.length > 0) { + await this.#appender.appendOutput(outputData); + } + } + + async #appendSentinelPieces(pieces: SentinelPiece[]): Promise { + for (const piece of pieces) { + if (piece.type === 'output') { + await this.#appendOutput(piece.data); + continue; + } + + const activeCompletion = this.#activeRunCompletions.get(piece.marker); + invariant( + activeCompletion !== undefined, + 'run-completion sentinel must correspond to an active run marker', + ); + invariant( + activeCompletion.sentinel.length > 0, + 'active run-completion sentinel must be non-empty', + ); + + try { + const trailingEchoOutput = this.#postambleEchoSanitizer.deregister( + piece.marker, + ); + if (trailingEchoOutput.length > 0) { + await this.#appender.appendOutput(trailingEchoOutput); + } + + const seq = await this.#appender.appendRunComplete({ + marker: piece.marker, + inputRunSeq: activeCompletion.inputRunSeq, + }); + invariant( + Number.isInteger(seq) && seq >= 0, + 'run_complete append sequence must be a non-negative integer', + ); + + const deleted = this.#activeRunCompletions.delete(piece.marker); + invariant( + deleted, + 'active run completion must be deleted after append succeeds', + ); + this.#resolveRunCompletionWaiter(piece.marker, seq); + } catch (error) { + this.#rejectRunCompletionWaiter(piece.marker, error); + throw error; + } + } + } + + #resolveRunCompletionWaiter(marker: string, seq: number): void { + assertRunMarker(marker); + invariant( + Number.isInteger(seq) && seq >= 0, + 'run_complete sequence must be a non-negative integer', + ); + + const waiter = this.#runCompletionWaiters.get(marker); + if (waiter === undefined) { + return; + } + + this.#runCompletionWaiters.delete(marker); + waiter.resolve({ kind: 'completed', seq }); + } + + #rejectRunCompletionWaiter(marker: string, error: unknown): void { + assertRunMarker(marker); + const waiter = this.#runCompletionWaiters.get(marker); + if (waiter === undefined) { + return; + } + + this.#runCompletionWaiters.delete(marker); + waiter.reject(error); + } +} diff --git a/src/host/runCompletionSentinel.ts b/src/host/runCompletionSentinel.ts index d7d8fbd..fad1451 100644 --- a/src/host/runCompletionSentinel.ts +++ b/src/host/runCompletionSentinel.ts @@ -52,11 +52,11 @@ interface ActivePostambleEcho { echoes: readonly string[]; } -function assertRunMarker(marker: string): void { +export function assertRunMarker(marker: string): void { invariant(typeof marker === 'string', 'marker must be a string'); invariant( RUN_MARKER_PATTERN.test(marker), - 'run marker must match expected format', + 'run marker must match expected format (__AT_MARKER_<32hex>__)', ); } diff --git a/test/unit/host/runCompletionCoordinator.test.ts b/test/unit/host/runCompletionCoordinator.test.ts new file mode 100644 index 0000000..2b029c8 --- /dev/null +++ b/test/unit/host/runCompletionCoordinator.test.ts @@ -0,0 +1,341 @@ +import { describe, expect, it, vi } from 'vitest'; + +import { RunCompletionCoordinator } from '../../../src/host/runCompletionCoordinator.js'; +import type { RunCompletionEventAppender } from '../../../src/host/runCompletionCoordinator.js'; + +interface OutputEvent { + type: 'output'; + data: string; +} + +interface RunCompleteEvent { + type: 'run_complete'; + marker: string; + inputRunSeq?: number; + seq: number; +} + +type AppendedEvent = OutputEvent | RunCompleteEvent; + +function createFakeAppender( + options: { + failOutput?: () => boolean; + failRunComplete?: () => boolean; + } = {}, +): { + appender: RunCompletionEventAppender; + events: AppendedEvent[]; +} { + const events: AppendedEvent[] = []; + let nextSeq = 100; + + return { + events, + appender: { + appendOutput: (data: string): Promise => { + if (options.failOutput?.() === true) { + return Promise.reject(new Error('output append failed')); + } + events.push({ type: 'output', data }); + return Promise.resolve(); + }, + appendRunComplete: (payload): Promise => { + if (options.failRunComplete?.() === true) { + return Promise.reject(new Error('run_complete append failed')); + } + const seq = nextSeq; + nextSeq += 1; + events.push({ type: 'run_complete', ...payload, seq }); + return Promise.resolve(seq); + }, + }, + }; +} + +describe('RunCompletionCoordinator', () => { + it('appends output around a run_complete event and resolves the waiter', async () => { + const { appender, events } = createFakeAppender(); + const coordinator = new RunCompletionCoordinator(appender); + const prepared = coordinator.prepareWaitedRun(); + const completion = coordinator.registerWaitedRun({ + marker: prepared.marker, + inputRunSeq: 7, + }); + const waitPromise = completion.wait(1_000); + + await coordinator.ingestPtyData(`before${completion.sentinel}after`); + + await expect(waitPromise).resolves.toEqual({ kind: 'completed', seq: 100 }); + expect(events).toEqual([ + { type: 'output', data: 'before' }, + { + type: 'run_complete', + marker: prepared.marker, + inputRunSeq: 7, + seq: 100, + }, + { type: 'output', data: 'after' }, + ]); + }); + + it('strips echoed postamble bytes before appending visible output', async () => { + const { appender, events } = createFakeAppender(); + const coordinator = new RunCompletionCoordinator(appender); + const prepared = coordinator.prepareWaitedRun(); + const completion = coordinator.registerWaitedRun({ + marker: prepared.marker, + inputRunSeq: 8, + }); + const waitPromise = completion.wait(1_000); + const echoedPostamble = completion.postamble.replace(/\n$/u, '\r\n'); + + await coordinator.ingestPtyData( + `prompt${echoedPostamble}visible${completion.sentinel}`, + ); + + await expect(waitPromise).resolves.toEqual({ kind: 'completed', seq: 100 }); + expect(events).toEqual([ + { type: 'output', data: 'promptvisible' }, + { + type: 'run_complete', + marker: prepared.marker, + inputRunSeq: 8, + seq: 100, + }, + ]); + }); + + it('completes multiple active waited runs out of order', async () => { + const { appender, events } = createFakeAppender(); + const coordinator = new RunCompletionCoordinator(appender); + const firstPrepared = coordinator.prepareWaitedRun(); + const firstCompletion = coordinator.registerWaitedRun({ + marker: firstPrepared.marker, + inputRunSeq: 10, + }); + const secondPrepared = coordinator.prepareWaitedRun(); + const secondCompletion = coordinator.registerWaitedRun({ + marker: secondPrepared.marker, + inputRunSeq: 11, + }); + const firstWait = firstCompletion.wait(1_000); + const secondWait = secondCompletion.wait(1_000); + + await coordinator.ingestPtyData(secondCompletion.sentinel); + await coordinator.ingestPtyData(firstCompletion.sentinel); + + await expect(secondWait).resolves.toEqual({ kind: 'completed', seq: 100 }); + await expect(firstWait).resolves.toEqual({ kind: 'completed', seq: 101 }); + expect(events).toEqual([ + { + type: 'run_complete', + marker: secondPrepared.marker, + inputRunSeq: 11, + seq: 100, + }, + { + type: 'run_complete', + marker: firstPrepared.marker, + inputRunSeq: 10, + seq: 101, + }, + ]); + }); + + it('keeps completion bytes hidden and appends run_complete after a waiter times out', async () => { + vi.useFakeTimers(); + try { + const { appender, events } = createFakeAppender(); + const coordinator = new RunCompletionCoordinator(appender); + const prepared = coordinator.prepareWaitedRun(); + const completion = coordinator.registerWaitedRun({ + marker: prepared.marker, + inputRunSeq: 12, + }); + const waitPromise = completion.wait(5); + + await vi.advanceTimersByTimeAsync(5); + + await expect(waitPromise).resolves.toEqual({ kind: 'timeout' }); + await coordinator.ingestPtyData(`before${completion.sentinel}after`); + + expect(events).toEqual([ + { type: 'output', data: 'before' }, + { + type: 'run_complete', + marker: prepared.marker, + inputRunSeq: 12, + seq: 100, + }, + { type: 'output', data: 'after' }, + ]); + } finally { + vi.useRealTimers(); + } + }); + + it('fails ingestion loudly when a timed-out completion later cannot append run_complete', async () => { + vi.useFakeTimers(); + try { + let failRunComplete = false; + const { appender } = createFakeAppender({ + failRunComplete: () => failRunComplete, + }); + const coordinator = new RunCompletionCoordinator(appender); + const prepared = coordinator.prepareWaitedRun(); + const completion = coordinator.registerWaitedRun({ + marker: prepared.marker, + inputRunSeq: 13, + }); + const waitPromise = completion.wait(5); + + await vi.advanceTimersByTimeAsync(5); + await expect(waitPromise).resolves.toEqual({ kind: 'timeout' }); + + failRunComplete = true; + await expect( + coordinator.ingestPtyData(completion.sentinel), + ).rejects.toThrow('run_complete append failed'); + } finally { + vi.useRealTimers(); + } + }); + + it('resolves pending waiters as exited without appending run_complete', async () => { + const { appender, events } = createFakeAppender(); + const coordinator = new RunCompletionCoordinator(appender); + const prepared = coordinator.prepareWaitedRun(); + const completion = coordinator.registerWaitedRun({ + marker: prepared.marker, + inputRunSeq: 14, + }); + const waitPromise = completion.wait(1_000); + + await coordinator.flushPtyDataOnExit(); + coordinator.resetForExit(); + + await expect(waitPromise).resolves.toEqual({ kind: 'exited' }); + expect(events).toEqual([]); + }); + + it('keeps exit resolution stable when the original timeout would have fired later', async () => { + vi.useFakeTimers(); + try { + const { appender, events } = createFakeAppender(); + const coordinator = new RunCompletionCoordinator(appender); + const prepared = coordinator.prepareWaitedRun(); + const completion = coordinator.registerWaitedRun({ + marker: prepared.marker, + inputRunSeq: 15, + }); + const waitPromise = completion.wait(100); + + coordinator.resetForExit(); + + await expect(waitPromise).resolves.toEqual({ kind: 'exited' }); + await vi.advanceTimersByTimeAsync(100); + expect(events).toEqual([]); + } finally { + vi.useRealTimers(); + } + }); + + it('rejects the active waiter when run_complete append fails', async () => { + const { appender } = createFakeAppender({ + failRunComplete: () => true, + }); + const coordinator = new RunCompletionCoordinator(appender); + const prepared = coordinator.prepareWaitedRun(); + const completion = coordinator.registerWaitedRun({ + marker: prepared.marker, + inputRunSeq: 16, + }); + const waitPromise = completion.wait(1_000); + + await expect( + coordinator.ingestPtyData(completion.sentinel), + ).rejects.toThrow('run_complete append failed'); + await expect(waitPromise).rejects.toThrow('run_complete append failed'); + }); + + it('rejects the active waiter when trailing postamble echo output append fails', async () => { + let failOutput = false; + const { appender } = createFakeAppender({ + failOutput: () => failOutput, + }); + const coordinator = new RunCompletionCoordinator(appender); + const prepared = coordinator.prepareWaitedRun(); + const completion = coordinator.registerWaitedRun({ + marker: prepared.marker, + inputRunSeq: 17, + }); + const waitPromise = completion.wait(1_000); + + await coordinator.ingestPtyData(completion.postamble.slice(0, 8)); + + failOutput = true; + const waitExpectation = expect(waitPromise).rejects.toThrow( + 'output append failed', + ); + await expect( + coordinator.ingestPtyData(completion.sentinel), + ).rejects.toThrow('output append failed'); + await waitExpectation; + }); + + it('does not reject the waiter when ordinary output append fails before completion', async () => { + let failOutput = true; + const { appender } = createFakeAppender({ + failOutput: () => failOutput, + }); + const coordinator = new RunCompletionCoordinator(appender); + const prepared = coordinator.prepareWaitedRun(); + const completion = coordinator.registerWaitedRun({ + marker: prepared.marker, + inputRunSeq: 18, + }); + const waitPromise = completion.wait(1_000); + + await expect(coordinator.ingestPtyData('visible')).rejects.toThrow( + 'output append failed', + ); + + failOutput = false; + await coordinator.ingestPtyData(completion.sentinel); + await expect(waitPromise).resolves.toEqual({ kind: 'completed', seq: 100 }); + }); + + it('flushes pending partial postamble echo bytes as output on exit', async () => { + const { appender, events } = createFakeAppender(); + const coordinator = new RunCompletionCoordinator(appender); + const prepared = coordinator.prepareWaitedRun(); + const completion = coordinator.registerWaitedRun({ + marker: prepared.marker, + inputRunSeq: 19, + }); + const partialPostamble = completion.postamble.slice(0, 8); + + await coordinator.ingestPtyData(partialPostamble); + expect(events).toEqual([]); + + await coordinator.flushPtyDataOnExit(); + + expect(events).toEqual([{ type: 'output', data: partialPostamble }]); + }); + + it('flushes pending non-completed sentinel bytes as output on exit', async () => { + const { appender, events } = createFakeAppender(); + const coordinator = new RunCompletionCoordinator(appender); + const prepared = coordinator.prepareWaitedRun(); + const completion = coordinator.registerWaitedRun({ + marker: prepared.marker, + inputRunSeq: 19, + }); + const partialSentinel = completion.sentinel.slice(0, 4); + + await coordinator.ingestPtyData(partialSentinel); + await coordinator.flushPtyDataOnExit(); + + expect(events).toEqual([{ type: 'output', data: partialSentinel }]); + }); +});