diff --git a/packages/llm/src/route/client.ts b/packages/llm/src/route/client.ts index 2d9de2fd39dd..6712cde42620 100644 --- a/packages/llm/src/route/client.ts +++ b/packages/llm/src/route/client.ts @@ -1,7 +1,8 @@ -import { Cause, Context, Effect, Layer, Schema, Stream } from "effect" +import { Cause, Context, Effect, Layer, Queue, Schema, Stream } from "effect" import type { Auth as AuthDef } from "./auth" import type { Endpoint } from "./endpoint" import { RequestExecutor } from "./executor" +import type { RetryInfo } from "./executor" import type { Framing } from "./framing" import { HttpTransport } from "./transport" import type { Transport, TransportRuntime } from "./transport" @@ -12,10 +13,11 @@ import { applyCachePolicy } from "../cache-policy" import * as ProviderShared from "../protocols/shared" import * as ToolRuntime from "../tool-runtime" import type { Tools } from "../tool" -import type { LLMError, LLMEvent, PreparedRequestOf, ProtocolID } from "../schema" +import type { LLMError, PreparedRequestOf, ProtocolID } from "../schema" import { GenerationOptions, HttpOptions, + LLMEvent, LLMRequest, LLMResponse, ModelID, @@ -439,6 +441,50 @@ const streamRequestWith = (runtime: TransportRuntime) => (request: LLMRequest) = }), ) +const retryEvent = (info: RetryInfo) => + Effect.sync(() => { + const http = "http" in info.error.reason ? info.error.reason.http : undefined + return LLMEvent.retry({ + attempt: info.attempt, + delayMs: info.delayMs, + message: info.error.reason.message, + providerMetadata: { + opencode: { + module: info.error.module, + method: info.error.method, + reason: info.error.reason._tag, + status: ("status" in info.error.reason ? info.error.reason.status : undefined) ?? http?.response?.status, + }, + }, + }) + }) + +const streamRequestObservableWith = (runtime: TransportRuntime) => (request: LLMRequest) => + Stream.callback((queue) => + Effect.gen(function* () { + const pull = yield* Stream.toPull( + streamRequestWith({ + ...runtime, + httpOptions: { + onRetry: (info) => retryEvent(info).pipe(Effect.map((event) => Queue.offerUnsafe(queue, event))), + }, + })(request), + ) + + const drain: Effect.Effect = Effect.gen(function* () { + const chunk = yield* pull + for (const event of chunk) Queue.offerUnsafe(queue, event) + return yield* drain + }).pipe( + Effect.catchIf(Cause.isDone, () => Effect.sync(() => Queue.endUnsafe(queue))), + Effect.catchTag("LLM.Error", (error) => Queue.fail(queue, error)), + ) + + yield* drain + }), + ) + + const isToolRunOptions = (input: LLMRequest | ToolRuntime.RunOptions): input is ToolRuntime.RunOptions => "request" in input && "tools" in input @@ -495,7 +541,7 @@ export const streamRequest = (request: LLMRequest) => export const layer: Layer.Layer = Layer.effect( Service, Effect.gen(function* () { - const stream = streamWith(streamRequestWith({ http: yield* RequestExecutor.Service })) + const stream = streamWith(streamRequestObservableWith({ http: yield* RequestExecutor.Service })) return Service.of({ prepare: prepareWith as Interface["prepare"], stream, generate: generateWith(stream) }) }), ) @@ -505,7 +551,7 @@ export const layerWithWebSocket: Layer.Layer Effect.Effect } +export type RetryInfo = { + readonly attempt: number + readonly delayMs: number + readonly error: LLMError +} + +export type ExecuteOptions = { + readonly onRetry?: (info: RetryInfo) => Effect.Effect +} + export class Service extends Context.Service()("@opencode/LLM/RequestExecutor") {} const BODY_LIMIT = 16_384 @@ -341,14 +352,16 @@ const retryDelay = (error: LLMError, attempt: number) => { const retryStatusFailures = ( effect: Effect.Effect, + options: ExecuteOptions | undefined, retries = MAX_RETRIES, attempt = 0, ): Effect.Effect => Effect.catchTag(effect, "LLM.Error", (error): Effect.Effect => { if (!error.retryable || retries <= 0) return Effect.fail(error) return retryDelay(error, attempt).pipe( + Effect.tap((delay) => options?.onRetry?.({ attempt: attempt + 1, delayMs: delay, error }) ?? Effect.void), Effect.flatMap((delay) => Effect.sleep(delay)), - Effect.flatMap(() => retryStatusFailures(effect, retries - 1, attempt + 1)), + Effect.flatMap(() => retryStatusFailures(effect, options, retries - 1, attempt + 1)), ) }) @@ -364,7 +377,7 @@ export const layer: Layer.Layer = Layer.e .pipe(Effect.mapError(toHttpError(redactedNames)), Effect.flatMap(statusError(request, redactedNames))) }) return Service.of({ - execute: (request) => retryStatusFailures(executeOnce(request)), + execute: (request, options) => retryStatusFailures(executeOnce(request), options), }) }), ) diff --git a/packages/llm/src/route/transport/http.ts b/packages/llm/src/route/transport/http.ts index 2159ce90b0f0..5016deaa0a53 100644 --- a/packages/llm/src/route/transport/http.ts +++ b/packages/llm/src/route/transport/http.ts @@ -102,7 +102,7 @@ export const httpJson = (input: HttpJsonInput): HttpJs frames: (prepared, request, runtime) => Stream.unwrap( runtime.http - .execute(prepared.request) + .execute(prepared.request, runtime.httpOptions) .pipe( Effect.map((response) => prepared.framing.frame( diff --git a/packages/llm/src/route/transport/index.ts b/packages/llm/src/route/transport/index.ts index f4d5fb29b7f6..565db12e136e 100644 --- a/packages/llm/src/route/transport/index.ts +++ b/packages/llm/src/route/transport/index.ts @@ -1,11 +1,12 @@ import type { Effect, Stream } from "effect" -import type { Interface as RequestExecutorInterface } from "../executor" +import type { ExecuteOptions, Interface as RequestExecutorInterface } from "../executor" import type { Interface as WebSocketExecutorInterface } from "./websocket" import type { LLMError, LLMRequest } from "../../schema" export interface TransportRuntime { readonly http: RequestExecutorInterface readonly webSocket?: WebSocketExecutorInterface + readonly httpOptions?: ExecuteOptions } export interface Transport { diff --git a/packages/llm/src/schema/errors.ts b/packages/llm/src/schema/errors.ts index 39bf5b6252d1..6dbeb6aadfd3 100644 --- a/packages/llm/src/schema/errors.ts +++ b/packages/llm/src/schema/errors.ts @@ -123,7 +123,7 @@ export class TransportReason extends Schema.Class("LLM.Error.Tr http: Schema.optional(HttpContext), }) { get retryable() { - return false + return this.kind === "TransportError" || this.kind === "Timeout" } } diff --git a/packages/llm/src/schema/events.ts b/packages/llm/src/schema/events.ts index cee489a689e3..8c759ab6156c 100644 --- a/packages/llm/src/schema/events.ts +++ b/packages/llm/src/schema/events.ts @@ -203,6 +203,15 @@ export const ProviderErrorEvent = Schema.Struct({ }).annotate({ identifier: "LLM.Event.ProviderError" }) export type ProviderErrorEvent = Schema.Schema.Type +export const RetryEvent = Schema.Struct({ + type: Schema.tag("retry"), + attempt: Schema.Number, + delayMs: Schema.Number, + message: Schema.String, + providerMetadata: Schema.optional(ProviderMetadata), +}).annotate({ identifier: "LLM.Event.Retry" }) +export type RetryEvent = Schema.Schema.Type + const llmEventTagged = Schema.Union([ StepStart, TextStart, @@ -220,6 +229,7 @@ const llmEventTagged = Schema.Union([ StepFinish, Finish, ProviderErrorEvent, + RetryEvent, ]).pipe(Schema.toTaggedUnion("type")) type WithID = Omit & { readonly id: ID | string } @@ -265,6 +275,7 @@ export const LLMEvent = Object.assign(llmEventTagged, { usage: input.usage === undefined ? undefined : Usage.from(input.usage), }), providerError: ProviderErrorEvent.make, + retry: RetryEvent.make, is: { stepStart: llmEventTagged.guards["step-start"], textStart: llmEventTagged.guards["text-start"], @@ -282,6 +293,7 @@ export const LLMEvent = Object.assign(llmEventTagged, { stepFinish: llmEventTagged.guards["step-finish"], finish: llmEventTagged.guards.finish, providerError: llmEventTagged.guards["provider-error"], + retry: llmEventTagged.guards.retry, }, }) export type LLMEvent = Schema.Schema.Type diff --git a/packages/llm/test/executor.test.ts b/packages/llm/test/executor.test.ts index b294606ff34f..ad8540484c7d 100644 --- a/packages/llm/test/executor.test.ts +++ b/packages/llm/test/executor.test.ts @@ -1,12 +1,12 @@ import { describe, expect } from "bun:test" -import { Effect, Fiber, Layer, Random, Ref } from "effect" +import { Effect, Fiber, Layer, Random, Ref, Stream } from "effect" import * as TestClock from "effect/testing/TestClock" -import { Headers, HttpClient, HttpClientRequest, HttpClientResponse } from "effect/unstable/http" -import { LLM, LLMError } from "../src" +import { Headers, HttpClient, HttpClientError, HttpClientRequest, HttpClientResponse } from "effect/unstable/http" +import { LLM, LLMError, TransportReason } from "../src" import { LLMClient, RequestExecutor } from "../src/route" import * as OpenAIChat from "../src/protocols/openai-chat" -import { dynamicResponse } from "./lib/http" -import { deltaChunk } from "./lib/openai-chunks" +import { dynamicResponse, runtimeLayer } from "./lib/http" +import { deltaChunk, finishChunk } from "./lib/openai-chunks" import { sseRaw } from "./lib/sse" import { it } from "./lib/effect" @@ -18,23 +18,26 @@ const secretRequest = HttpClientRequest.post("https://provider.test/v1/chat?api_ HttpClientRequest.setHeaders(Headers.fromInput({ authorization: "Bearer header-secret-456" })), ) +const httpResponsesLayer = (responses: ReadonlyArray) => + Layer.unwrap( + Effect.gen(function* () { + const cursor = yield* Ref.make(0) + return Layer.succeed( + HttpClient.HttpClient, + HttpClient.make((request) => + Effect.gen(function* () { + const index = yield* Ref.getAndUpdate(cursor, (value) => value + 1) + return HttpClientResponse.fromWeb(request, responses[index] ?? responses[responses.length - 1]) + }), + ), + ) + }), + ) + const responsesLayer = (responses: ReadonlyArray) => RequestExecutor.layer.pipe( Layer.provide( - Layer.unwrap( - Effect.gen(function* () { - const cursor = yield* Ref.make(0) - return Layer.succeed( - HttpClient.HttpClient, - HttpClient.make((request) => - Effect.gen(function* () { - const index = yield* Ref.getAndUpdate(cursor, (value) => value + 1) - return HttpClientResponse.fromWeb(request, responses[index] ?? responses[responses.length - 1]) - }), - ), - ) - }), - ), + httpResponsesLayer(responses), ), ) @@ -59,6 +62,30 @@ const countedResponsesLayer = (attempts: Ref.Ref, responses: ReadonlyArr ), ) +const transportThenResponseLayer = (attempts: Ref.Ref, response: Response) => + RequestExecutor.layer.pipe( + Layer.provide( + Layer.succeed( + HttpClient.HttpClient, + HttpClient.make((request) => + Ref.getAndUpdate(attempts, (value) => value + 1).pipe( + Effect.flatMap((index) => + index === 0 + ? Effect.fail( + new HttpClientError.HttpClientError({ + reason: new HttpClientError.TransportError({ + request, + }), + }), + ) + : Effect.succeed(HttpClientResponse.fromWeb(request, response)), + ), + ), + ), + ), + ), + ) + const randomMidpoint = { nextDoubleUnsafe: () => 0.5, nextIntUnsafe: () => 0, @@ -73,6 +100,23 @@ const expectLLMError = (error: unknown) => { const errorHttp = (error: LLMError) => ("http" in error.reason ? error.reason.http : undefined) describe("RequestExecutor", () => { + it.effect("only marks transient transport reasons retryable", () => + Effect.sync(() => { + const retryable = (kind: string | undefined) => + new LLMError({ + module: "RequestExecutor", + method: "execute", + reason: new TransportReason({ message: "HTTP transport failed", kind }), + }).retryable + + expect(retryable("TransportError")).toBe(true) + expect(retryable("Timeout")).toBe(true) + expect(retryable("EncodeError")).toBe(false) + expect(retryable("InvalidUrlError")).toBe(false) + expect(retryable(undefined)).toBe(false) + }), + ) + it.effect("returns redacted diagnostics for retryable rate limits", () => Effect.gen(function* () { const executor = yield* RequestExecutor.Service @@ -225,6 +269,57 @@ describe("RequestExecutor", () => { ), ) + it.effect("reports retry attempts before retrying status responses", () => + Effect.gen(function* () { + const executor = yield* RequestExecutor.Service + const retries: RequestExecutor.RetryInfo[] = [] + const response = yield* executor.execute(request, { + onRetry: (info) => Effect.sync(() => retries.push(info)), + }) + + expect(response.status).toBe(200) + expect(retries).toHaveLength(1) + expect(retries[0].attempt).toBe(1) + expect(retries[0].delayMs).toBe(0) + expect(retries[0].error.reason).toMatchObject({ _tag: "ProviderInternal", status: 503 }) + }).pipe( + Effect.provide( + responsesLayer([ + new Response("busy", { status: 503, headers: { "retry-after-ms": "0" } }), + new Response("ok", { status: 200 }), + ]), + ), + ), + ) + + it.effect("emits retry events before silently retried stream requests", () => + Effect.gen(function* () { + const model = OpenAIChat.model({ id: "gpt-4o-mini", baseURL: "https://api.openai.test/v1" }) + const events = yield* LLMClient.stream(LLM.request({ model, prompt: "Say hello." })).pipe( + Stream.runCollect, + Effect.provide( + runtimeLayer( + httpResponsesLayer([ + new Response("busy", { status: 503, headers: { "retry-after-ms": "0" } }), + new Response( + sseRaw( + `data: ${JSON.stringify(deltaChunk({ role: "assistant", content: "Hello" }))}`, + `data: ${JSON.stringify(finishChunk("stop"))}`, + ), + { + headers: { "content-type": "text/event-stream" }, + }, + ), + ]), + ), + ), + ) + + expect(events[0]).toMatchObject({ type: "retry", attempt: 1, delayMs: 0 }) + expect(events.some((event) => event.type === "text-delta")).toBe(true) + }), + ) + it.effect("marks 504 and 529 status responses retryable", () => Effect.gen(function* () { const failWith = (status: number) => @@ -385,6 +480,30 @@ describe("RequestExecutor", () => { }).pipe(Effect.provideService(Random.Random, randomMidpoint)), ) + it.effect("retries retryable transport failures before returning the stream", () => + Effect.gen(function* () { + const attempts = yield* Ref.make(0) + return yield* Effect.gen(function* () { + const executor = yield* RequestExecutor.Service + const fiber = yield* executor.execute(request).pipe(Effect.forkChild) + + yield* Effect.yieldNow + expect(yield* Ref.get(attempts)).toBe(1) + + yield* TestClock.adjust(499) + yield* Effect.yieldNow + expect(yield* Ref.get(attempts)).toBe(1) + + yield* TestClock.adjust(1) + const response = yield* Fiber.join(fiber) + + expect(response.status).toBe(200) + expect(yield* response.text).toBe("ok") + expect(yield* Ref.get(attempts)).toBe(2) + }).pipe(Effect.provide(transportThenResponseLayer(attempts, new Response("ok", { status: 200 })))) + }).pipe(Effect.provideService(Random.Random, randomMidpoint)), + ) + it.effect("does not retry after a successful response reaches stream parsing", () => Effect.gen(function* () { const attempts = yield* Ref.make(0) diff --git a/packages/opencode/src/session/message-v2.ts b/packages/opencode/src/session/message-v2.ts index d3d6a1dfcc10..e018258e1a86 100644 --- a/packages/opencode/src/session/message-v2.ts +++ b/packages/opencode/src/session/message-v2.ts @@ -2,6 +2,7 @@ import { BusEvent } from "@/bus/bus-event" import { SessionID, MessageID, PartID } from "./schema" import { NamedError } from "@opencode-ai/core/util/error" import { APICallError, convertToModelMessages, LoadAPIKeyError, type ModelMessage, type UIMessage } from "ai" +import { LLMError } from "@opencode-ai/llm" import { LSP } from "@/lsp/lsp" import { Snapshot } from "@/snapshot" import { SyncEvent } from "../sync" @@ -1168,6 +1169,19 @@ export function fromError( }, { cause: e }, ).toObject() + case e instanceof LLMError: + const http = "http" in e.reason ? e.reason.http : undefined + return new APIError( + { + message: e.reason.message, + statusCode: ("status" in e.reason ? e.reason.status : undefined) ?? http?.response?.status, + isRetryable: e.retryable, + responseHeaders: http?.response?.headers, + responseBody: http?.body, + metadata: http?.request.url ? { url: http.request.url } : undefined, + }, + { cause: e }, + ).toObject() case e instanceof Error: return new NamedError.Unknown({ message: errorMessage(e) }, { cause: e }).toObject() default: diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index 5466ed00b32b..29a4bc3ac1f5 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -303,6 +303,15 @@ export const layer = Layer.effect( const handleEvent = Effect.fnUntraced(function* (value: StreamEvent) { switch (value.type) { + case "retry": + yield* status.set(ctx.sessionID, { + type: "retry", + attempt: value.attempt, + message: value.message, + next: Date.now() + value.delayMs, + }) + return + case "reasoning-start": if (value.id in ctx.reasoningMap) return // TODO(v2): Temporary dual-write while migrating session messages to v2 events. diff --git a/packages/opencode/test/session/message-v2.test.ts b/packages/opencode/test/session/message-v2.test.ts index 82bed0e9cc6f..7c0631c2b39c 100644 --- a/packages/opencode/test/session/message-v2.test.ts +++ b/packages/opencode/test/session/message-v2.test.ts @@ -1,5 +1,6 @@ import { describe, expect, test } from "bun:test" import { APICallError } from "ai" +import { HttpContext, HttpRequestDetails, HttpResponseDetails, LLMError, TransportReason } from "@opencode-ai/llm" import { MessageV2 } from "../../src/session/message-v2" import { ProviderTransform } from "@/provider/transform" import type { Provider } from "@/provider/provider" @@ -1546,6 +1547,69 @@ describe("session.message-v2.fromError", () => { expect(result.name).toBe("MessageAbortedError") }) + + test("classifies native LLM transport errors as retryable APIError", () => { + const result = MessageV2.fromError( + new LLMError({ + module: "RequestExecutor", + method: "execute", + reason: new TransportReason({ + message: "HTTP transport failed", + kind: "TransportError", + url: "https://provider.test/v1/chat", + http: new HttpContext({ + request: new HttpRequestDetails({ + method: "POST", + url: "https://provider.test/v1/chat", + headers: { "x-safe": "visible" }, + }), + }), + }), + }), + { providerID }, + ) + + expect(MessageV2.APIError.isInstance(result)).toBe(true) + expect((result as MessageV2.APIError).data).toMatchObject({ + message: "HTTP transport failed", + isRetryable: true, + metadata: { url: "https://provider.test/v1/chat" }, + }) + }) + + test("preserves native LLM HTTP response metadata for retry scheduling", () => { + const result = MessageV2.fromError( + new LLMError({ + module: "RequestExecutor", + method: "execute", + reason: new TransportReason({ + message: "HTTP transport failed", + kind: "TransportError", + http: new HttpContext({ + request: new HttpRequestDetails({ + method: "POST", + url: "https://provider.test/v1/chat", + headers: {}, + }), + response: new HttpResponseDetails({ + status: 503, + headers: { "retry-after-ms": "1000" }, + }), + body: "busy", + }), + }), + }), + { providerID }, + ) + + expect(MessageV2.APIError.isInstance(result)).toBe(true) + expect((result as MessageV2.APIError).data).toMatchObject({ + statusCode: 503, + isRetryable: true, + responseHeaders: { "retry-after-ms": "1000" }, + responseBody: "busy", + }) + }) }) describe("session.message-v2.latest", () => {