From 03b8a31173f587be603313769a6584b9a06598de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Cruz?= Date: Fri, 5 Jun 2026 13:19:36 +0100 Subject: [PATCH] fix(session): retry empty stream truncations and discard partial parts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Detect provider stream truncation (finish reason "unknown" with zero output tokens) and retry it as a transient failure, capped at 3 attempts. On an EmptyOther retry — and when the retry cap is hit — discard the parts the failed attempt persisted (everything created after a per-call part floor) so the message reflects only the final attempt instead of accumulating an orphan step-start / partial text or reasoning. The discard is scoped to truncations; other retryable errors (rate limits, 5xx) retry untouched. Surface APIError instances through MessageV2.fromError so the TUI receives the structured message and metadata. Refs #14108 --- packages/opencode/src/session/message-v2.ts | 5 ++ packages/opencode/src/session/processor.ts | 57 +++++++++++++- packages/opencode/src/session/retry.ts | 20 ++++- packages/opencode/test/session/prompt.test.ts | 32 ++++++++ packages/opencode/test/session/retry.test.ts | 77 +++++++++++++++++++ 5 files changed, 188 insertions(+), 3 deletions(-) diff --git a/packages/opencode/src/session/message-v2.ts b/packages/opencode/src/session/message-v2.ts index c57fcd53c96f..f456274dc3a8 100644 --- a/packages/opencode/src/session/message-v2.ts +++ b/packages/opencode/src/session/message-v2.ts @@ -711,6 +711,11 @@ export function fromError( }, { cause: e }, ).toObject() + // Convert APIError class instances thrown via `Effect.fail(new APIError(...))` + // to their wire form so the TUI receives the structured message and metadata + // instead of being wrapped by the generic Error fallback below. + case APIError.isInstance(e): + return e instanceof Error ? 
e.toObject() : e case e instanceof Error: return new NamedError.Unknown({ message: errorMessage(e) }, { cause: e }).toObject() default: diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index 907134df013a..9cbc4366d1f1 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -86,6 +86,10 @@ interface ProcessorContext extends Input { currentTextID: string | undefined reasoningMap: Record v2AssistantMessageID: SessionMessage.ID | undefined + // Part id minted at the start of each process() call, before any attempt + // persists parts; parts with a greater id belong to this call and are + // discarded when an attempt is retried after a stream truncation. + partFloor: PartID } type StreamEvent = LLMEvent @@ -128,6 +132,7 @@ export const layer = Layer.effect( currentTextID: undefined, reasoningMap: {}, v2AssistantMessageID: undefined, + partFloor: PartID.ascending(), } const mirrorAssistant = flags.experimentalEventSystem && !input.assistantMessage.summary let aborted = false @@ -395,7 +400,7 @@ export const layer = Layer.effect( time: { start: Date.now() }, metadata: value.providerMetadata, } - yield* session.updatePart(ctx.reasoningMap[value.id]) + yield* session.updatePart(ctx.reasoningMap[value.id]) return case "reasoning-delta": @@ -701,6 +706,20 @@ export const layer = Layer.effect( usage: value.usage ?? new Usage({}), metadata: value.providerMetadata, }) + // Detect stream truncation: the AI SDK reports the unmapped + // fallback reason when the upstream provider stream ends without a + // proper stop_reason. No usage and no output means the connection + // was cut mid-generation, which is a transient failure that should + // be retried. 
+ if (value.reason === "unknown" && usage.tokens.output === 0) { + return yield* Effect.fail( + new SessionV1.APIError({ + message: "Provider stream ended without a stop reason", + isRetryable: true, + metadata: { code: "EmptyOther" }, + }), + ) + } if (!ctx.assistantMessage.summary) { // TODO(v2): Temporary dual-write while migrating session messages to v2 events. if (mirrorAssistant) { @@ -846,6 +865,27 @@ export const layer = Layer.effect( } }) + // Discards every part the failed attempt persisted (anything created + // after partFloor) so a successful retry replaces rather than appends to + // the truncated content. The assistant message is created fresh per + // process() call, so the floor scopes removal to this attempt's output. + const discardAttempt = Effect.fn("SessionProcessor.discardAttempt")(function* () { + const existing = yield* MessageV2.parts(ctx.assistantMessage.id).pipe( + Effect.provideService(Database.Service, database), + ) + for (const part of existing) { + if (part.id <= ctx.partFloor) continue + yield* session.removePart({ + sessionID: ctx.sessionID, + messageID: ctx.assistantMessage.id, + partID: part.id, + }) + } + ctx.currentText = undefined + ctx.reasoningMap = {} + ctx.toolcalls = {} + }) + const cleanup = Effect.fn("SessionProcessor.cleanup")(function* () { if (ctx.snapshot) { const patch = yield* snapshot.patch(ctx.snapshot) @@ -933,6 +973,11 @@ export const layer = Layer.effect( yield* events.publish(Session.Event.Error, { sessionID: ctx.sessionID, error }) return } + // Retries are exhausted: drop the truncated attempt's partial parts so + // the failed message doesn't keep an orphan step-start / partial text. + if (SessionV1.APIError.isInstance(error) && error.data.metadata?.code === "EmptyOther") { + yield* discardAttempt() + } if (!ctx.assistantMessage.summary) { // TODO(v2): Temporary dual-write while migrating session messages to v2 events. 
if (mirrorAssistant) { @@ -959,6 +1004,9 @@ export const layer = Layer.effect( slog.info("process") ctx.needsCompaction = false ctx.shouldBreak = (yield* config.get()).experimental?.continue_loop_on_deny !== true + // Record the high-water mark before any attempt persists parts so a + // truncation retry can discard exactly this call's output. + ctx.partFloor = PartID.ascending() return yield* Effect.gen(function* () { yield* Effect.gen(function* () { @@ -1003,7 +1051,12 @@ export const layer = Layer.effect( timestamp: DateTime.makeUnsafe(Date.now()), }) : Effect.void - return flushV2Fragments().pipe( + // Only stream truncations leave partial parts worth discarding; + // other retryable errors (rate limits, 5xx) retry untouched. + const truncated = + SessionV1.APIError.isInstance(info.error) && info.error.data.metadata?.code === "EmptyOther" + return (truncated ? discardAttempt() : Effect.void).pipe( + Effect.andThen(flushV2Fragments()), Effect.andThen(event), Effect.andThen( status.set(ctx.sessionID, { diff --git a/packages/opencode/src/session/retry.ts b/packages/opencode/src/session/retry.ts index 4139665bd2bd..5cac46fb509d 100644 --- a/packages/opencode/src/session/retry.ts +++ b/packages/opencode/src/session/retry.ts @@ -176,13 +176,30 @@ function parseJSON(value: unknown) { export function policy(opts: { provider: string parse: (error: unknown) => Err - set: (input: { attempt: number; message: string; action?: Retryable["action"]; next: number }) => Effect.Effect + set: (input: { + attempt: number + message: string + action?: Retryable["action"] + next: number + error: Err + }) => Effect.Effect }) { return Schedule.fromStepWithMetadata( Effect.succeed((meta: Schedule.InputMetadata) => { const error = opts.parse(meta.input) const retry = retryable(error, opts.provider) if (!retry) return Cause.done(meta.attempt) + // Cap empty-other stream-truncation retries to avoid infinite loops if a + // provider keeps closing streams without a stop_reason. 
Other retryable + // classifications (rate limits, 5xx, ZlibError, etc.) keep their existing + // unbounded behaviour. + if ( + SessionV1.APIError.isInstance(error) && + error.data.metadata?.code === "EmptyOther" && + meta.attempt >= 3 + ) { + return Cause.done(meta.attempt) + } return Effect.gen(function* () { const wait = delay(meta.attempt, SessionV1.APIError.isInstance(error) ? error : undefined) const now = yield* Clock.currentTimeMillis @@ -191,6 +208,7 @@ export function policy(opts: { message: retry.message, action: retry.action, next: now + wait, + error, }) return [meta.attempt, Duration.millis(wait)] as [number, Duration.Duration] }) diff --git a/packages/opencode/test/session/prompt.test.ts b/packages/opencode/test/session/prompt.test.ts index cb9694b56dbd..4b3790d77c34 100644 --- a/packages/opencode/test/session/prompt.test.ts +++ b/packages/opencode/test/session/prompt.test.ts @@ -2316,3 +2316,35 @@ noLLMServer.instance( }), 30_000, ) + +it.instance("retry discards in-flight parts from the failed attempt", () => + Effect.gen(function* () { + const { llm } = yield* useServerConfig(providerCfg) + const prompt = yield* SessionPrompt.Service + const sessions = yield* Session.Service + const chat = yield* sessions.create({ + title: "Discard test", + permission: [{ permission: "*", pattern: "*", action: "allow" }], + }) + yield* prompt.prompt({ + sessionID: chat.id, + agent: "build", + noReply: true, + parts: [{ type: "text", text: "hello" }], + }) + // Attempt 1: emit partial text but never a finish_reason. The stream + // ends with reason "unknown" and zero output tokens, which the + // processor treats as an EmptyOther truncation and retries. 
+ yield* llm.push(reply().text("partial first attempt").item()) + yield* llm.push(reply().text("final answer").stop().item()) + + const result = yield* prompt.loop({ sessionID: chat.id }) + + expect(yield* llm.hits).toHaveLength(2) + const texts = result.parts.filter((p) => p.type === "text").map((p) => (p as SessionV1.TextPart).text) + expect(texts).toEqual(["final answer"]) + // The discarded attempt's step-start must be removed too, otherwise the + // message keeps an orphan step-start per retry. + expect(result.parts.filter((p) => p.type === "step-start")).toHaveLength(1) + }), +) diff --git a/packages/opencode/test/session/retry.test.ts b/packages/opencode/test/session/retry.test.ts index f5edf1af24b1..62b9f0cf5b89 100644 --- a/packages/opencode/test/session/retry.test.ts +++ b/packages/opencode/test/session/retry.test.ts @@ -11,6 +11,7 @@ import { ProviderError } from "../../src/provider/error" import { SessionID } from "../../src/session/schema" import { SessionStatus } from "../../src/session/status" import { testEffect } from "../lib/effect" +import { provideTmpdirInstance } from "../fixture/fixture" import { ProviderV2 } from "@opencode-ai/core/provider" const providerID = ProviderV2.ID.make("test") @@ -343,6 +344,63 @@ describe("session.retry.retryable", () => { "Usage limit reached. It will reset in 15 minutes. 
To continue using this model now, enable usage from your available balance", ) }) + + test("retries EmptyOther stream truncation failures", () => { + const error = new SessionV1.APIError({ + message: "Provider stream ended without a stop reason", + isRetryable: true, + metadata: { code: "EmptyOther" }, + }).toObject() as SessionV1.APIError + + expect(SessionRetry.retryable(error, retryProvider)).toEqual({ + message: "Provider stream ended without a stop reason", + }) + }) + + it.live("policy stops retrying EmptyOther after 3 attempts", () => + provideTmpdirInstance(() => + Effect.gen(function* () { + const sessionID = SessionID.make("session-empty-other-test") + // retry-after-ms=0 keeps the test fast; the cap is driven by metadata.code. + const error = new SessionV1.APIError({ + message: "Provider stream ended without a stop reason", + isRetryable: true, + metadata: { code: "EmptyOther" }, + responseHeaders: { "retry-after-ms": "0" }, + }).toObject() as SessionV1.APIError + const status = yield* SessionStatus.Service + + const step = yield* Schedule.toStepWithMetadata( + SessionRetry.policy({ + provider: retryProvider, + parse: (err) => err as SessionV1.APIError, + set: (info) => + status.set(sessionID, { + type: "retry", + attempt: info.attempt, + message: info.message, + next: info.next, + }), + }), + ) + // attempt=1 and attempt=2 run normally and update status. + yield* step(error) + yield* step(error) + // attempt=3 hits the EmptyOther cap and signals Cause.done. + // Effect.exit captures the schedule termination so it doesn't + // leak as an unhandled failure. 
+ const thirdExit = yield* Effect.exit(step(error)) + + expect(thirdExit._tag).toBe("Failure") + + expect(yield* status.get(sessionID)).toMatchObject({ + type: "retry", + attempt: 2, + message: "Provider stream ended without a stop reason", + }) + }), + ), + ) }) describe("session.message-v2.fromError", () => { @@ -397,6 +455,25 @@ describe("session.message-v2.fromError", () => { expect(retryable).toEqual({ message: "Connection reset by server" }) }) + test("converts APIError class instances to wire form for storage", () => { + // The processor fails via `Effect.fail(new SessionV1.APIError(...))`; fromError + // must convert the class instance to its wire form so the TUI renders the + // structured message and metadata rather than a JSON-stringified + // UnknownError wrapper. + const thrown = new SessionV1.APIError({ + message: "Provider stream ended without a stop reason", + isRetryable: true, + metadata: { code: "EmptyOther" }, + }) + + const result = MessageV2.fromError(thrown, { providerID }) + + expect(SessionV1.APIError.isInstance(result)).toBe(true) + expect((result as SessionV1.APIError).data.message).toBe("Provider stream ended without a stop reason") + expect((result as SessionV1.APIError).data.metadata?.code).toBe("EmptyOther") + expect((result as { name: string }).name).toBe("APIError") + }) + test("marks OpenAI 404 status codes as retryable", () => { const error = new APICallError({ message: "boom",