diff --git a/docs/AGENTS.md b/docs/AGENTS.md index 7fc5330b0b..f0043ade2d 100644 --- a/docs/AGENTS.md +++ b/docs/AGENTS.md @@ -41,6 +41,7 @@ gh pr view --json mergeable,mergeStateStatus | jq '.' ## Repo Reference - Core files: `src/main.ts`, `src/preload.ts`, `src/App.tsx`, `src/config.ts`. +- Up-to-date model names: see `src/common/knownModels.ts` for current provider model IDs. - Persistent data: `~/.mux/config.json`, `~/.mux/src//` (worktrees), `~/.mux/sessions//chat.jsonl`. ## Documentation Rules @@ -69,6 +70,12 @@ gh pr view --json mergeable,mergeStateStatus | jq '.' - Use `git mv` to retain history when moving files. - Never kill the running mux process; rely on `make typecheck` + targeted `bun test path/to/file.test.ts` for validation (run `make test` only when necessary; it can be slow). +## Self-Healing & Crash Resilience + +- Prefer **self-healing** behavior: if corrupted or invalid data exists in persisted state (e.g., `chat.jsonl`), the system should sanitize or filter it at load/request time rather than failing permanently. +- Never let a single malformed line in history brick a workspace—apply defensive filtering in request-building paths so the user can continue working. +- When streaming crashes, any incomplete state committed to disk should either be repairable on next load or excluded from provider requests to avoid API validation errors. 
+ ## Testing Doctrine Two types of tests are preferred: diff --git a/src/browser/stories/App.projectSettings.stories.tsx b/src/browser/stories/App.projectSettings.stories.tsx index 3fde7bdd60..d24c856342 100644 --- a/src/browser/stories/App.projectSettings.stories.tsx +++ b/src/browser/stories/App.projectSettings.stories.tsx @@ -148,7 +148,7 @@ async function openProjectSettings(canvasElement: HTMLElement): Promise { const settingsButton = await canvas.findByTestId("settings-button", {}, { timeout: 10000 }); await userEvent.click(settingsButton); - await body.findByRole("dialog"); + await body.findByRole("dialog", {}, { timeout: 10000 }); const projectsButton = await body.findByRole("button", { name: /Projects/i }); await userEvent.click(projectsButton); @@ -171,7 +171,7 @@ async function openWorkspaceMCPModal(canvasElement: HTMLElement): Promise await userEvent.click(mcpButton); // Wait for dialog - await body.findByRole("dialog"); + await body.findByRole("dialog", {}, { timeout: 10000 }); } // ═══════════════════════════════════════════════════════════════════════════════ diff --git a/src/browser/utils/messages/ChatEventProcessor.ts b/src/browser/utils/messages/ChatEventProcessor.ts index 7d19b11401..268da68da8 100644 --- a/src/browser/utils/messages/ChatEventProcessor.ts +++ b/src/browser/utils/messages/ChatEventProcessor.ts @@ -254,12 +254,27 @@ export function createChatEventProcessor(): ChatEventProcessor { const lastPart = message.parts.at(-1); if (lastPart?.type === "reasoning") { - lastPart.text += event.delta; + // Signature updates come with empty delta - just update the signature + if (event.signature && !event.delta) { + lastPart.signature = event.signature; + lastPart.providerOptions = { anthropic: { signature: event.signature } }; + } else { + lastPart.text += event.delta; + // Also capture signature if present with text + if (event.signature) { + lastPart.signature = event.signature; + lastPart.providerOptions = { anthropic: { signature: 
event.signature } }; + } + } } else { message.parts.push({ type: "reasoning", text: event.delta, timestamp: event.timestamp, + signature: event.signature, + providerOptions: event.signature + ? { anthropic: { signature: event.signature } } + : undefined, }); } return; diff --git a/src/browser/utils/messages/modelMessageTransform.test.ts b/src/browser/utils/messages/modelMessageTransform.test.ts index ffb3c80540..2b9e586462 100644 --- a/src/browser/utils/messages/modelMessageTransform.test.ts +++ b/src/browser/utils/messages/modelMessageTransform.test.ts @@ -152,7 +152,7 @@ describe("modelMessageTransform", () => { expect(lastAssistant).toBeTruthy(); expect(Array.isArray(lastAssistant?.content)).toBe(true); if (Array.isArray(lastAssistant?.content)) { - expect(lastAssistant.content[0]).toEqual({ type: "reasoning", text: "" }); + expect(lastAssistant.content[0]).toEqual({ type: "reasoning", text: "..." }); } }); it("should keep text-only messages unchanged", () => { @@ -1151,6 +1151,70 @@ describe("filterEmptyAssistantMessages", () => { expect(result2[0].id).toBe("assistant-1"); }); + it("should filter out assistant messages with only incomplete tool calls (input-available)", () => { + const messages: MuxMessage[] = [ + { + id: "user-1", + role: "user", + parts: [{ type: "text", text: "Run a command" }], + metadata: { timestamp: 1000 }, + }, + { + id: "assistant-1", + role: "assistant", + parts: [ + { + type: "dynamic-tool", + state: "input-available", + toolCallId: "call-1", + toolName: "bash", + input: { script: "pwd" }, + }, + ], + metadata: { timestamp: 2000, partial: true }, + }, + { + id: "user-2", + role: "user", + parts: [{ type: "text", text: "Continue" }], + metadata: { timestamp: 3000 }, + }, + ]; + + // Incomplete tool calls are dropped by convertToModelMessages(ignoreIncompleteToolCalls: true), + // so we must treat them as empty here to avoid generating an invalid request. 
+ const result = filterEmptyAssistantMessages(messages, false); + expect(result.map((m) => m.id)).toEqual(["user-1", "user-2"]); + }); + + it("should preserve assistant messages with completed tool calls (output-available)", () => { + const messages: MuxMessage[] = [ + { + id: "user-1", + role: "user", + parts: [{ type: "text", text: "Run a command" }], + metadata: { timestamp: 1000 }, + }, + { + id: "assistant-1", + role: "assistant", + parts: [ + { + type: "dynamic-tool", + state: "output-available", + toolCallId: "call-1", + toolName: "bash", + input: { script: "pwd" }, + output: { stdout: "https://github.com/home/user" }, + }, + ], + metadata: { timestamp: 2000 }, + }, + ]; + + const result = filterEmptyAssistantMessages(messages, false); + expect(result.map((m) => m.id)).toEqual(["user-1", "assistant-1"]); + }); it("should filter out assistant messages with only empty text regardless of preserveReasoningOnly", () => { const messages: MuxMessage[] = [ { diff --git a/src/browser/utils/messages/modelMessageTransform.ts b/src/browser/utils/messages/modelMessageTransform.ts index b74ed67626..251a8c074f 100644 --- a/src/browser/utils/messages/modelMessageTransform.ts +++ b/src/browser/utils/messages/modelMessageTransform.ts @@ -44,10 +44,37 @@ export function filterEmptyAssistantMessages( return false; } - // Keep assistant messages that have at least one text or tool part - const hasContent = msg.parts.some( - (part) => (part.type === "text" && part.text) || part.type === "dynamic-tool" - ); + // Keep assistant messages that have at least one part that will survive + // conversion to provider ModelMessages. + // + // Important: We call convertToModelMessages(..., { ignoreIncompleteToolCalls: true }). + // That means *incomplete* tool calls (state: "input-available") will be dropped. 
+ // If we treat them as content here, we can end up sending an assistant message that + // becomes empty after conversion, which the AI SDK rejects ("all messages must have + // non-empty content...") and can brick a workspace after a crash. + const hasContent = msg.parts.some((part) => { + if (part.type === "text") { + return part.text.length > 0; + } + + // Reasoning-only messages are handled below (provider-dependent). + if (part.type === "reasoning") { + return false; + } + + if (part.type === "dynamic-tool") { + // Only completed tool calls produce content that can be replayed to the model. + return part.state === "output-available"; + } + + // File/image parts count as content. + if (part.type === "file") { + return true; + } + + // Future-proofing: unknown parts should not brick the request. + return true; + }); if (hasContent) { return true; @@ -463,6 +490,62 @@ function filterReasoningOnlyMessages(messages: ModelMessage[]): ModelMessage[] { }); } +/** + * Strip Anthropic reasoning parts that lack a valid signature. + * + * Anthropic's Extended Thinking API requires thinking blocks to include a signature + * for replay. The Vercel AI SDK's Anthropic provider only sends reasoning parts to + * the API if they have providerOptions.anthropic.signature. Reasoning parts we create + * (placeholders) or from history (where we didn't capture the signature) will be + * silently dropped by the SDK. + * + * If all parts of an assistant message are unsigned reasoning, the SDK drops them all, + * leaving an empty message that Anthropic rejects with: + * "all messages must have non-empty content except for the optional final assistant message" + * + * This function removes unsigned reasoning upfront and filters resulting empty messages. + * + * NOTE: This is Anthropic-specific. Other providers (e.g., OpenAI) handle reasoning + * differently and don't require signatures. 
+ */ +function stripUnsignedAnthropicReasoning(messages: ModelMessage[]): ModelMessage[] { + const stripped = messages.map((msg) => { + if (msg.role !== "assistant") { + return msg; + } + + const assistantMsg = msg; + if (typeof assistantMsg.content === "string") { + return msg; + } + + // Filter out reasoning parts without a non-empty anthropic.signature in providerOptions + const content = assistantMsg.content.filter((part) => { + if (part.type !== "reasoning") { + return true; + } + // An empty-string signature is as unusable for replay as a missing one, so require real content + const anthropicMeta = (part.providerOptions as { anthropic?: { signature?: string } }) + ?.anthropic; + return typeof anthropicMeta?.signature === "string" && anthropicMeta.signature.length > 0; + }); + + const result: typeof assistantMsg = { ...assistantMsg, content }; + return result; + }); + + // Filter out messages that became empty after stripping reasoning + return stripped.filter((msg) => { + if (msg.role !== "assistant") { + return true; + } + if (typeof msg.content === "string") { + return msg.content.length > 0; + } + return msg.content.length > 0; + }); +} + /** * Coalesce consecutive parts of the same type within each message. * Streaming creates many individual text/reasoning parts; merge them for easier debugging. @@ -512,7 +595,6 @@ function coalesceConsecutiveParts(messages: ModelMessage[]): ModelMessage[] { }; }); } - /** * Merge consecutive user messages with newline separators. * When filtering removes assistant messages, we can end up with consecutive user messages. @@ -610,9 +692,10 @@ function ensureAnthropicThinkingBeforeToolCalls(messages: ModelMessage[]): Model } // Anthropic extended thinking requires tool-use assistant messages to start with a thinking block. - // If we still have no reasoning available, insert an empty reasoning part as a minimal placeholder. + // If we still have no reasoning available, insert a minimal placeholder reasoning part. + // NOTE: The text cannot be empty - Anthropic API rejects empty content. 
if (reasoningParts.length === 0) { - reasoningParts = [{ type: "reasoning" as const, text: "" }]; + reasoningParts = [{ type: "reasoning" as const, text: "..." }]; } result.push({ @@ -641,7 +724,7 @@ function ensureAnthropicThinkingBeforeToolCalls(messages: ModelMessage[]): Model result[i] = { ...assistantMsg, content: [ - { type: "reasoning" as const, text: "" }, + { type: "reasoning" as const, text: "..." }, { type: "text" as const, text }, ], }; @@ -658,7 +741,7 @@ function ensureAnthropicThinkingBeforeToolCalls(messages: ModelMessage[]): Model result[i] = { ...assistantMsg, - content: [{ type: "reasoning" as const, text: "" }, ...content], + content: [{ type: "reasoning" as const, text: "..." }, ...content], }; break; } @@ -703,7 +786,9 @@ export function transformModelMessages( // Anthropic: When extended thinking is enabled, preserve reasoning-only messages and ensure // tool-call messages start with reasoning. When it's disabled, filter reasoning-only messages. if (options?.anthropicThinkingEnabled) { - reasoningHandled = ensureAnthropicThinkingBeforeToolCalls(split); + // First strip reasoning without signatures (SDK will drop them anyway, causing empty messages) + const signedReasoning = stripUnsignedAnthropicReasoning(split); + reasoningHandled = ensureAnthropicThinkingBeforeToolCalls(signedReasoning); } else { reasoningHandled = filterReasoningOnlyMessages(split); } diff --git a/src/cli/run.ts b/src/cli/run.ts index b31ca01bf9..27bc900a7a 100644 --- a/src/cli/run.ts +++ b/src/cli/run.ts @@ -195,6 +195,7 @@ program .option("--json", "output NDJSON for programmatic consumption") .option("-q, --quiet", "only output final result") .option("--workspace-id ", "explicit workspace ID (auto-generated if not provided)") + .option("--workspace ", "continue an existing workspace (loads history, skips init)") .option("--config-root ", "mux config directory") .option("--mcp ", "MCP server as name=command (can be repeated)", collectMcpServers, []) 
.option("--no-mcp-config", "ignore .mux/mcp.jsonc, use only --mcp servers") @@ -227,6 +228,7 @@ interface CLIOptions { json?: boolean; quiet?: boolean; workspaceId?: string; + workspace?: string; configRoot?: string; mcp: MCPServerEntry[]; mcpConfig: boolean; @@ -250,10 +252,6 @@ async function main(): Promise { } // Default is already "warn" for CLI mode (set in log.ts) - // Resolve directory - const projectDir = path.resolve(opts.dir); - await ensureDirectory(projectDir); - // Get message from arg or stdin const stdinMessage = await gatherMessageFromStdin(); const message = messageArg?.trim() ?? stdinMessage.trim(); @@ -266,7 +264,35 @@ async function main(): Promise { // Setup config const config = new Config(opts.configRoot); - const workspaceId = opts.workspaceId ?? generateWorkspaceId(); + + // Determine if continuing an existing workspace + const continueWorkspace = opts.workspace; + const workspaceId = continueWorkspace ?? opts.workspaceId ?? generateWorkspaceId(); + + // Resolve directory - for continuing workspace, try to get from metadata + let projectDir: string; + if (continueWorkspace) { + const metadataPath = path.join(config.sessionsDir, continueWorkspace, "metadata.json"); + try { + const metadataContent = await fs.readFile(metadataPath, "utf-8"); + const metadata = JSON.parse(metadataContent) as { projectPath?: unknown }; + if (typeof metadata.projectPath === "string" && metadata.projectPath.length > 0) { + projectDir = metadata.projectPath; + log.info(`Continuing workspace ${continueWorkspace}, using project path: ${projectDir}`); + } else { + projectDir = path.resolve(opts.dir); + log.warn(`No projectPath in metadata, using --dir: ${projectDir}`); + } + } catch { + // Metadata doesn't exist or is invalid, fall back to --dir + projectDir = path.resolve(opts.dir); + log.warn(`Could not read metadata for ${continueWorkspace}, using --dir: ${projectDir}`); + } + } else { + projectDir = path.resolve(opts.dir); + await ensureDirectory(projectDir); + } + const model: string = opts.model; const runtimeConfig 
= parseRuntimeConfig(opts.runtime, config.srcDir); const thinkingLevel = parseThinkingLevel(opts.thinking); @@ -333,11 +359,17 @@ async function main(): Promise { backgroundProcessManager, }); - await session.ensureMetadata({ - workspacePath: projectDir, - projectName: path.basename(projectDir), - runtimeConfig, - }); + // For continuing workspace, metadata should already exist + // For new workspace, create it + if (!continueWorkspace) { + await session.ensureMetadata({ + workspacePath: projectDir, + projectName: path.basename(projectDir), + runtimeConfig, + }); + } else { + log.info(`Continuing workspace ${workspaceId} - using existing metadata`); + } const buildSendOptions = (cliMode: CLIMode): SendMessageOptions => ({ model, diff --git a/src/common/orpc/schemas/stream.ts b/src/common/orpc/schemas/stream.ts index 84e6ee852a..c7db956103 100644 --- a/src/common/orpc/schemas/stream.ts +++ b/src/common/orpc/schemas/stream.ts @@ -201,6 +201,10 @@ export const ReasoningDeltaEventSchema = z.object({ delta: z.string(), tokens: z.number().meta({ description: "Token count for this delta" }), timestamp: z.number().meta({ description: "When delta was received (Date.now())" }), + signature: z + .string() + .optional() + .meta({ description: "Anthropic thinking block signature for replay" }), }); export const ReasoningEndEventSchema = z.object({ diff --git a/src/common/types/message.ts b/src/common/types/message.ts index 1b2853849b..a93b5b49d6 100644 --- a/src/common/types/message.ts +++ b/src/common/types/message.ts @@ -146,6 +146,24 @@ export interface MuxReasoningPart { type: "reasoning"; text: string; timestamp?: number; + /** + * Anthropic thinking block signature for replay. + * Required to send reasoning back to Anthropic - the API validates signatures + * to ensure thinking blocks haven't been tampered with. Reasoning without + * signatures will be stripped before sending to avoid "empty content" errors. 
+ */ + signature?: string; + /** + * Provider options for SDK compatibility. + * When converting to ModelMessages via the SDK's convertToModelMessages, + * this is passed through. For Anthropic thinking blocks, this should contain + * { anthropic: { signature } } to allow reasoning replay. + */ + providerOptions?: { + anthropic?: { + signature?: string; + }; + }; } // File/Image part type for multimodal messages (matches AI SDK FileUIPart) diff --git a/src/node/services/aiService.ts b/src/node/services/aiService.ts index ce5dfe1871..125da82186 100644 --- a/src/node/services/aiService.ts +++ b/src/node/services/aiService.ts @@ -1202,11 +1202,26 @@ export class AIService extends EventEmitter { // Convert MuxMessage to ModelMessage format using Vercel AI SDK utility // Type assertion needed because MuxMessage has custom tool parts for interrupted tools // eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-argument - const modelMessages = convertToModelMessages(sanitizedMessages as any, { + const rawModelMessages = convertToModelMessages(sanitizedMessages as any, { // Drop unfinished tool calls (input-streaming/input-available) so downstream // transforms only see tool calls that actually produced outputs. ignoreIncompleteToolCalls: true, }); + + // Self-healing: Filter out any empty ModelMessages that could brick the request. + // The SDK's ignoreIncompleteToolCalls can drop all parts from a message, leaving + // an assistant with empty content array. The API rejects these with "all messages + // must have non-empty content except for the optional final assistant message". 
+ const modelMessages = rawModelMessages.filter((msg) => { + if (msg.role !== "assistant") return true; + if (typeof msg.content === "string") return msg.content.length > 0; + return Array.isArray(msg.content) && msg.content.length > 0; + }); + if (modelMessages.length < rawModelMessages.length) { + log.debug( + `Self-healing: Filtered ${rawModelMessages.length - modelMessages.length} empty ModelMessage(s)` + ); + } log.debug_obj(`${workspaceId}/2_model_messages.json`, modelMessages); // Apply ModelMessage transforms based on provider requirements diff --git a/src/node/services/partialService.test.ts b/src/node/services/partialService.test.ts index 728efd606e..c896ae95cc 100644 --- a/src/node/services/partialService.test.ts +++ b/src/node/services/partialService.test.ts @@ -157,6 +157,45 @@ describe("PartialService - Error Recovery", () => { expect(deletePartial).toHaveBeenCalledWith(workspaceId); }); + test("commitToHistory should skip tool-only incomplete partials", async () => { + const workspaceId = "test-workspace"; + const toolOnlyPartial: MuxMessage = { + id: "msg-1", + role: "assistant", + metadata: { + historySequence: 1, + timestamp: Date.now(), + model: "test-model", + partial: true, + error: "Stream interrupted", + errorType: "network", + }, + parts: [ + { + type: "dynamic-tool", + toolCallId: "call-1", + toolName: "bash", + state: "input-available", + input: { script: "echo test", timeout_secs: 10, display_name: "Test" }, + }, + ], + }; + + partialService.readPartial = mock(() => Promise.resolve(toolOnlyPartial)); + partialService.deletePartial = mock(() => Promise.resolve(Ok(undefined))); + mockHistoryService.getHistory = mock(() => Promise.resolve(Ok([]))); + + const result = await partialService.commitToHistory(workspaceId); + expect(result.success).toBe(true); + + const appendToHistory = mockHistoryService.appendToHistory as ReturnType; + const updateHistory = mockHistoryService.updateHistory as ReturnType; + 
expect(appendToHistory).not.toHaveBeenCalled(); + expect(updateHistory).not.toHaveBeenCalled(); + + const deletePartial = partialService.deletePartial as ReturnType; + expect(deletePartial).toHaveBeenCalledWith(workspaceId); + }); test("commitToHistory should skip empty errored partial", async () => { const workspaceId = "test-workspace"; const emptyErrorPartial: MuxMessage = { diff --git a/src/node/services/partialService.ts b/src/node/services/partialService.ts index 0f3677ed9e..64a7ac79a8 100644 --- a/src/node/services/partialService.ts +++ b/src/node/services/partialService.ts @@ -152,6 +152,31 @@ export class PartialService { } const existingMessages = historyResult.data; + + const hasCommitWorthyParts = (partial.parts ?? []).some((part) => { + if (part.type === "text") { + return part.text.length > 0; + } + + if (part.type === "reasoning") { + // Reasoning may be needed for provider-specific replay (e.g., Extended Thinking). + // It is real content and safe to persist. + return part.text.length > 0; + } + + if (part.type === "file") { + return true; + } + + if (part.type === "dynamic-tool") { + // Incomplete tool calls (input-available) are dropped when converting messages + // for provider requests (ignoreIncompleteToolCalls: true). Persisting a tool-only + // partial can brick future requests after a crash. + return part.state === "output-available"; + } + + return false; + }); const existingMessage = existingMessages.find( (msg) => msg.metadata?.historySequence === partialSeq ); @@ -159,7 +184,7 @@ export class PartialService { const shouldCommit = (!existingMessage || // No message with this sequence yet (partial.parts?.length ?? 0) > (existingMessage.parts?.length ?? 0)) && // Partial has more parts - (partial.parts?.length ?? 
0) > 0; // Don't commit empty messages + hasCommitWorthyParts; // Don't commit tool-only incomplete placeholders if (shouldCommit) { if (existingMessage) { diff --git a/src/node/services/streamManager.ts b/src/node/services/streamManager.ts index fbe1966daa..63f781ee1e 100644 --- a/src/node/services/streamManager.ts +++ b/src/node/services/streamManager.ts @@ -49,6 +49,13 @@ globalThis.AI_SDK_LOG_WARNINGS = false; interface ReasoningDeltaPart { type: "reasoning-delta"; text?: string; + delta?: string; + providerMetadata?: { + anthropic?: { + signature?: string; + redactedData?: string; + }; + }; } // Branded types for compile-time safety @@ -480,6 +487,7 @@ export class StreamManager extends EventEmitter { delta: part.text, tokens, timestamp, + signature: part.signature, }); } else if (part.type === "dynamic-tool") { const inputText = JSON.stringify(part.input); @@ -926,18 +934,45 @@ export class StreamManager extends EventEmitter { case "reasoning-delta": { // Both Anthropic and OpenAI use reasoning-delta for streaming reasoning content - const delta = (part as ReasoningDeltaPart).text ?? ""; + const reasoningPart = part as ReasoningDeltaPart; + const delta = reasoningPart.text ?? reasoningPart.delta ?? 
""; + const signature = reasoningPart.providerMetadata?.anthropic?.signature; + + // Signature deltas come separately with empty text - attach to last reasoning part + if (signature && !delta) { + const lastPart = streamInfo.parts.at(-1); + if (lastPart?.type === "reasoning") { + lastPart.signature = signature; + // Also set providerOptions for SDK compatibility when converting to ModelMessages + lastPart.providerOptions = { anthropic: { signature } }; + // Emit signature update event + this.emit("reasoning-delta", { + type: "reasoning-delta", + workspaceId: workspaceId as string, + messageId: streamInfo.messageId, + delta: "", + tokens: 0, + timestamp: Date.now(), + signature, + }); + void this.schedulePartialWrite(workspaceId, streamInfo); + } + break; + } // Append each delta as a new part (merging happens at display time) - const reasoningPart = { + // Include providerOptions for SDK compatibility when converting to ModelMessages + const newPart = { type: "reasoning" as const, text: delta, timestamp: Date.now(), + signature, // May be undefined, will be filled by subsequent signature delta + providerOptions: signature ? { anthropic: { signature } } : undefined, }; - streamInfo.parts.push(reasoningPart); + streamInfo.parts.push(newPart); // Emit using shared logic (ensures replay consistency) - await this.emitPartAsEvent(workspaceId, streamInfo.messageId, reasoningPart); + await this.emitPartAsEvent(workspaceId, streamInfo.messageId, newPart); void this.schedulePartialWrite(workspaceId, streamInfo); break; diff --git a/tests/ipc/emptyAssistantMessage.test.ts b/tests/ipc/emptyAssistantMessage.test.ts new file mode 100644 index 0000000000..0f9e7c3cdb --- /dev/null +++ b/tests/ipc/emptyAssistantMessage.test.ts @@ -0,0 +1,151 @@ +/** + * Test that corrupted chat history with empty assistant messages + * does not brick the workspace (self-healing behavior). 
+ * + * Reproduction of: "messages.95: all messages must have non-empty content + * except for the optional final assistant message" + */ +import { setupWorkspace, shouldRunIntegrationTests, validateApiKeys } from "./setup"; +import { sendMessageWithModel, createStreamCollector, modelString, HAIKU_MODEL } from "./helpers"; +import { HistoryService } from "../../src/node/services/historyService"; +import { createMuxMessage } from "../../src/common/types/message"; + +// Skip all tests if TEST_INTEGRATION is not set +const describeIntegration = shouldRunIntegrationTests() ? describe : describe.skip; + +// Validate API keys before running tests +if (shouldRunIntegrationTests()) { + validateApiKeys(["ANTHROPIC_API_KEY"]); +} + +describeIntegration("empty assistant message self-healing", () => { + test.concurrent( + "should handle corrupted history with empty assistant parts array", + async () => { + const { env, workspaceId, cleanup } = await setupWorkspace("anthropic"); + try { + const historyService = new HistoryService(env.config); + + // Seed history that mimics a crash-corrupted chat.jsonl: + // 1. User message + // 2. Assistant message with content + // 3. User follow-up + // 4. 
Empty assistant message (crash during stream start - placeholder persisted) + const messages = [ + createMuxMessage("msg-1", "user", "Hello", {}), + createMuxMessage("msg-2", "assistant", "Hi there!", {}), + createMuxMessage("msg-3", "user", "Follow up question", {}), + // Corrupted: empty parts array (placeholder message from crash) + { + id: "msg-4-corrupted", + role: "assistant" as const, + parts: [], // Empty - this is the corruption + metadata: { + timestamp: Date.now(), + model: "anthropic:claude-haiku-4-5", + mode: "exec" as const, + historySequence: 3, + }, + }, + ]; + + // Write corrupted history directly + for (const msg of messages) { + const result = await historyService.appendToHistory(workspaceId, msg as any); + if (!result.success) { + throw new Error(`Failed to seed history: ${result.error}`); + } + } + + // Now try to send a new message - this should NOT fail with + // "all messages must have non-empty content" + const collector = createStreamCollector(env.orpc, workspaceId); + collector.start(); + + const sendResult = await sendMessageWithModel( + env, + workspaceId, + "This should work despite corrupted history", + HAIKU_MODEL + ); + + // The send should succeed (not fail due to corrupted history) + expect(sendResult.success).toBe(true); + + // Wait for stream to complete successfully + const streamEnd = await collector.waitForEvent("stream-end", 30000); + expect(streamEnd).toBeDefined(); + + collector.stop(); + } finally { + await cleanup(); + } + }, + 60000 + ); + + test.concurrent( + "should handle corrupted history with incomplete tool-only assistant message", + async () => { + const { env, workspaceId, cleanup } = await setupWorkspace("anthropic"); + try { + const historyService = new HistoryService(env.config); + + // Seed history with an assistant message that has only an incomplete tool call + // (state: "input-available" means tool was requested but never executed) + const messages = [ + createMuxMessage("msg-1", "user", "Run a command", 
{}), + // Corrupted: tool-only with incomplete state + { + id: "msg-2-corrupted", + role: "assistant" as const, + parts: [ + { + type: "dynamic-tool" as const, + toolName: "bash", + toolCallId: "call-123", + state: "input-available" as const, // Incomplete - will be dropped by SDK + input: { script: "echo hello" }, + }, + ], + metadata: { + timestamp: Date.now(), + model: "anthropic:claude-haiku-4-5", + mode: "exec" as const, + historySequence: 1, + }, + }, + ]; + + // Write corrupted history directly + for (const msg of messages) { + const result = await historyService.appendToHistory(workspaceId, msg as any); + if (!result.success) { + throw new Error(`Failed to seed history: ${result.error}`); + } + } + + // Now try to send a new message + const collector = createStreamCollector(env.orpc, workspaceId); + collector.start(); + + const sendResult = await sendMessageWithModel( + env, + workspaceId, + "This should work despite corrupted tool history", + HAIKU_MODEL + ); + + expect(sendResult.success).toBe(true); + + const streamEnd = await collector.waitForEvent("stream-end", 30000); + expect(streamEnd).toBeDefined(); + + collector.stop(); + } finally { + await cleanup(); + } + }, + 60000 + ); +});