Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions packages/opencode/src/provider/provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1065,10 +1065,13 @@ export namespace Provider {
const fetchFn = customFetch ?? fetch
const opts = init ?? {}

if (options["timeout"] !== undefined && options["timeout"] !== null) {
// Apply timeout: use configured value, fall back to 300s default.
// Only skip when explicitly set to `false`.
const timeout = options["timeout"] ?? 300_000
if (timeout !== false) {
const signals: AbortSignal[] = []
if (opts.signal) signals.push(opts.signal)
if (options["timeout"] !== false) signals.push(AbortSignal.timeout(options["timeout"]))
signals.push(AbortSignal.timeout(timeout))

const combined = signals.length > 1 ? AbortSignal.any(signals) : signals[0]

Expand Down
8 changes: 8 additions & 0 deletions packages/opencode/src/session/message-v2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,14 @@ export namespace MessageV2 {
},
{ cause: e },
).toObject()
case e instanceof Error && e.name === "StreamIdleTimeoutError":
return new MessageV2.APIError(
{
message: e.message,
isRetryable: true,
},
{ cause: e },
).toObject()
case APICallError.isInstance(e):
const parsed = ProviderError.parseAPICallError({
providerID: ctx.providerID,
Expand Down
39 changes: 37 additions & 2 deletions packages/opencode/src/session/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,40 @@ import { Question } from "@/question"

export namespace SessionProcessor {
const DOOM_LOOP_THRESHOLD = 3
const MAX_RETRY_ATTEMPTS = 10
const STREAM_IDLE_TIMEOUT_MS = 120_000 // 2 minutes with no stream chunk
const log = Log.create({ service: "session.processor" })

class StreamIdleTimeoutError extends Error {
readonly isRetryable = true
constructor(ms: number) {
super(`Stream idle for ${ms}ms with no data received`)
this.name = "StreamIdleTimeoutError"
}
}

async function* withIdleTimeout<T>(stream: AsyncIterable<T>, ms: number, signal: AbortSignal): AsyncGenerator<T> {
const iterator = stream[Symbol.asyncIterator]()
while (true) {
const result = await Promise.race([
iterator.next(),
new Promise<never>((_, reject) => {
const timer = setTimeout(() => reject(new StreamIdleTimeoutError(ms)), ms)
signal.addEventListener(
"abort",
() => {
clearTimeout(timer)
reject(signal.reason ?? new DOMException("Aborted", "AbortError"))
},
{ once: true },
)
}),
])
if (result.done) return
yield result.value
}
}

export type Info = Awaited<ReturnType<typeof create>>
export type Result = Awaited<ReturnType<Info["process"]>>

Expand Down Expand Up @@ -52,7 +84,7 @@ export namespace SessionProcessor {
let reasoningMap: Record<string, MessageV2.ReasoningPart> = {}
const stream = await LLM.stream(streamInput)

for await (const value of stream.fullStream) {
for await (const value of withIdleTimeout(stream.fullStream, STREAM_IDLE_TIMEOUT_MS, input.abort)) {
input.abort.throwIfAborted()
switch (value.type) {
case "start":
Expand Down Expand Up @@ -357,7 +389,7 @@ export namespace SessionProcessor {
// TODO: Handle context overflow error
}
const retry = SessionRetry.retryable(error)
if (retry !== undefined) {
if (retry !== undefined && attempt < MAX_RETRY_ATTEMPTS) {
attempt++
const delay = SessionRetry.delay(attempt, error.name === "APIError" ? error : undefined)
SessionStatus.set(input.sessionID, {
Expand All @@ -369,6 +401,9 @@ export namespace SessionProcessor {
await SessionRetry.sleep(delay, input.abort).catch(() => {})
continue
}
if (retry !== undefined) {
log.error("max retries exceeded", { attempt, message: retry })
}
input.assistantMessage.error = error
Bus.publish(Session.Event.Error, {
sessionID: input.assistantMessage.sessionID,
Expand Down
11 changes: 9 additions & 2 deletions packages/opencode/src/tool/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { iife } from "@/util/iife"
import { defer } from "@/util/defer"
import { Config } from "../config/config"
import { PermissionNext } from "@/permission/next"
import { abortAfterAny } from "@/util/abort"

const parameters = z.object({
description: z.string().describe("A short (3-5 words) description of the task"),
Expand Down Expand Up @@ -118,11 +119,17 @@ export const TaskTool = Tool.define("task", async (ctx) => {

const messageID = Identifier.ascending("message")

// Subagent timeout: 10 minutes default to prevent indefinite hangs
const SUBAGENT_TIMEOUT_MS = 10 * 60 * 1000
const timeout = abortAfterAny(SUBAGENT_TIMEOUT_MS, ctx.abort)
function cancel() {
SessionPrompt.cancel(session.id)
}
ctx.abort.addEventListener("abort", cancel)
using _ = defer(() => ctx.abort.removeEventListener("abort", cancel))
timeout.signal.addEventListener("abort", cancel)
using _ = defer(() => {
timeout.signal.removeEventListener("abort", cancel)
timeout.clearTimeout()
})
const promptParts = await SessionPrompt.resolvePromptParts(params.prompt)

const result = await SessionPrompt.prompt({
Expand Down
Loading