Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 28 additions & 4 deletions packages/opencode/src/cli/cmd/tui/component/prompt/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ const money = new Intl.NumberFormat("en-US", {
})

const DRAFT_RETENTION_MIN_CHARS = 20
const RETRY_NOTICE_MIN_VISIBLE_MS = 1000

function randomIndex(count: number) {
if (count <= 0) return 0
Expand Down Expand Up @@ -148,6 +149,29 @@ export function Prompt(props: PromptProps) {
const dialog = useDialog()
const toast = useToast()
const status = createMemo(() => sync.data.session_status?.[props.sessionID ?? ""] ?? { type: "idle" })
const [visibleRetry, setVisibleRetry] = createSignal<ReturnType<typeof status>>()
let retryNoticeTimer: NodeJS.Timeout | undefined
createEffect(() => {
const current = status()
if (current.type !== "retry") return
setVisibleRetry(current)
if (retryNoticeTimer) clearTimeout(retryNoticeTimer)
retryNoticeTimer = setTimeout(() => {
retryNoticeTimer = undefined
setVisibleRetry(undefined)
}, RETRY_NOTICE_MIN_VISIBLE_MS)
retryNoticeTimer.unref()
})
onCleanup(() => {
if (retryNoticeTimer) clearTimeout(retryNoticeTimer)
})
const displayStatus = createMemo(() => {
const current = status()
if (current.type === "retry") return current
const retry = visibleRetry()
if (retry?.type === "retry") return retry
return current
})
const history = usePromptHistory()
const stash = usePromptStash()
const keymap = useOpencodeKeymap()
Expand Down Expand Up @@ -1631,12 +1655,12 @@ export function Prompt(props: PromptProps) {
</box>
<box width="100%" flexDirection="row" justifyContent="space-between">
<Switch>
<Match when={status().type !== "idle"}>
<Match when={displayStatus().type !== "idle"}>
<box
flexDirection="row"
gap={1}
flexGrow={1}
justifyContent={status().type === "retry" ? "space-between" : "flex-start"}
justifyContent={displayStatus().type === "retry" ? "space-between" : "flex-start"}
>
<box flexShrink={0} flexDirection="row" gap={1}>
<box marginLeft={1}>
Expand All @@ -1647,7 +1671,7 @@ export function Prompt(props: PromptProps) {
<box flexDirection="row" gap={1} flexShrink={0}>
{(() => {
const retry = createMemo(() => {
const s = status()
const s = displayStatus()
if (s.type !== "retry") return
return s
})
Expand Down Expand Up @@ -1748,7 +1772,7 @@ export function Prompt(props: PromptProps) {
</Match>
<Match when={true}>{props.hint ?? <text />}</Match>
</Switch>
<Show when={status().type !== "retry"}>
<Show when={displayStatus().type !== "retry"}>
<box gap={2} flexDirection="row">
<Show when={editorContextLabelState() !== "none" ? editorFileLabelDisplay() : undefined}>
{(file) => (
Expand Down
32 changes: 29 additions & 3 deletions packages/opencode/src/plugin/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import { Bus } from "../bus"
import * as Log from "@opencode-ai/core/util/log"
import { createOpencodeClient } from "@opencode-ai/sdk"
import { ServerAuth } from "@/server/auth"
import { CodexAuthPlugin } from "./openai/codex"
import { CodexAuthPlugin, type CodexAuthPluginOptions } from "./openai/codex"
import { Session } from "@/session/session"
import { SessionID } from "@/session/schema"
import { SessionStatus } from "@/session/status"
import { NamedError } from "@opencode-ai/core/util/error"
import { CopilotAuthPlugin } from "./github-copilot/copilot"
import { gitlabAuthPlugin as GitlabAuthPlugin } from "opencode-gitlab-auth"
Expand Down Expand Up @@ -63,12 +65,13 @@ export function experimentalWebSocketsEnabled(input: { enabled: boolean; channel
}

// Built-in plugins that are directly imported (not installed from npm)
function internalPlugins(flags: RuntimeFlags.Info): PluginInstance[] {
function internalPlugins(flags: RuntimeFlags.Info, options?: Pick<CodexAuthPluginOptions, "onWebSocketRetry">): PluginInstance[] {
return [
// Temporary rollout: pre-release builds use WebSockets by default; releases require explicit opt-in.
(input) =>
CodexAuthPlugin(input, {
experimentalWebSockets: experimentalWebSocketsEnabled({ enabled: flags.experimentalWebSockets }),
onWebSocketRetry: options?.onWebSocketRetry,
}),
CopilotAuthPlugin,
GitlabAuthPlugin,
Expand Down Expand Up @@ -162,7 +165,30 @@ export const layer = Layer.effect(
$: typeof Bun === "undefined" ? undefined : Bun.$,
}

for (const plugin of flags.disableDefaultPlugins ? [] : internalPlugins(flags)) {
const builtinPlugins = flags.disableDefaultPlugins
? []
: internalPlugins(flags, {
onWebSocketRetry(retry) {
bridge.fork(
Effect.sync(() => SessionID.make(retry.sessionID)).pipe(
Effect.flatMap((sessionID) =>
bus.publish(SessionStatus.Event.Status, {
sessionID,
status: {
type: "retry",
attempt: retry.attempt,
message: retry.message,
next: retry.next,
},
}),
),
Effect.ignore,
),
)
},
})

for (const plugin of builtinPlugins) {
log.info("loading internal plugin", { name: plugin.name })
const init = yield* Effect.tryPromise({
try: () => plugin(input),
Expand Down
7 changes: 5 additions & 2 deletions packages/opencode/src/plugin/openai/codex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,13 @@ interface TokenResponse {
expires_in?: number
}

interface CodexAuthPluginOptions {
type CreateWebSocketFetchOptions = NonNullable<Parameters<typeof OpenAIWebSocketPool.createWebSocketFetch>[0]>

export interface CodexAuthPluginOptions {
issuer?: string
codexApiEndpoint?: string
experimentalWebSockets?: boolean
onWebSocketRetry?: CreateWebSocketFetchOptions["onRetry"]
}

async function exchangeCodeForTokens(code: string, redirectUri: string, pkce: PkceCodes): Promise<TokenResponse> {
Expand Down Expand Up @@ -407,7 +410,7 @@ export async function CodexAuthPlugin(input: PluginInput, options: CodexAuthPlug
async loader(getAuth) {
const auth = await getAuth()
const websocketFetch = options.experimentalWebSockets
? OpenAIWebSocketPool.createWebSocketFetch({ httpFetch: fetch })
? OpenAIWebSocketPool.createWebSocketFetch({ httpFetch: fetch, onRetry: options.onWebSocketRetry })
: undefined
if (websocketFetch) {
websocketFetches.push(websocketFetch)
Expand Down
53 changes: 43 additions & 10 deletions packages/opencode/src/plugin/openai/ws-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ export interface CreateWebSocketFetchOptions {
idleTimeout?: number
maxConnectionAge?: number
connectionLimitRetries?: number
onRetry?: (input: WebSocketTransportRetry) => void
}

export interface WebSocketTransportRetry {
sessionID: string
attempt: number
message: string
next: number
}

interface PoolEntry {
Expand Down Expand Up @@ -86,8 +94,19 @@ export function createWebSocketFetch(options?: CreateWebSocketFetchOptions) {

entry.busy = true
entry.lastUsedAt = Date.now()
let transportRetryAttempts = 0
function notifyRetry(message: string) {
transportRetryAttempts++
try {
options?.onRetry?.({ sessionID, attempt: transportRetryAttempts, message, next: Date.now() })
} catch (error) {
log.warn("websocket retry notice failed", { key, error: error instanceof Error ? error.message : String(error) })
}
}

try {
let connectionLimitAttempts = 0
let hasCommitted = false
entry.socket = await socket(
entry,
options?.url ?? url,
Expand All @@ -96,18 +115,21 @@ export function createWebSocketFetch(options?: CreateWebSocketFetchOptions) {
maxConnectionAge,
init?.signal,
)
let resolveFirstEvent: (started: boolean) => void = () => {}
let rejectFirstEvent: (error: Error) => void = () => {}
const firstEvent = new Promise<boolean>((resolve, reject) => {
resolveFirstEvent = resolve
rejectFirstEvent = reject
let resolveCommitted: (started: boolean) => void = () => {}
let rejectCommitted: (error: Error) => void = () => {}
const committed = new Promise<boolean>((resolve, reject) => {
resolveCommitted = resolve
rejectCommitted = reject
})
const response = OpenAIWebSocket.streamResponsesWebSocket({
socket: entry.socket,
body,
idleTimeout,
signal: init?.signal ?? undefined,
onFirstEvent: () => resolveFirstEvent(true),
onCommitted: () => {
hasCommitted = true
resolveCommitted(true)
},
onTerminal: (event) => {
entry.busy = false
entry.lastUsedAt = Date.now()
Expand All @@ -121,14 +143,15 @@ export function createWebSocketFetch(options?: CreateWebSocketFetchOptions) {
entry.busy = false
entry.fallback = true
invalidate(entry)
resolveFirstEvent(false)
if (!hasCommitted) notifyRetry(retryMessage(error))
resolveCommitted(false)
},
onAbort: (error) => {
log.debug("websocket aborted", { key })
entry.busy = false
entry.lastUsedAt = Date.now()
invalidate(entry)
rejectFirstEvent(error)
rejectCommitted(error)
},
onRetryableTerminal: async (event) => {
const error = connectionLimitError(event)
Expand All @@ -137,6 +160,7 @@ export function createWebSocketFetch(options?: CreateWebSocketFetchOptions) {

connectionLimitAttempts++
log.warn("websocket connection limit reached", { key, attempt: connectionLimitAttempts })
notifyRetry("WebSocket connection limit reached; retrying request")
invalidate(entry)
entry.socket = await socket(
entry,
Expand All @@ -150,8 +174,8 @@ export function createWebSocketFetch(options?: CreateWebSocketFetchOptions) {
return entry.socket
},
})
if (await firstEvent) return response
log.debug("http fallback", { key, reason: "websocket_failed_before_first_event" })
if (await committed) return response
log.debug("http fallback", { key, reason: "websocket_failed_before_committed_output" })
return httpFetch(input, httpInit)
} catch (error) {
entry.busy = false
Expand All @@ -167,6 +191,7 @@ export function createWebSocketFetch(options?: CreateWebSocketFetchOptions) {
error: error instanceof Error ? error.message : String(error),
fallback: "http",
})
notifyRetry(retryMessage(error))
invalidate(entry)
return httpFetch(input, httpInit)
}
Expand Down Expand Up @@ -198,6 +223,14 @@ function connectionLimitError(event: Record<string, unknown>) {
return new Error(typeof event.error.message === "string" ? event.error.message : CONNECTION_LIMIT_REACHED_CODE)
}

function retryMessage(error: unknown) {
const message = error instanceof Error ? error.message : String(error)
const lower = message.toLowerCase()
if (lower.includes("timeout") || lower.includes("timed out")) return "WebSocket timed out; retrying request"
if (lower.includes("closed")) return "WebSocket disconnected; retrying request"
return "WebSocket failed; retrying request"
}

async function socket(
entry: PoolEntry,
url: string,
Expand Down
29 changes: 24 additions & 5 deletions packages/opencode/src/plugin/openai/ws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export interface StreamResponsesWebSocketOptions {
body: Record<string, unknown>
idleTimeout?: number
signal?: AbortSignal
onFirstEvent?: () => void
onCommitted?: () => void
onComplete?: (event: Record<string, unknown>) => void
onTerminal?: (event: Record<string, unknown>) => void
onRetryableTerminal?: (event: Record<string, unknown>) => Promise<WebSocket | undefined>
Expand Down Expand Up @@ -125,7 +125,7 @@ export function streamResponsesWebSocket(options: StreamResponsesWebSocketOption
let controller: ReadableStreamDefaultController<Uint8Array> | undefined
let cleanupSocket = () => {}
let completed = false
let emitted = false
let committed = false
let idleTimer: ReturnType<typeof setTimeout> | undefined

function cleanup() {
Expand Down Expand Up @@ -163,6 +163,13 @@ export function streamResponsesWebSocket(options: StreamResponsesWebSocketOption
}
}

function commit(event: Record<string, unknown> | undefined) {
if (committed) return
if (isReplaySafeEvent(event)) return
committed = true
options.onCommitted?.()
}

async function onMessage(data: WebSocket.RawData, isBinary: boolean) {
if (completed) return
if (isBinary) {
Expand All @@ -180,7 +187,7 @@ export function streamResponsesWebSocket(options: StreamResponsesWebSocketOption
}
})()

if (event?.type === "error" && !emitted && options.onRetryableTerminal) {
if (event?.type === "error" && !committed && options.onRetryableTerminal) {
cleanupSocket()
if (idleTimer) clearTimeout(idleTimer)
idleTimer = undefined
Expand All @@ -200,7 +207,7 @@ export function streamResponsesWebSocket(options: StreamResponsesWebSocketOption
}
}

if (!emitted) options.onFirstEvent?.()
commit(event)
controller?.enqueue(
encoder.encode(
`${text
Expand All @@ -209,7 +216,6 @@ export function streamResponsesWebSocket(options: StreamResponsesWebSocketOption
.join("\n")}\n\n`,
),
)
emitted = true
resetIdleTimeout("idle timeout waiting for websocket")

if (!event) return
Expand Down Expand Up @@ -319,4 +325,17 @@ function closeError(message: string, code: number, reason: Buffer) {
return new Error(`${message} (${details.join(": ")})`)
}

function isReplaySafeEvent(event: Record<string, unknown> | undefined) {
// Lifecycle and structural preamble events are safe to replay. Content deltas
// and completed output/tool items commit the stream to avoid duplicates.
return (
event?.type === "response.created" ||
event?.type === "response.in_progress" ||
event?.type === "response.queued" ||
event?.type === "response.output_item.added" ||
event?.type === "response.content_part.added" ||
event?.type === "response.reasoning_summary_part.added"
)
}

export * as OpenAIWebSocket from "./ws"
Loading
Loading