diff --git a/packages/opencode/src/provider/provider.ts b/packages/opencode/src/provider/provider.ts index cdbad6637848..593648167443 100644 --- a/packages/opencode/src/provider/provider.ts +++ b/packages/opencode/src/provider/provider.ts @@ -1065,10 +1065,13 @@ export namespace Provider { const fetchFn = customFetch ?? fetch const opts = init ?? {} - if (options["timeout"] !== undefined && options["timeout"] !== null) { + // Apply timeout: use configured value, fall back to 300s default. + // Only skip when explicitly set to `false`. + const timeout = options["timeout"] ?? 300_000 + if (timeout !== false) { const signals: AbortSignal[] = [] if (opts.signal) signals.push(opts.signal) - if (options["timeout"] !== false) signals.push(AbortSignal.timeout(options["timeout"])) + signals.push(AbortSignal.timeout(timeout)) const combined = signals.length > 1 ? AbortSignal.any(signals) : signals[0] diff --git a/packages/opencode/src/session/message-v2.ts b/packages/opencode/src/session/message-v2.ts index 178751a2227a..6c819d9f1555 100644 --- a/packages/opencode/src/session/message-v2.ts +++ b/packages/opencode/src/session/message-v2.ts @@ -840,6 +840,14 @@ export namespace MessageV2 { }, { cause: e }, ).toObject() + case e instanceof Error && e.name === "StreamIdleTimeoutError": + return new MessageV2.APIError( + { + message: e.message, + isRetryable: true, + }, + { cause: e }, + ).toObject() case APICallError.isInstance(e): const parsed = ProviderError.parseAPICallError({ providerID: ctx.providerID, diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index e7532d20073b..aa146020514c 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -18,8 +18,40 @@ import { Question } from "@/question" export namespace SessionProcessor { const DOOM_LOOP_THRESHOLD = 3 + const MAX_RETRY_ATTEMPTS = 10 + const STREAM_IDLE_TIMEOUT_MS = 120_000 // 2 minutes with no stream chunk const log = Log.create({ service: "session.processor" }) + class StreamIdleTimeoutError extends Error { + readonly isRetryable = true + constructor(ms: number) { + super(`Stream idle for ${ms}ms with no data received`) + this.name = "StreamIdleTimeoutError" + } + } + + async function* withIdleTimeout(stream: AsyncIterable, ms: number, signal: AbortSignal): AsyncGenerator { + const iterator = stream[Symbol.asyncIterator]() + while (true) { + const result = await Promise.race([ + iterator.next(), + new Promise((_, reject) => { + const timer = setTimeout(() => reject(new StreamIdleTimeoutError(ms)), ms) + signal.addEventListener( + "abort", + () => { + clearTimeout(timer) + reject(signal.reason ?? new DOMException("Aborted", "AbortError")) + }, + { once: true }, + ) + }), + ]) + if (result.done) return + yield result.value + } + } + export type Info = Awaited> export type Result = Awaited> @@ -52,7 +84,7 @@ export namespace SessionProcessor { let reasoningMap: Record = {} const stream = await LLM.stream(streamInput) - for await (const value of stream.fullStream) { + for await (const value of withIdleTimeout(stream.fullStream, STREAM_IDLE_TIMEOUT_MS, input.abort)) { input.abort.throwIfAborted() switch (value.type) { case "start": @@ -357,7 +389,7 @@ export namespace SessionProcessor { // TODO: Handle context overflow error } const retry = SessionRetry.retryable(error) - if (retry !== undefined) { + if (retry !== undefined && attempt < MAX_RETRY_ATTEMPTS) { attempt++ const delay = SessionRetry.delay(attempt, error.name === "APIError" ? error : undefined) SessionStatus.set(input.sessionID, { @@ -369,6 +401,9 @@ export namespace SessionProcessor { await SessionRetry.sleep(delay, input.abort).catch(() => {}) continue } + if (retry !== undefined) { + log.error("max retries exceeded", { attempt, message: retry }) + } input.assistantMessage.error = error Bus.publish(Session.Event.Error, { sessionID: input.assistantMessage.sessionID, diff --git a/packages/opencode/src/tool/task.ts b/packages/opencode/src/tool/task.ts index 8c8cf827abaf..4c3ded615526 100644 --- a/packages/opencode/src/tool/task.ts +++ b/packages/opencode/src/tool/task.ts @@ -10,6 +10,7 @@ import { iife } from "@/util/iife" import { defer } from "@/util/defer" import { Config } from "../config/config" import { PermissionNext } from "@/permission/next" +import { abortAfterAny } from "@/util/abort" const parameters = z.object({ description: z.string().describe("A short (3-5 words) description of the task"), @@ -118,11 +119,17 @@ export const TaskTool = Tool.define("task", async (ctx) => { const messageID = Identifier.ascending("message") + // Subagent timeout: 10 minutes default to prevent indefinite hangs + const SUBAGENT_TIMEOUT_MS = 10 * 60 * 1000 + const timeout = abortAfterAny(SUBAGENT_TIMEOUT_MS, ctx.abort) function cancel() { SessionPrompt.cancel(session.id) } - ctx.abort.addEventListener("abort", cancel) - using _ = defer(() => ctx.abort.removeEventListener("abort", cancel)) + timeout.signal.addEventListener("abort", cancel) + using _ = defer(() => { + timeout.signal.removeEventListener("abort", cancel) + timeout.clearTimeout() + }) const promptParts = await SessionPrompt.resolvePromptParts(params.prompt) const result = await SessionPrompt.prompt({