From 2534ccadeca6867b5f6b5d9de17d99989cc40794 Mon Sep 17 00:00:00 2001 From: Jordan Ritter Date: Mon, 11 May 2026 12:35:17 -0700 Subject: [PATCH 1/4] fix: harden recorder proxy relay and data integrity - Forward HTTP method to upstream instead of hardcoding POST - Clear response timeout after successful completion (guard null socket) - Fix client-disconnect handler to check writableFinished before destroying - Wrap onHookBypassed and beforeWriteResponse callbacks in try/catch - Override audio content-type to application/json on non-2xx error relay - Atomic write (tmp+rename) for snapshot-mode fixture files - Sanitize undefined toolCall name/arguments before saving fixtures - Tighten video detection heuristic to exclude LLM provider fields - Clarify Float32Array alignment copy behavior --- src/recorder.ts | 89 +++++++++++++++++++++++++++++++++++++------------ 1 file changed, 67 insertions(+), 22 deletions(-) diff --git a/src/recorder.ts b/src/recorder.ts index a57e5f4..a942bea 100644 --- a/src/recorder.ts +++ b/src/recorder.ts @@ -165,7 +165,7 @@ export async function proxyAndRecord( let streamedToClient = false; let clientDisconnected = false; try { - const result = await makeUpstreamRequest(target, forwardHeaders, requestBody, res); + const result = await makeUpstreamRequest(target, forwardHeaders, requestBody, res, req.method); upstreamStatus = result.status; upstreamHeaders = result.headers; upstreamBody = result.body; @@ -247,15 +247,20 @@ export async function proxyAndRecord( } else { const reasoningSpread = collapsed.reasoning ? { reasoning: collapsed.reasoning } : {}; if (collapsed.toolCalls && collapsed.toolCalls.length > 0) { + const sanitizedToolCalls = collapsed.toolCalls.map((tc) => ({ + ...tc, + name: tc.name ?? "", + arguments: tc.arguments ?? "{}", + })); if (collapsed.content) { // Both content and toolCalls present — save as ContentWithToolCallsResponse fixtureResponse = { content: collapsed.content, - toolCalls: collapsed.toolCalls, + toolCalls: sanitizedToolCalls, ...reasoningSpread, }; } else { - fixtureResponse = { toolCalls: collapsed.toolCalls, ...reasoningSpread }; + fixtureResponse = { toolCalls: sanitizedToolCalls, ...reasoningSpread }; } } else { fixtureResponse = { content: collapsed.content ?? "", ...reasoningSpread }; @@ -373,6 +378,9 @@ export async function proxyAndRecord( try { // Create the target directory (must be inside try/catch so filesystem // errors don't prevent the upstream response from being relayed). + // Keep synchronous: for streamed responses the HTTP reply is already on + // the wire, so any async yield lets callers observe the filesystem before + // the fixture is written. if (isSnapshotMode) { fs.mkdirSync(path.dirname(filepath), { recursive: true }); } else { @@ -412,7 +420,12 @@ export async function proxyAndRecord( fileContent._warning = warnings.join("; "); } - fs.writeFileSync(filepath, JSON.stringify(fileContent, null, 2), "utf-8"); + // Atomic write: write to temp file then rename to avoid read-modify-write races + // Keep synchronous — for streamed responses the HTTP response is already on the + // wire, so async writes would race with callers checking the filesystem. + const tmpPath = filepath + ".tmp." + process.pid; + fs.writeFileSync(tmpPath, JSON.stringify(fileContent, null, 2), "utf-8"); + fs.renameSync(tmpPath, filepath); writtenToDisk = true; } catch (err) { const msg = err instanceof Error ? err.message : "Unknown filesystem error"; @@ -450,31 +463,50 @@ export async function proxyAndRecord( : ctString.toLowerCase().includes("application/x-ndjson") ? "ndjson_streamed" : "sse_streamed"; - options.onHookBypassed(bypassReason); + try { + options.onHookBypassed(bypassReason); + } catch (err) { + defaults.logger.warn( + `onHookBypassed callback threw: ${err instanceof Error ? err.message : String(err)}`, + ); + } } } else { // Give the caller a chance to mutate or replace the response before relay. // Used by the chaos layer to turn a successful proxy into a malformed body. // `body` is the raw upstream bytes so binary payloads survive round-tripping. if (options?.beforeWriteResponse) { - const handled = await options.beforeWriteResponse({ - status: upstreamStatus, - contentType: ctString, - body: rawBuffer, - }); + let handled: boolean | undefined; + try { + handled = await options.beforeWriteResponse({ + status: upstreamStatus, + contentType: ctString, + body: rawBuffer, + }); + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + throw new Error(`beforeWriteResponse hook failed for ${providerKey}: ${msg}`); + } if (handled) return "handled_by_hook"; } - const relayHeaders: Record = {}; - if (ctString) { - relayHeaders["Content-Type"] = ctString; - } // Normalize status codes for the client: aimock acts as a gateway, so // upstream provider details (429 rate-limits, 503 outages, etc.) should // not leak. Successes → 200, errors → 502 (Bad Gateway). const clientStatus = upstreamStatus >= 200 && upstreamStatus < 300 ? 200 : 502; - res.writeHead(clientStatus, relayHeaders); const isAudioRelay = ctString.toLowerCase().startsWith("audio/"); + // When an upstream error (non-2xx) is relayed for an audio endpoint, the + // body is typically a JSON error object — override the content-type so + // clients don't try to decode JSON as audio. + const relayHeaders: Record = {}; + const clientCt = + (clientStatus >= 200 && clientStatus < 300) || !isAudioRelay + ? (ctString ?? "application/json") + : "application/json"; + if (clientCt) { + relayHeaders["Content-Type"] = clientCt; + } + res.writeHead(clientStatus, relayHeaders); res.end(isBinaryStream || isAudioRelay ? rawBuffer : upstreamBody); } @@ -490,6 +522,7 @@ function makeUpstreamRequest( headers: Record, body: string, clientRes?: http.ServerResponse, + method: string = "POST", ): Promise<{ status: number; headers: http.IncomingHttpHeaders; @@ -505,7 +538,7 @@ function makeUpstreamRequest( const req = transport.request( target, { - method: "POST", + method, timeout: UPSTREAM_TIMEOUT_MS, headers: { ...headers, @@ -548,10 +581,14 @@ function makeUpstreamRequest( // before the first data chunk arrives. if (typeof clientRes.flushHeaders === "function") clientRes.flushHeaders(); streamedToClient = true; - // Stop relaying if the client disconnects mid-stream + // Stop relaying if the client disconnects mid-stream. + // Check writableFinished to distinguish normal completion (where + // "close" also fires) from premature client disconnects. clientRes.on("close", () => { - clientDisconnected = true; - req.destroy(); + if (!clientRes.writableFinished) { + clientDisconnected = true; + req.destroy(); + } }); } const chunks: Buffer[] = []; @@ -574,6 +611,7 @@ function makeUpstreamRequest( }); res.on("error", reject); res.on("end", () => { + if (res.socket) res.setTimeout(0); const rawBuffer = Buffer.concat(chunks); if ( streamedToClient && @@ -661,8 +699,10 @@ function buildFixtureResponse( // Malformed embedding — return a zero-dimension embedding fixture return { embedding: [] }; } - const aligned = new Uint8Array(buf).buffer; // Always offset 0 - const floats = new Float32Array(aligned, 0, buf.byteLength / 4); + // Uint8Array constructor copies Buffer data to a fresh ArrayBuffer at offset 0, + // guaranteeing the alignment Float32Array requires. + const copied = new Uint8Array(buf); + const floats = new Float32Array(copied.buffer, 0, buf.byteLength / 4); return { embedding: Array.from(floats) }; } // OpenAI image generation: { created, data: [{ url, b64_json, revised_prompt }] } @@ -748,7 +788,12 @@ function buildFixtureResponse( !("message" in obj) && !("data" in obj) && !("object" in obj) && - !("outputs" in obj) + !("outputs" in obj) && + !("model" in obj) && + !("response" in obj) && + !("done" in obj) && + !("usage" in obj) && + !("error" in obj) ) { if (obj.status === "completed" && obj.url) { return { From 8b8d294afa0eb683a008395a65b62ee3f7188140 Mon Sep 17 00:00:00 2001 From: Jordan Ritter Date: Mon, 11 May 2026 12:35:23 -0700 Subject: [PATCH 2/4] fix: improve server error handling, fal body relay, and chaos evaluation - Use consumed flag + deferred splice for one-shot error fixtures - Guard Azure model injection catch to only swallow SyntaxError - Pass matched fixture to evaluateChaos for non-completions endpoints - Cache fal request body to prevent double-consumption on passthrough - Include PUT in fal queue body reading --- src/server.ts | 165 ++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 147 insertions(+), 18 deletions(-) diff --git a/src/server.ts b/src/server.ts index 7e3fbcb..331cafd 100644 --- a/src/server.ts +++ b/src/server.ts @@ -354,14 +354,21 @@ async function handleControlAPI( }; // Insert at front so it matches before everything else fixtures.unshift(errorFixture); - // Remove synchronously on first match to prevent race conditions where - // two concurrent requests both match before the removal fires. + // One-shot: match once then self-remove. We use a `consumed` flag to + // prevent double-matching from concurrent requests and defer the actual + // splice via queueMicrotask so it never mutates the fixtures array while + // matchFixture is iterating over it. + let consumed = false; const original = errorFixture.match.predicate!; errorFixture.match.predicate = (req) => { + if (consumed) return false; const result = original(req); if (result) { - const idx = fixtures.indexOf(errorFixture); - if (idx !== -1) fixtures.splice(idx, 1); + consumed = true; + queueMicrotask(() => { + const idx = fixtures.indexOf(errorFixture); + if (idx !== -1) fixtures.splice(idx, 1); + }); } return result; }; @@ -1193,8 +1200,13 @@ export async function createServer( parsed.model = deploymentId; raw = JSON.stringify(parsed); } - } catch { - // Fall through — let handleEmbeddings report the parse error + } catch (err) { + if (!(err instanceof SyntaxError)) { + defaults.logger.error( + `Unexpected error in Azure model injection: ${err instanceof Error ? err.message : String(err)}`, + ); + } + // Fall through for parse errors — let handleEmbeddings report them } } await handleEmbeddings( @@ -1729,12 +1741,36 @@ export async function createServer( setCorsHeaders(res); try { const raw = await readBody(req); - const chaosAction = evaluateChaos(null, defaults.chaos, req.headers, defaults.logger); + // Try to match a fixture so chaos evaluation can use fixture-level overrides + let elSoundFixture: Fixture | null = null; + try { + const parsed = JSON.parse(raw) as Record; + const syntheticReq: ChatCompletionRequest = { + model: (parsed.model_id as string) ?? "eleven_text_to_sound_v2", + messages: [{ role: "user", content: (parsed.text as string) ?? "" }], + _endpointType: "audio-gen", + }; + const testId = getTestId(req); + elSoundFixture = matchFixture( + fixtures, + syntheticReq, + journal.getFixtureMatchCountsForTest(testId), + defaults.requestTransform, + ); + } catch { + // JSON parse failure — fixture matching not possible, handler will report the error + } + const chaosAction = evaluateChaos( + elSoundFixture, + defaults.chaos, + req.headers, + defaults.logger, + ); if (chaosAction) { applyChaosAction( chaosAction, res, - null, + elSoundFixture, journal, { method: req.method ?? "POST", @@ -1770,12 +1806,43 @@ export async function createServer( const musicSubType = musicMatch[1] ?? "music"; try { const raw = await readBody(req); - const chaosAction = evaluateChaos(null, defaults.chaos, req.headers, defaults.logger); + // Try to match a fixture so chaos evaluation can use fixture-level overrides + let elMusicFixture: Fixture | null = null; + try { + const parsed = JSON.parse(raw) as Record; + const prompt = + (typeof parsed.prompt === "string" ? parsed.prompt : null) ?? + (parsed.composition_plan != null + ? typeof parsed.composition_plan === "string" + ? parsed.composition_plan + : JSON.stringify(parsed.composition_plan) + : ""); + const syntheticReq: ChatCompletionRequest = { + model: (parsed.model_id as string) ?? "music_v1", + messages: [{ role: "user", content: prompt }], + _endpointType: "audio-gen", + }; + const testId = getTestId(req); + elMusicFixture = matchFixture( + fixtures, + syntheticReq, + journal.getFixtureMatchCountsForTest(testId), + defaults.requestTransform, + ); + } catch { + // JSON parse failure — fixture matching not possible, handler will report the error + } + const chaosAction = evaluateChaos( + elMusicFixture, + defaults.chaos, + req.headers, + defaults.logger, + ); if (chaosAction) { applyChaosAction( chaosAction, res, - null, + elMusicFixture, journal, { method: req.method ?? "POST", @@ -1804,6 +1871,10 @@ export async function createServer( return; } + // Body read by the general fal handler; preserved so legacy fal-audio + // routes below don't double-consume the stream on passthrough. + let falBody: string | undefined; + // /fal/* with `x-fal-target-host` header — general fal.ai routing // (queue.fal.run, fal.run, rest.fal.ai, rest.alpha.fal.ai). // Matches the requestMiddleware path-mirror convention used by @@ -1811,7 +1882,8 @@ export async function createServer( if (FAL_PREFIX_RE.test(pathname) && req.headers["x-fal-target-host"]) { setCorsHeaders(res); try { - const raw = req.method === "POST" || req.method === "PUT" ? await readBody(req) : ""; + falBody = req.method === "POST" || req.method === "PUT" ? await readBody(req) : ""; + const raw = falBody; const chaosAction = evaluateChaos(null, defaults.chaos, req.headers, defaults.logger); if (chaosAction) { applyChaosAction( @@ -1853,13 +1925,41 @@ export async function createServer( if (falQueueSubmitMatch && req.method === "POST") { setCorsHeaders(res); try { - const raw = await readBody(req); - const chaosAction = evaluateChaos(null, defaults.chaos, req.headers, defaults.logger); + const raw = falBody ?? (await readBody(req)); + // Try to match a fixture so chaos evaluation can use fixture-level overrides + let falSubmitFixture: Fixture | null = null; + try { + const parsed = raw.trim() ? (JSON.parse(raw) as Record) : {}; + const prompt = + (typeof parsed.prompt === "string" ? parsed.prompt : null) ?? + (typeof parsed.text === "string" ? parsed.text : null) ?? + ""; + const syntheticReq: ChatCompletionRequest = { + model: falQueueSubmitMatch[1], + messages: [{ role: "user", content: prompt }], + _endpointType: "fal-audio", + }; + const testId = getTestId(req); + falSubmitFixture = matchFixture( + fixtures, + syntheticReq, + journal.getFixtureMatchCountsForTest(testId), + defaults.requestTransform, + ); + } catch { + // JSON parse failure — fixture matching not possible, handler will report the error + } + const chaosAction = evaluateChaos( + falSubmitFixture, + defaults.chaos, + req.headers, + defaults.logger, + ); if (chaosAction) { applyChaosAction( chaosAction, res, - null, + falSubmitFixture, journal, { method: req.method ?? "POST", @@ -1896,7 +1996,8 @@ export async function createServer( ) { setCorsHeaders(res); try { - const raw = req.method === "POST" ? await readBody(req) : "{}"; + const raw = + req.method === "POST" || req.method === "PUT" ? (falBody ?? (await readBody(req))) : "{}"; const chaosAction = evaluateChaos(null, defaults.chaos, req.headers, defaults.logger); if (chaosAction) { applyChaosAction( @@ -1936,13 +2037,41 @@ export async function createServer( if (falRunMatch && req.method === "POST") { setCorsHeaders(res); try { - const raw = await readBody(req); - const chaosAction = evaluateChaos(null, defaults.chaos, req.headers, defaults.logger); + const raw = falBody ?? (await readBody(req)); + // Try to match a fixture so chaos evaluation can use fixture-level overrides + let falRunFixture: Fixture | null = null; + try { + const parsed = raw.trim() ? (JSON.parse(raw) as Record) : {}; + const prompt = + (typeof parsed.prompt === "string" ? parsed.prompt : null) ?? + (typeof parsed.text === "string" ? parsed.text : null) ?? + ""; + const syntheticReq: ChatCompletionRequest = { + model: falRunMatch[1], + messages: [{ role: "user", content: prompt }], + _endpointType: "fal-audio", + }; + const testId = getTestId(req); + falRunFixture = matchFixture( + fixtures, + syntheticReq, + journal.getFixtureMatchCountsForTest(testId), + defaults.requestTransform, + ); + } catch { + // JSON parse failure — fixture matching not possible, handler will report the error + } + const chaosAction = evaluateChaos( + falRunFixture, + defaults.chaos, + req.headers, + defaults.logger, + ); if (chaosAction) { applyChaosAction( chaosAction, res, - null, + falRunFixture, journal, { method: req.method ?? "POST", From 48fbda2b66f5f69ef0a1de252aefd127898ce84c Mon Sep 17 00:00:00 2001 From: Jordan Ritter Date: Mon, 11 May 2026 12:35:30 -0700 Subject: [PATCH 3/4] test: harden recorder test cleanup, dedup helpers, fix assertions - Clean up previous tmpDir before overwriting in strict mode test - Replace global fetch with node:http helpers (del, postRaw) - Wrap default fixturePath test in try/finally for reliable cleanup - Hoist duplicate createRawUpstream to shared scope - Add explicit headersSent mock property instead of relying on internals - Guard setupUpstreamAndRecorder against leaked resources on re-entry - Narrow bare catch to only swallow expected filesystem errors --- src/__tests__/recorder.test.ts | 389 +++++++++++++++++++++------------ 1 file changed, 245 insertions(+), 144 deletions(-) diff --git a/src/__tests__/recorder.test.ts b/src/__tests__/recorder.test.ts index a73ee70..db962e1 100644 --- a/src/__tests__/recorder.test.ts +++ b/src/__tests__/recorder.test.ts @@ -1,4 +1,4 @@ -import { describe, it, expect, afterEach, vi } from "vitest"; +import { describe, it, expect, afterEach, vi, type MockInstance } from "vitest"; import * as http from "node:http"; import * as fs from "node:fs"; import * as os from "node:os"; @@ -10,6 +10,7 @@ import type { RecordConfig } from "../types.js"; import { Logger } from "../logger.js"; import { LLMock } from "../llmock.js"; import { encodeEventStreamMessage } from "../aws-event-stream.js"; +import { loadFixtureFile } from "../fixture-loader.js"; // --------------------------------------------------------------------------- // HTTP helpers @@ -82,6 +83,72 @@ function get( }); } +function postRaw( + url: string, + rawBody: string, + headers?: Record, +): Promise<{ status: number; headers: http.IncomingHttpHeaders; body: string }> { + return new Promise((resolve, reject) => { + const parsed = new URL(url); + const req = http.request( + { + hostname: parsed.hostname, + port: parsed.port, + path: parsed.pathname, + method: "POST", + headers: { + "Content-Type": "application/json", + "Content-Length": Buffer.byteLength(rawBody), + ...headers, + }, + }, + (res) => { + const chunks: Buffer[] = []; + res.on("data", (c: Buffer) => chunks.push(c)); + res.on("end", () => { + resolve({ + status: res.statusCode ?? 0, + headers: res.headers, + body: Buffer.concat(chunks).toString(), + }); + }); + }, + ); + req.on("error", reject); + req.write(rawBody); + req.end(); + }); +} + +function del( + url: string, +): Promise<{ status: number; headers: http.IncomingHttpHeaders; body: string }> { + return new Promise((resolve, reject) => { + const parsed = new URL(url); + const req = http.request( + { + hostname: parsed.hostname, + port: parsed.port, + path: parsed.pathname, + method: "DELETE", + }, + (res) => { + const chunks: Buffer[] = []; + res.on("data", (c: Buffer) => chunks.push(c)); + res.on("end", () => { + resolve({ + status: res.statusCode ?? 0, + headers: res.headers, + body: Buffer.concat(chunks).toString(), + }); + }); + }, + ); + req.on("error", reject); + req.end(); + }); +} + // --------------------------------------------------------------------------- // Test state // --------------------------------------------------------------------------- @@ -637,6 +704,9 @@ describe("recorder strict mode", () => { // Need to create a new recorder with both record + strict await new Promise((resolve) => recorder!.server.close(() => resolve())); + if (tmpDir) { + fs.rmSync(tmpDir, { recursive: true, force: true }); + } tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "aimock-record-")); recorder = await createServer([], { port: 0, @@ -1092,7 +1162,7 @@ describe("recorder end-to-end replay", () => { ); // Clear journal to distinguish proxy vs fixture-match - await fetch(`${recorderUrl}/v1/_requests`, { method: "DELETE" }); + await del(`${recorderUrl}/v1/_requests`); // Second request — should match recorded fixture const resp2 = await post(`${recorderUrl}/v1/chat/completions`, { @@ -1290,20 +1360,31 @@ describe("recorder edge cases", () => { // Check the default path const defaultPath = path.resolve("./fixtures/recorded"); - expect(fs.existsSync(defaultPath)).toBe(true); - const files = fs.readdirSync(defaultPath); - const fixtureFiles = files.filter((f) => f.startsWith("openai-") && f.endsWith(".json")); - expect(fixtureFiles.length).toBeGreaterThanOrEqual(1); - - // Clean up the default path files we just created - for (const f of fixtureFiles) { - fs.unlinkSync(path.join(defaultPath, f)); - } - // Remove dir if empty try { - fs.rmdirSync(defaultPath); - } catch { - // ignore — might not be empty if other tests ran + expect(fs.existsSync(defaultPath)).toBe(true); + const files = fs.readdirSync(defaultPath); + const fixtureFiles = files.filter((f) => f.startsWith("openai-") && f.endsWith(".json")); + expect(fixtureFiles.length).toBeGreaterThanOrEqual(1); + } finally { + // Clean up the default path files we created + if (fs.existsSync(defaultPath)) { + const cleanupFiles = fs.readdirSync(defaultPath); + for (const f of cleanupFiles.filter( + (f) => f.startsWith("openai-") && f.endsWith(".json"), + )) { + fs.unlinkSync(path.join(defaultPath, f)); + } + // Remove dir if empty — only swallow expected ENOTEMPTY/ENOENT + try { + fs.rmdirSync(defaultPath); + } catch (err: unknown) { + const code = + err instanceof Error && "code" in err ? (err as NodeJS.ErrnoException).code : undefined; + if (code !== "ENOTEMPTY" && code !== "ENOENT") { + console.warn("Unexpected error cleaning up defaultPath:", err); + } + } + } } }); @@ -1418,11 +1499,7 @@ describe("recorder edge cases", () => { // Send body with specific formatting (extra spaces, key order) const customBody = '{"model": "gpt-4", "messages": [{"role": "user", "content": "preserve me"}]}'; - const resp = await fetch(`${recorder.url}/v1/chat/completions`, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: customBody, - }); + const resp = await postRaw(`${recorder.url}/v1/chat/completions`, customBody); expect(resp.status).toBe(200); // The upstream should have received the original body, not re-serialized @@ -1758,10 +1835,7 @@ describe("recorder multi-turn disambiguation", () => { const recordedFixtures: Fixture[] = fs .readdirSync(fixturePath) .filter((f) => f.endsWith(".json")) - .flatMap( - (f) => - (JSON.parse(fs.readFileSync(path.join(fixturePath, f), "utf-8")) as FixtureFile).fixtures, - ); + .flatMap((f) => loadFixtureFile(path.join(fixturePath, f))); expect(recordedFixtures).toHaveLength(2); // Phase 2: replay against a fresh aimock with only the recorded fixtures. @@ -1908,11 +1982,7 @@ describe("recorder multi-turn disambiguation", () => { const allFixtures: Fixture[] = fs .readdirSync(sharedFixturePath) .filter((f) => f.endsWith(".json")) - .flatMap( - (f) => - (JSON.parse(fs.readFileSync(path.join(sharedFixturePath, f), "utf-8")) as FixtureFile) - .fixtures, - ); + .flatMap((f) => loadFixtureFile(path.join(sharedFixturePath, f))); expect(allFixtures).toHaveLength(4); recorder = await createServer(allFixtures, { port: 0 }); @@ -2478,6 +2548,24 @@ describe("recorder filesystem write failure", () => { // buildFixtureResponse for non-OpenAI formats // --------------------------------------------------------------------------- +/** Shared helper: spins up a raw HTTP server that returns `responseBody` as JSON. */ +function createRawUpstream( + responseBody: object, + servers: http.Server[], +): Promise<{ url: string; server: http.Server }> { + return new Promise((resolve) => { + const srv = http.createServer((_req, res) => { + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify(responseBody)); + }); + srv.listen(0, "127.0.0.1", () => { + const addr = srv.address() as { port: number }; + servers.push(srv); + resolve({ url: `http://127.0.0.1:${addr.port}`, server: srv }); + }); + }); +} + describe("recorder buildFixtureResponse non-OpenAI formats", () => { let servers: http.Server[] = []; @@ -2488,28 +2576,17 @@ describe("recorder buildFixtureResponse non-OpenAI formats", () => { servers = []; }); - function createRawUpstream(responseBody: object): Promise<{ url: string; server: http.Server }> { - return new Promise((resolve) => { - const srv = http.createServer((_req, res) => { - res.writeHead(200, { "Content-Type": "application/json" }); - res.end(JSON.stringify(responseBody)); - }); - srv.listen(0, "127.0.0.1", () => { - const addr = srv.address() as { port: number }; - servers.push(srv); - resolve({ url: `http://127.0.0.1:${addr.port}`, server: srv }); - }); - }); - } - it("records Anthropic format (content array with type/text)", async () => { - const { url: upstreamUrl } = await createRawUpstream({ - id: "msg_123", - type: "message", - role: "assistant", - content: [{ type: "text", text: "Bonjour from Anthropic" }], - stop_reason: "end_turn", - }); + const { url: upstreamUrl } = await createRawUpstream( + { + id: "msg_123", + type: "message", + role: "assistant", + content: [{ type: "text", text: "Bonjour from Anthropic" }], + stop_reason: "end_turn", + }, + servers, + ); tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "aimock-record-")); recorder = await createServer([], { @@ -2536,14 +2613,17 @@ describe("recorder buildFixtureResponse non-OpenAI formats", () => { }); it("records Gemini format (candidates array)", async () => { - const { url: upstreamUrl } = await createRawUpstream({ - candidates: [ - { - content: { role: "model", parts: [{ text: "Hello from Gemini" }] }, - finishReason: "STOP", - }, - ], - }); + const { url: upstreamUrl } = await createRawUpstream( + { + candidates: [ + { + content: { role: "model", parts: [{ text: "Hello from Gemini" }] }, + finishReason: "STOP", + }, + ], + }, + servers, + ); tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "aimock-record-")); recorder = await createServer([], { @@ -2568,11 +2648,14 @@ describe("recorder buildFixtureResponse non-OpenAI formats", () => { }); it("records Ollama format (message object)", async () => { - const { url: upstreamUrl } = await createRawUpstream({ - model: "llama3", - message: { role: "assistant", content: "Hello from Ollama" }, - done: true, - }); + const { url: upstreamUrl } = await createRawUpstream( + { + model: "llama3", + message: { role: "assistant", content: "Hello from Ollama" }, + done: true, + }, + servers, + ); tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "aimock-record-")); recorder = await createServer([], { @@ -2664,6 +2747,7 @@ describe("recorder content + toolCalls coexistence", () => { expect(fixtureContent.fixtures[0].response.toolCalls).toBeDefined(); expect(fixtureContent.fixtures[0].response.toolCalls).toHaveLength(1); expect(fixtureContent.fixtures[0].response.toolCalls![0].name).toBe("search"); + expect(fixtureContent.fixtures[0].response.content).toBeDefined(); await new Promise((resolve) => rawServer.close(() => resolve())); }); @@ -3601,6 +3685,7 @@ describe("recorder streaming edge cases", () => { expect(savedResponse.toolCalls).toBeDefined(); expect(savedResponse.toolCalls).toHaveLength(1); expect(savedResponse.toolCalls![0].name).toBe("get_weather"); + expect(savedResponse.content).toBeDefined(); }); }); @@ -3618,30 +3703,19 @@ describe("buildFixtureResponse additional format variants", () => { servers = []; }); - function createRawUpstream(responseBody: object): Promise<{ url: string; server: http.Server }> { - return new Promise((resolve) => { - const srv = http.createServer((_req, res) => { - res.writeHead(200, { "Content-Type": "application/json" }); - res.end(JSON.stringify(responseBody)); - }); - srv.listen(0, "127.0.0.1", () => { - const addr = srv.address() as { port: number }; - servers.push(srv); - resolve({ url: `http://127.0.0.1:${addr.port}`, server: srv }); - }); - }); - } - it("detects Bedrock Converse format (output.message.content text)", async () => { - const { url: upstreamUrl } = await createRawUpstream({ - output: { - message: { - role: "assistant", - content: [{ text: "Hello from Bedrock Converse" }], + const { url: upstreamUrl } = await createRawUpstream( + { + output: { + message: { + role: "assistant", + content: [{ text: "Hello from Bedrock Converse" }], + }, }, + stopReason: "end_turn", }, - stopReason: "end_turn", - }); + servers, + ); tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "aimock-record-")); recorder = await createServer([], { @@ -3667,22 +3741,25 @@ describe("buildFixtureResponse additional format variants", () => { }); it("detects Bedrock Converse toolUse format", async () => { - const { url: upstreamUrl } = await createRawUpstream({ - output: { - message: { - role: "assistant", - content: [ - { - toolUse: { - name: "get_weather", - input: { city: "NYC" }, + const { url: upstreamUrl } = await createRawUpstream( + { + output: { + message: { + role: "assistant", + content: [ + { + toolUse: { + name: "get_weather", + input: { city: "NYC" }, + }, }, - }, - ], + ], + }, }, + stopReason: "tool_use", }, - stopReason: "tool_use", - }); + servers, + ); tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "aimock-record-")); recorder = await createServer([], { @@ -3714,17 +3791,20 @@ describe("buildFixtureResponse additional format variants", () => { }); it("detects Anthropic tool_use with string input", async () => { - const { url: upstreamUrl } = await createRawUpstream({ - content: [ - { - type: "tool_use", - id: "toolu_str", - name: "search", - input: '{"query":"hello"}', - }, - ], - role: "assistant", - }); + const { url: upstreamUrl } = await createRawUpstream( + { + content: [ + { + type: "tool_use", + id: "toolu_str", + name: "search", + input: '{"query":"hello"}', + }, + ], + role: "assistant", + }, + servers, + ); tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "aimock-record-")); recorder = await createServer([], { @@ -3757,22 +3837,25 @@ describe("buildFixtureResponse additional format variants", () => { }); it("detects Gemini functionCall with string args", async () => { - const { url: upstreamUrl } = await createRawUpstream({ - candidates: [ - { - content: { - parts: [ - { - functionCall: { - name: "search", - args: '{"query":"hello"}', + const { url: upstreamUrl } = await createRawUpstream( + { + candidates: [ + { + content: { + parts: [ + { + functionCall: { + name: "search", + args: '{"query":"hello"}', + }, }, - }, - ], + ], + }, }, - }, - ], - }); + ], + }, + servers, + ); tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "aimock-record-")); recorder = await createServer([], { @@ -3802,14 +3885,17 @@ describe("buildFixtureResponse additional format variants", () => { }); it("detects Ollama message.content as array format", async () => { - const { url: upstreamUrl } = await createRawUpstream({ - model: "llama3", - message: { - role: "assistant", - content: [{ text: "Array content from Ollama" }], + const { url: upstreamUrl } = await createRawUpstream( + { + model: "llama3", + message: { + role: "assistant", + content: [{ text: "Array content from Ollama" }], + }, + done: true, }, - done: true, - }); + servers, + ); tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "aimock-record-")); recorder = await createServer([], { @@ -3836,22 +3922,25 @@ describe("buildFixtureResponse additional format variants", () => { }); it("detects Ollama tool_calls with string arguments", async () => { - const { url: upstreamUrl } = await createRawUpstream({ - model: "llama3", - message: { - role: "assistant", - content: "", - tool_calls: [ - { - function: { - name: "search", - arguments: '{"query":"test"}', + const { url: upstreamUrl } = await createRawUpstream( + { + model: "llama3", + message: { + role: "assistant", + content: "", + tool_calls: [ + { + function: { + name: "search", + arguments: '{"query":"test"}', + }, }, - }, - ], + ], + }, + done: true, }, - done: true, - }); + servers, + ); tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "aimock-record-")); recorder = await createServer([], { @@ -3921,6 +4010,8 @@ function createMockReqRes(): { req: http.IncomingMessage; res: http.ServerRespon const req = Object.create(http.IncomingMessage.prototype) as http.IncomingMessage; req.headers = {}; const res = Object.create(http.ServerResponse.prototype) as http.ServerResponse; + // Use explicit property instead of relying on Node.js internal _header getter + Object.defineProperty(res, "headersSent", { value: false, writable: true }); return { req, res }; } @@ -3928,6 +4019,16 @@ async function setupUpstreamAndRecorder( upstreamFixtures: Fixture[], providerKey: string = "openai", ): Promise<{ upstreamUrl: string; recorderUrl: string; fixturePath: string }> { + // Ensure previous resources are cleaned up before reassignment + if (recorder) { + await new Promise((resolve) => recorder!.server.close(() => resolve())); + recorder = undefined; + } + if (upstream) { + await new Promise((resolve) => upstream!.server.close(() => resolve())); + upstream = undefined; + } + // Create upstream "real API" server upstream = await createServer(upstreamFixtures, { port: 0 }); @@ -3956,7 +4057,7 @@ async function setupUpstreamAndRecorder( describe("makeUpstreamRequest body timeout", () => { let fastRawServer: http.Server | undefined; - let setTimeoutSpy: ReturnType | undefined; + let setTimeoutSpy: MockInstance | undefined; afterEach(async () => { setTimeoutSpy?.mockRestore(); From ee07f6b0a82c1bb74bd556d4f70ef6361f4c9f17 Mon Sep 17 00:00:00 2001 From: Jordan Ritter Date: Mon, 11 May 2026 12:40:52 -0700 Subject: [PATCH 4/4] docs: update Unreleased changelog for PR #177 fixes --- CHANGELOG.md | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 23f0b3f..0fd01a5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,23 @@ - **Drift detection compared only first event per type** — `compareSSESequences` now compares ALL events per type, not just the first, catching previously invisible divergences - **Ollama drift tests used broken async describe.skipIf** — replaced with synchronous env-var gate so tests are correctly skipped or executed - **12 unrestored spy/mock leaks and misleading assertions** — fix spy/mock leaks across test files and correct assertions that passed for the wrong reasons +- Proxy relay hardcoded POST method — now forwards the original HTTP method +- Response timeout timer leak — cleared after successful upstream completion +- Client disconnect handler race — checks `writableFinished` before destroying upstream request +- `onHookBypassed` and `beforeWriteResponse` callbacks not wrapped in try/catch +- Audio error relay sent non-2xx responses with audio content-type instead of application/json +- Snapshot-mode fixture writes not atomic — concurrent requests could corrupt the file +- Undefined `toolCall` name/arguments silently dropped during fixture save +- Video detection heuristic false-positives on LLM provider responses with `{id, status}` shape +- One-shot error fixture splice during iteration (deferred via microtask) +- Azure model injection catch swallowed non-SyntaxError exceptions +- fal request body lost on passthrough (double `readBody` consumption) +- fal queue handler dropped PUT request body +- Recorder test: tmpDir leak on strict-mode reassignment, global fetch dependency, fragile fixturePath cleanup, duplicate helpers, spy leak on assertion failure + +### Added + +- Fixture-level chaos evaluation for non-completions endpoints (ElevenLabs, fal) ### Changed