From 770e9b3d593ed691a92a5faf62e30e6f2947b8eb Mon Sep 17 00:00:00 2001 From: Tim Richardson Date: Thu, 28 May 2026 11:42:31 +1000 Subject: [PATCH] fix(openai): improve websocket retry handling --- .../cli/cmd/tui/component/prompt/index.tsx | 32 ++++++- packages/opencode/src/plugin/index.ts | 32 ++++++- packages/opencode/src/plugin/openai/codex.ts | 7 +- .../opencode/src/plugin/openai/ws-pool.ts | 53 ++++++++-- packages/opencode/src/plugin/openai/ws.ts | 29 +++++- .../opencode/test/plugin/openai-ws.test.ts | 96 +++++++++++++++++++ 6 files changed, 225 insertions(+), 24 deletions(-) diff --git a/packages/opencode/src/cli/cmd/tui/component/prompt/index.tsx b/packages/opencode/src/cli/cmd/tui/component/prompt/index.tsx index 28b1f62f697f..d8a20d65b731 100644 --- a/packages/opencode/src/cli/cmd/tui/component/prompt/index.tsx +++ b/packages/opencode/src/cli/cmd/tui/component/prompt/index.tsx @@ -93,6 +93,7 @@ const money = new Intl.NumberFormat("en-US", { }) const DRAFT_RETENTION_MIN_CHARS = 20 +const RETRY_NOTICE_MIN_VISIBLE_MS = 1000 function randomIndex(count: number) { if (count <= 0) return 0 @@ -148,6 +149,29 @@ export function Prompt(props: PromptProps) { const dialog = useDialog() const toast = useToast() const status = createMemo(() => sync.data.session_status?.[props.sessionID ?? ""] ?? { type: "idle" }) + const [visibleRetry, setVisibleRetry] = createSignal>() + let retryNoticeTimer: NodeJS.Timeout | undefined + createEffect(() => { + const current = status() + if (current.type !== "retry") return + setVisibleRetry(current) + if (retryNoticeTimer) clearTimeout(retryNoticeTimer) + retryNoticeTimer = setTimeout(() => { + retryNoticeTimer = undefined + setVisibleRetry(undefined) + }, RETRY_NOTICE_MIN_VISIBLE_MS) + retryNoticeTimer.unref() + }) + onCleanup(() => { + if (retryNoticeTimer) clearTimeout(retryNoticeTimer) + }) + const displayStatus = createMemo(() => { + const current = status() + if (current.type === "retry") return current + const retry = visibleRetry() + if (retry?.type === "retry") return retry + return current + }) const history = usePromptHistory() const stash = usePromptStash() const keymap = useOpencodeKeymap() @@ -1631,12 +1655,12 @@ export function Prompt(props: PromptProps) { - + @@ -1647,7 +1671,7 @@ export function Prompt(props: PromptProps) { {(() => { const retry = createMemo(() => { - const s = status() + const s = displayStatus() if (s.type !== "retry") return return s }) @@ -1748,7 +1772,7 @@ export function Prompt(props: PromptProps) { {props.hint ?? } - + {(file) => ( diff --git a/packages/opencode/src/plugin/index.ts b/packages/opencode/src/plugin/index.ts index 717dff8db268..1dfe3948c55c 100644 --- a/packages/opencode/src/plugin/index.ts +++ b/packages/opencode/src/plugin/index.ts @@ -10,8 +10,10 @@ import { Bus } from "../bus" import * as Log from "@opencode-ai/core/util/log" import { createOpencodeClient } from "@opencode-ai/sdk" import { ServerAuth } from "@/server/auth" -import { CodexAuthPlugin } from "./openai/codex" +import { CodexAuthPlugin, type CodexAuthPluginOptions } from "./openai/codex" import { Session } from "@/session/session" +import { SessionID } from "@/session/schema" +import { SessionStatus } from "@/session/status" import { NamedError } from "@opencode-ai/core/util/error" import { CopilotAuthPlugin } from "./github-copilot/copilot" import { gitlabAuthPlugin as GitlabAuthPlugin } from "opencode-gitlab-auth" @@ -63,12 +65,13 @@ export function experimentalWebSocketsEnabled(input: { enabled: boolean; channel } // Built-in plugins that are directly imported (not installed from npm) -function internalPlugins(flags: RuntimeFlags.Info): PluginInstance[] { +function internalPlugins(flags: RuntimeFlags.Info, options?: Pick): PluginInstance[] { return [ // Temporary rollout: pre-release builds use WebSockets by default; releases require explicit opt-in. (input) => CodexAuthPlugin(input, { experimentalWebSockets: experimentalWebSocketsEnabled({ enabled: flags.experimentalWebSockets }), + onWebSocketRetry: options?.onWebSocketRetry, }), CopilotAuthPlugin, GitlabAuthPlugin, @@ -162,7 +165,30 @@ export const layer = Layer.effect( $: typeof Bun === "undefined" ? undefined : Bun.$, } - for (const plugin of flags.disableDefaultPlugins ? [] : internalPlugins(flags)) { + const builtinPlugins = flags.disableDefaultPlugins + ? [] + : internalPlugins(flags, { + onWebSocketRetry(retry) { + bridge.fork( + Effect.sync(() => SessionID.make(retry.sessionID)).pipe( + Effect.flatMap((sessionID) => + bus.publish(SessionStatus.Event.Status, { + sessionID, + status: { + type: "retry", + attempt: retry.attempt, + message: retry.message, + next: retry.next, + }, + }), + ), + Effect.ignore, + ), + ) + }, + }) + + for (const plugin of builtinPlugins) { log.info("loading internal plugin", { name: plugin.name }) const init = yield* Effect.tryPromise({ try: () => plugin(input), diff --git a/packages/opencode/src/plugin/openai/codex.ts b/packages/opencode/src/plugin/openai/codex.ts index 7ed48d087959..334549c65b91 100644 --- a/packages/opencode/src/plugin/openai/codex.ts +++ b/packages/opencode/src/plugin/openai/codex.ts @@ -106,10 +106,13 @@ interface TokenResponse { expires_in?: number } -interface CodexAuthPluginOptions { +type CreateWebSocketFetchOptions = NonNullable[0]> + +export interface CodexAuthPluginOptions { issuer?: string codexApiEndpoint?: string experimentalWebSockets?: boolean + onWebSocketRetry?: CreateWebSocketFetchOptions["onRetry"] } async function exchangeCodeForTokens(code: string, redirectUri: string, pkce: PkceCodes): Promise { @@ -407,7 +410,7 @@ export async function CodexAuthPlugin(input: PluginInput, options: CodexAuthPlug async loader(getAuth) { const auth = await getAuth() const websocketFetch = options.experimentalWebSockets - ? OpenAIWebSocketPool.createWebSocketFetch({ httpFetch: fetch }) + ? OpenAIWebSocketPool.createWebSocketFetch({ httpFetch: fetch, onRetry: options.onWebSocketRetry }) : undefined if (websocketFetch) { websocketFetches.push(websocketFetch) diff --git a/packages/opencode/src/plugin/openai/ws-pool.ts b/packages/opencode/src/plugin/openai/ws-pool.ts index 0af4c57c9110..47d56fb6c165 100644 --- a/packages/opencode/src/plugin/openai/ws-pool.ts +++ b/packages/opencode/src/plugin/openai/ws-pool.ts @@ -14,6 +14,14 @@ export interface CreateWebSocketFetchOptions { idleTimeout?: number maxConnectionAge?: number connectionLimitRetries?: number + onRetry?: (input: WebSocketTransportRetry) => void +} + +export interface WebSocketTransportRetry { + sessionID: string + attempt: number + message: string + next: number } interface PoolEntry { @@ -86,8 +94,19 @@ export function createWebSocketFetch(options?: CreateWebSocketFetchOptions) { entry.busy = true entry.lastUsedAt = Date.now() + let transportRetryAttempts = 0 + function notifyRetry(message: string) { + transportRetryAttempts++ + try { + options?.onRetry?.({ sessionID, attempt: transportRetryAttempts, message, next: Date.now() }) + } catch (error) { + log.warn("websocket retry notice failed", { key, error: error instanceof Error ? error.message : String(error) }) + } + } + try { let connectionLimitAttempts = 0 + let hasCommitted = false entry.socket = await socket( entry, options?.url ?? url, @@ -96,18 +115,21 @@ export function createWebSocketFetch(options?: CreateWebSocketFetchOptions) { maxConnectionAge, init?.signal, ) - let resolveFirstEvent: (started: boolean) => void = () => {} - let rejectFirstEvent: (error: Error) => void = () => {} - const firstEvent = new Promise((resolve, reject) => { - resolveFirstEvent = resolve - rejectFirstEvent = reject + let resolveCommitted: (started: boolean) => void = () => {} + let rejectCommitted: (error: Error) => void = () => {} + const committed = new Promise((resolve, reject) => { + resolveCommitted = resolve + rejectCommitted = reject }) const response = OpenAIWebSocket.streamResponsesWebSocket({ socket: entry.socket, body, idleTimeout, signal: init?.signal ?? undefined, - onFirstEvent: () => resolveFirstEvent(true), + onCommitted: () => { + hasCommitted = true + resolveCommitted(true) + }, onTerminal: (event) => { entry.busy = false entry.lastUsedAt = Date.now() @@ -121,14 +143,15 @@ export function createWebSocketFetch(options?: CreateWebSocketFetchOptions) { entry.busy = false entry.fallback = true invalidate(entry) - resolveFirstEvent(false) + if (!hasCommitted) notifyRetry(retryMessage(error)) + resolveCommitted(false) }, onAbort: (error) => { log.debug("websocket aborted", { key }) entry.busy = false entry.lastUsedAt = Date.now() invalidate(entry) - rejectFirstEvent(error) + rejectCommitted(error) }, onRetryableTerminal: async (event) => { const error = connectionLimitError(event) @@ -137,6 +160,7 @@ export function createWebSocketFetch(options?: CreateWebSocketFetchOptions) { connectionLimitAttempts++ log.warn("websocket connection limit reached", { key, attempt: connectionLimitAttempts }) + notifyRetry("WebSocket connection limit reached; retrying request") invalidate(entry) entry.socket = await socket( entry, @@ -150,8 +174,8 @@ export function createWebSocketFetch(options?: CreateWebSocketFetchOptions) { return entry.socket }, }) - if (await firstEvent) return response - log.debug("http fallback", { key, reason: "websocket_failed_before_first_event" }) + if (await committed) return response + log.debug("http fallback", { key, reason: "websocket_failed_before_committed_output" }) return httpFetch(input, httpInit) } catch (error) { entry.busy = false @@ -167,6 +191,7 @@ export function createWebSocketFetch(options?: CreateWebSocketFetchOptions) { error: error instanceof Error ? error.message : String(error), fallback: "http", }) + notifyRetry(retryMessage(error)) invalidate(entry) return httpFetch(input, httpInit) } @@ -198,6 +223,14 @@ function connectionLimitError(event: Record) { return new Error(typeof event.error.message === "string" ? event.error.message : CONNECTION_LIMIT_REACHED_CODE) } +function retryMessage(error: unknown) { + const message = error instanceof Error ? error.message : String(error) + const lower = message.toLowerCase() + if (lower.includes("timeout") || lower.includes("timed out")) return "WebSocket timed out; retrying request" + if (lower.includes("closed")) return "WebSocket disconnected; retrying request" + return "WebSocket failed; retrying request" +} + async function socket( entry: PoolEntry, url: string, diff --git a/packages/opencode/src/plugin/openai/ws.ts b/packages/opencode/src/plugin/openai/ws.ts index 7176ab210422..0bd85141c4a2 100644 --- a/packages/opencode/src/plugin/openai/ws.ts +++ b/packages/opencode/src/plugin/openai/ws.ts @@ -17,7 +17,7 @@ export interface StreamResponsesWebSocketOptions { body: Record idleTimeout?: number signal?: AbortSignal - onFirstEvent?: () => void + onCommitted?: () => void onComplete?: (event: Record) => void onTerminal?: (event: Record) => void onRetryableTerminal?: (event: Record) => Promise @@ -125,7 +125,7 @@ export function streamResponsesWebSocket(options: StreamResponsesWebSocketOption let controller: ReadableStreamDefaultController | undefined let cleanupSocket = () => {} let completed = false - let emitted = false + let committed = false let idleTimer: ReturnType | undefined function cleanup() { @@ -163,6 +163,13 @@ export function streamResponsesWebSocket(options: StreamResponsesWebSocketOption } } + function commit(event: Record | undefined) { + if (committed) return + if (isReplaySafeEvent(event)) return + committed = true + options.onCommitted?.() + } + async function onMessage(data: WebSocket.RawData, isBinary: boolean) { if (completed) return if (isBinary) { @@ -180,7 +187,7 @@ export function streamResponsesWebSocket(options: StreamResponsesWebSocketOption } })() - if (event?.type === "error" && !emitted && options.onRetryableTerminal) { + if (event?.type === "error" && !committed && options.onRetryableTerminal) { cleanupSocket() if (idleTimer) clearTimeout(idleTimer) idleTimer = undefined @@ -200,7 +207,7 @@ export function streamResponsesWebSocket(options: StreamResponsesWebSocketOption } } - if (!emitted) options.onFirstEvent?.() + commit(event) controller?.enqueue( encoder.encode( `${text @@ -209,7 +216,6 @@ export function streamResponsesWebSocket(options: StreamResponsesWebSocketOption .join("\n")}\n\n`, ), ) - emitted = true resetIdleTimeout("idle timeout waiting for websocket") if (!event) return @@ -319,4 +325,17 @@ function closeError(message: string, code: number, reason: Buffer) { return new Error(`${message} (${details.join(": ")})`) } +function isReplaySafeEvent(event: Record | undefined) { + // Lifecycle and structural preamble events are safe to replay. Content deltas + // and completed output/tool items commit the stream to avoid duplicates. + return ( + event?.type === "response.created" || + event?.type === "response.in_progress" || + event?.type === "response.queued" || + event?.type === "response.output_item.added" || + event?.type === "response.content_part.added" || + event?.type === "response.reasoning_summary_part.added" + ) +} + export * as OpenAIWebSocket from "./ws" diff --git a/packages/opencode/test/plugin/openai-ws.test.ts b/packages/opencode/test/plugin/openai-ws.test.ts index 1fe26cfcb482..7e8134bc290c 100644 --- a/packages/opencode/test/plugin/openai-ws.test.ts +++ b/packages/opencode/test/plugin/openai-ws.test.ts @@ -253,12 +253,15 @@ describe("plugin.openai.ws-pool", () => { }) }) const httpRequests: Headers[] = [] + const retries: { sessionID: string; attempt: number; message: string }[] = [] const fetch = OpenAIWebSocketPool.createWebSocketFetch({ url: server.url, httpFetch: mockFetch(async (_input, init) => { httpRequests.push(new Headers(init?.headers)) return new Response("http") }), + onRetry: (retry) => + retries.push({ sessionID: retry.sessionID, attempt: retry.attempt, message: retry.message }), }) const response = await fetch("https://api.openai.com/v1/responses", streamRequest()) @@ -270,6 +273,9 @@ describe("plugin.openai.ws-pool", () => { expect(connections).toBe(2) expect(messages).toBe(2) expect(httpRequests).toHaveLength(0) + expect(retries).toEqual([ + { sessionID: "session-1", attempt: 1, message: "WebSocket connection limit reached; retrying request" }, + ]) fetch.close() }) @@ -318,6 +324,7 @@ describe("plugin.openai.ws-pool", () => { socket.once("message", () => {}) }) const httpRequests: Headers[] = [] + const retries: { sessionID: string; attempt: number; message: string }[] = [] const fetch = OpenAIWebSocketPool.createWebSocketFetch({ url: server.url, idleTimeout: 20, @@ -325,6 +332,8 @@ describe("plugin.openai.ws-pool", () => { httpRequests.push(new Headers(init?.headers)) return new Response("http") }), + onRetry: (retry) => + retries.push({ sessionID: retry.sessionID, attempt: retry.attempt, message: retry.message }), }) const first = await fetch("https://api.openai.com/v1/responses", streamRequest()) @@ -334,6 +343,7 @@ describe("plugin.openai.ws-pool", () => { expect(await second.text()).toBe("http") expect(connections).toBe(1) expect(httpRequests).toHaveLength(2) + expect(retries).toEqual([{ sessionID: "session-1", attempt: 1, message: "WebSocket timed out; retrying request" }]) fetch.close() }) @@ -344,6 +354,7 @@ describe("plugin.openai.ws-pool", () => { }) }) const httpRequests: Headers[] = [] + const retries: { sessionID: string; attempt: number; message: string }[] = [] const fetch = OpenAIWebSocketPool.createWebSocketFetch({ url: server.url, idleTimeout: 20, @@ -351,6 +362,8 @@ describe("plugin.openai.ws-pool", () => { httpRequests.push(new Headers(init?.headers)) return new Response("http") }), + onRetry: (retry) => + retries.push({ sessionID: retry.sessionID, attempt: retry.attempt, message: retry.message }), }) const first = await fetch("https://api.openai.com/v1/responses", streamRequest()) @@ -359,6 +372,7 @@ describe("plugin.openai.ws-pool", () => { expect(await second.text()).toBe("http") expect(httpRequests).toHaveLength(1) + expect(retries).toEqual([]) fetch.close() }) @@ -467,6 +481,88 @@ describe("plugin.openai.ws-pool", () => { fetch.close() }) + test("replays over HTTP after replay-safe metadata then an unexpected close", async () => { + let connections = 0 + await using server = await createWebSocketServer((socket) => { + connections += 1 + socket.once("message", () => { + socket.send(JSON.stringify({ type: "response.created", response: { id: "resp_meta" } }), () => { + socket.terminate() + }) + }) + }) + const httpRequests: Headers[] = [] + const fetch = OpenAIWebSocketPool.createWebSocketFetch({ + url: server.url, + httpFetch: mockFetch(async (_input, init) => { + httpRequests.push(new Headers(init?.headers)) + return new Response("http") + }), + }) + + const first = await fetch("https://api.openai.com/v1/responses", streamRequest()) + expect(await first.text()).toBe("http") + const second = await fetch("https://api.openai.com/v1/responses", streamRequest()) + + expect(await second.text()).toBe("http") + expect(connections).toBe(1) + expect(httpRequests).toHaveLength(2) + fetch.close() + }) + + test("replays over HTTP after response preamble then an unexpected close", async () => { + let connections = 0 + await using server = await createWebSocketServer((socket) => { + connections += 1 + socket.once("message", () => { + socket.send(JSON.stringify({ type: "response.created", response: { id: "resp_preamble" } }), () => { + socket.send( + JSON.stringify({ + type: "response.output_item.added", + output_index: 0, + item: { type: "message", id: "item_preamble", status: "in_progress", role: "assistant", content: [] }, + }), + () => { + socket.send( + JSON.stringify({ + type: "response.content_part.added", + item_id: "item_preamble", + output_index: 0, + content_index: 0, + part: { type: "output_text", text: "", annotations: [] }, + }), + () => socket.terminate(), + ) + }, + ) + }) + }) + }) + const httpRequests: Headers[] = [] + const retries: { sessionID: string; attempt: number; message: string }[] = [] + const fetch = OpenAIWebSocketPool.createWebSocketFetch({ + url: server.url, + httpFetch: mockFetch(async (_input, init) => { + httpRequests.push(new Headers(init?.headers)) + return new Response("http") + }), + onRetry: (retry) => + retries.push({ sessionID: retry.sessionID, attempt: retry.attempt, message: retry.message }), + }) + + const first = await fetch("https://api.openai.com/v1/responses", streamRequest()) + expect(await first.text()).toBe("http") + const second = await fetch("https://api.openai.com/v1/responses", streamRequest()) + + expect(await second.text()).toBe("http") + expect(connections).toBe(1) + expect(httpRequests).toHaveLength(2) + expect(retries).toEqual([ + { sessionID: "session-1", attempt: 1, message: "WebSocket disconnected; retrying request" }, + ]) + fetch.close() + }) + test("does not keep HTTP fallback active after aborting a websocket response", async () => { let connections = 0 await using server = await createWebSocketServer((socket) => {