Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 53 additions & 4 deletions packages/opencode/src/tool/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import { Agent } from "../agent/agent"
import { deriveSubagentSessionPermission } from "../agent/subagent-permissions"
import type { SessionPrompt } from "../session/prompt"
import { Config } from "@/config/config"
import { Effect, Exit, Schema, Scope } from "effect"
import { Effect, Exit, Fiber, Option, Schema, Scope } from "effect"
import * as Stream from "effect/Stream"
import { EffectBridge } from "@/effect/bridge"
import { RuntimeFlags } from "@/effect/runtime-flags"
import { Database } from "@opencode-ai/core/database/database"
import { EventV2Bridge } from "@/event-v2-bridge"

export interface TaskPromptOps {
cancel(sessionID: SessionID): Effect.Effect<void>
Expand Down Expand Up @@ -88,6 +90,7 @@ export const TaskTool = Tool.define(
const scope = yield* Scope.Scope
const flags = yield* RuntimeFlags.Service
const database = yield* Database.Service
const events = yield* EventV2Bridge.Service

const run = Effect.fn("TaskTool.execute")(function* (
params: Schema.Schema.Type<typeof Parameters>,
Expand Down Expand Up @@ -171,9 +174,14 @@ export const TaskTool = Tool.define(
const ops = ctx.extra?.promptOps as TaskPromptOps
if (!ops) return yield* Effect.fail(new Error("TaskTool requires promptOps in ctx.extra"))

type TaskRaceResult =
| { kind: "prompt"; value: SessionV1.WithParts }
| { kind: "error"; message: string }

const runTask = Effect.fn("TaskTool.runTask")(function* () {
const parts = yield* ops.resolvePromptParts(params.prompt)
const result = yield* ops.prompt({

const promptInput = {
messageID: MessageID.ascending(),
sessionID: nextSession.id,
model: {
Expand All @@ -188,8 +196,49 @@ export const TaskTool = Tool.define(
...Object.fromEntries((cfg.experimental?.primary_tools ?? []).map((item) => [item, false])),
},
parts,
})
return result.parts.findLast((item) => item.type === "text")?.text ?? ""
}

// Subscribe to error events BEFORE the prompt starts to avoid a race
// where the error fires before the listener is wired up.
const errorFiber = yield* events
.subscribe(Session.Event.Error)
.pipe(
Stream.filter((event) => event.data.sessionID === nextSession.id),
Stream.take(1),
Stream.runHead,
Effect.map((maybeEvent): TaskRaceResult => {
const err = Option.isSome(maybeEvent) ? maybeEvent.value.data.error : undefined
const message = err
? typeof err === "object" && err !== null
? JSON.stringify((err as Record<string, unknown>).data ?? err)
: String(err)
: "Subagent session error"
return { kind: "error", message }
}),
Effect.forkIn(scope),
)

const promptEff = ops.prompt(promptInput).pipe(
Effect.map((value): TaskRaceResult => ({ kind: "prompt", value })),
)

const timeoutEff = Effect.sleep("120 seconds").pipe(
Effect.map(
(): TaskRaceResult => ({
kind: "error",
message: "Subagent session timed out after 120s of inactivity",
}),
),
)

const result = yield* Effect.raceFirst(
promptEff,
Effect.raceFirst(Fiber.join(errorFiber), timeoutEff),
).pipe(
Effect.ensuring(Fiber.interrupt(errorFiber)),
)
if (result.kind === "error") return yield* Effect.fail(new Error(result.message))
return result.value.parts.findLast((item) => item.type === "text")?.text ?? ""
})

const inject = Effect.fn("TaskTool.injectBackgroundResult")(function* (
Expand Down
60 changes: 59 additions & 1 deletion packages/opencode/test/tool/task.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { afterEach, describe, expect } from "bun:test"
import { SessionV1 } from "@opencode-ai/core/v1/session"
import { Database } from "@opencode-ai/core/database/database"
import { Deferred, Effect, Exit, Fiber, Layer } from "effect"
import { Cause, Deferred, Effect, Exit, Fiber, Layer } from "effect"
import { Agent } from "../../src/agent/agent"
import { BackgroundJob } from "@/background/job"
import { EventV2Bridge } from "@/event-v2-bridge"
Expand Down Expand Up @@ -898,4 +898,62 @@ describe("tool.task", () => {
expect((yield* jobs.get(grandchild.id))?.status).toBe("cancelled")
}),
)

it.instance("fails fast when session error event fires during subagent prompt", () =>
Effect.gen(function* () {
const events = yield* EventV2Bridge.Service
const sessions = yield* Session.Service
const { chat, assistant } = yield* seed()
const child = yield* sessions.create({ parentID: chat.id, title: "error test child" })
const tool = yield* TaskTool
const def = yield* tool.init()
const started = defer<void>()

const promptOps: TaskPromptOps = {
cancel: () => Effect.void,
resolvePromptParts: (template) => Effect.succeed([{ type: "text" as const, text: template }]),
prompt: (input) =>
Effect.promise(() => {
started.resolve()
return new Promise<never>(() => {})
}),
}

const fiber = yield* def
.execute(
{
description: "test",
prompt: "hi",
subagent_type: "general",
task_id: child.id,
},
{
sessionID: chat.id,
messageID: assistant.id,
agent: "build",
abort: new AbortController().signal,
extra: { promptOps },
messages: [],
metadata: () => Effect.void,
ask: () => Effect.void,
},
)
.pipe(Effect.forkChild)

// Wait for the prompt to start, then give a small grace period
yield* Effect.promise(() => started.promise)
yield* Effect.sleep("500 millis")

yield* events.publish(Session.Event.Error, {
sessionID: child.id,
error: { name: "UnknownError" as const, data: { message: "Provider quota exhausted" } },
})

const exit = yield* Fiber.await(fiber).pipe(Effect.timeoutOption("10 seconds"))
expect(exit._tag === "Some" && Exit.isFailure(exit.value)).toBe(true)
if (exit._tag === "Some" && Exit.isFailure(exit.value))
expect(Cause.pretty(exit.value.cause)).toContain("Provider quota exhausted")

}),
)
})
Loading