diff --git a/packages/opencode/src/tool/task.ts b/packages/opencode/src/tool/task.ts index 5ca1666add7c..be9930789730 100644 --- a/packages/opencode/src/tool/task.ts +++ b/packages/opencode/src/tool/task.ts @@ -10,10 +10,12 @@ import { Agent } from "../agent/agent" import { deriveSubagentSessionPermission } from "../agent/subagent-permissions" import type { SessionPrompt } from "../session/prompt" import { Config } from "@/config/config" -import { Effect, Exit, Schema, Scope } from "effect" +import { Effect, Exit, Fiber, Option, Schema, Scope } from "effect" +import * as Stream from "effect/Stream" import { EffectBridge } from "@/effect/bridge" import { RuntimeFlags } from "@/effect/runtime-flags" import { Database } from "@opencode-ai/core/database/database" +import { EventV2Bridge } from "@/event-v2-bridge" export interface TaskPromptOps { cancel(sessionID: SessionID): Effect.Effect @@ -88,6 +90,7 @@ export const TaskTool = Tool.define( const scope = yield* Scope.Scope const flags = yield* RuntimeFlags.Service const database = yield* Database.Service + const events = yield* EventV2Bridge.Service const run = Effect.fn("TaskTool.execute")(function* ( params: Schema.Schema.Type, @@ -171,9 +174,14 @@ export const TaskTool = Tool.define( const ops = ctx.extra?.promptOps as TaskPromptOps if (!ops) return yield* Effect.fail(new Error("TaskTool requires promptOps in ctx.extra")) + type TaskRaceResult = + | { kind: "prompt"; value: SessionV1.WithParts } + | { kind: "error"; message: string } + const runTask = Effect.fn("TaskTool.runTask")(function* () { const parts = yield* ops.resolvePromptParts(params.prompt) - const result = yield* ops.prompt({ + + const promptInput = { messageID: MessageID.ascending(), sessionID: nextSession.id, model: { @@ -188,8 +196,49 @@ export const TaskTool = Tool.define( ...Object.fromEntries((cfg.experimental?.primary_tools ?? []).map((item) => [item, false])), }, parts, - }) - return result.parts.findLast((item) => item.type === "text")?.text ?? "" + } + + // Subscribe to error events BEFORE the prompt starts to avoid a race + // where the error fires before the listener is wired up. + const errorFiber = yield* events + .subscribe(Session.Event.Error) + .pipe( + Stream.filter((event) => event.data.sessionID === nextSession.id), + Stream.take(1), + Stream.runHead, + Effect.map((maybeEvent): TaskRaceResult => { + const err = Option.isSome(maybeEvent) ? maybeEvent.value.data.error : undefined + const message = err + ? typeof err === "object" && err !== null + ? JSON.stringify((err as Record).data ?? err) + : String(err) + : "Subagent session error" + return { kind: "error", message } + }), + Effect.forkIn(scope), + ) + + const promptEff = ops.prompt(promptInput).pipe( + Effect.map((value): TaskRaceResult => ({ kind: "prompt", value })), + ) + + const timeoutEff = Effect.sleep("120 seconds").pipe( + Effect.map( + (): TaskRaceResult => ({ + kind: "error", + message: "Subagent session timed out after 120s of inactivity", + }), + ), + ) + + const result = yield* Effect.raceFirst( + promptEff, + Effect.raceFirst(Fiber.join(errorFiber), timeoutEff), + ).pipe( + Effect.ensuring(Fiber.interrupt(errorFiber)), + ) + if (result.kind === "error") return yield* Effect.fail(new Error(result.message)) + return result.value.parts.findLast((item) => item.type === "text")?.text ?? "" }) const inject = Effect.fn("TaskTool.injectBackgroundResult")(function* ( diff --git a/packages/opencode/test/tool/task.test.ts b/packages/opencode/test/tool/task.test.ts index 6a9604a01efa..5a22b2f61ea6 100644 --- a/packages/opencode/test/tool/task.test.ts +++ b/packages/opencode/test/tool/task.test.ts @@ -1,7 +1,7 @@ import { afterEach, describe, expect } from "bun:test" import { SessionV1 } from "@opencode-ai/core/v1/session" import { Database } from "@opencode-ai/core/database/database" -import { Deferred, Effect, Exit, Fiber, Layer } from "effect" +import { Cause, Deferred, Effect, Exit, Fiber, Layer } from "effect" import { Agent } from "../../src/agent/agent" import { BackgroundJob } from "@/background/job" import { EventV2Bridge } from "@/event-v2-bridge" @@ -898,4 +898,62 @@ describe("tool.task", () => { expect((yield* jobs.get(grandchild.id))?.status).toBe("cancelled") }), ) + + it.instance("fails fast when session error event fires during subagent prompt", () => + Effect.gen(function* () { + const events = yield* EventV2Bridge.Service + const sessions = yield* Session.Service + const { chat, assistant } = yield* seed() + const child = yield* sessions.create({ parentID: chat.id, title: "error test child" }) + const tool = yield* TaskTool + const def = yield* tool.init() + const started = defer() + + const promptOps: TaskPromptOps = { + cancel: () => Effect.void, + resolvePromptParts: (template) => Effect.succeed([{ type: "text" as const, text: template }]), + prompt: (input) => + Effect.promise(() => { + started.resolve() + return new Promise(() => {}) + }), + } + + const fiber = yield* def + .execute( + { + description: "test", + prompt: "hi", + subagent_type: "general", + task_id: child.id, + }, + { + sessionID: chat.id, + messageID: assistant.id, + agent: "build", + abort: new AbortController().signal, + extra: { promptOps }, + messages: [], + metadata: () => Effect.void, + ask: () => Effect.void, + }, + ) + .pipe(Effect.forkChild) + + // Wait for the prompt to start, then give a small grace period + yield* Effect.promise(() => started.promise) + yield* Effect.sleep("500 millis") + + yield* events.publish(Session.Event.Error, { + sessionID: child.id, + error: { name: "UnknownError" as const, data: { message: "Provider quota exhausted" } }, + }) + + const exit = yield* Fiber.await(fiber).pipe(Effect.timeoutOption("10 seconds")) + expect(exit._tag === "Some" && Exit.isFailure(exit.value)).toBe(true) + if (exit._tag === "Some" && Exit.isFailure(exit.value)) + expect(Cause.pretty(exit.value.cause)).toContain("Provider quota exhausted") + + }), + ) })