diff --git a/packages/app/src/context/server-sync.tsx b/packages/app/src/context/server-sync.tsx index fc79fdaa8478..3946a10ef2d1 100644 --- a/packages/app/src/context/server-sync.tsx +++ b/packages/app/src/context/server-sync.tsx @@ -38,6 +38,50 @@ import type { ServerScope } from "@/utils/server-scope" import { persisted } from "@/utils/persist" import { toggleMcp } from "./global-sync/mcp" +// Map a directory-scoped event to the session/message it concerns, so the listener can route it to +// the store that actually holds that session even when the event's own instance directory has no +// registered store. This is the cross-instance subagent case: a child anchored in a monorepo +// subproject (directory=) the UI never opened as its own project — the child is loaded +// into and displayed from its PARENT project's store (via childSessions). Directory-level events +// (server.instance.disposed / vcs / lsp) return no ids and must NOT use the fallback. +function eventRoutingIdentity(event: { type: string; properties?: unknown }): { + sessionID?: string + messageID?: string +} { + const p = (event.properties ?? {}) as { + info?: { id?: string; sessionID?: string } + part?: { sessionID?: string; messageID?: string } + sessionID?: string + messageID?: string + } + switch (event.type) { + case "session.created": + case "session.updated": + case "session.deleted": + return { sessionID: p.info?.id } + case "message.updated": + return { sessionID: p.info?.sessionID, messageID: p.info?.id } + case "message.part.updated": + return { sessionID: p.part?.sessionID, messageID: p.part?.messageID } + case "message.removed": + return { sessionID: p.sessionID, messageID: p.messageID } + case "message.part.removed": + case "message.part.delta": + return { messageID: p.messageID } + case "session.diff": + case "todo.updated": + case "session.status": + case "permission.asked": + case "permission.replied": + case "question.asked": + case "question.replied": + case "question.rejected": + return { sessionID: p.sessionID } + default: + return {} + } +} + type GlobalStore = { ready: boolean error?: InitError @@ -394,22 +438,58 @@ export function createServerSyncContextInner(serverSDK: ServerSDK) { } const existing = children.children[key] - if (!existing) return - children.mark(key) - const [store, setStore] = existing - applyDirectoryEvent({ - event, - directory, - store, - setStore, - push: queue.push, - setSessionTodo, - retainedLimit: sessionMeta.get(key)?.limit, - vcsCache: children.vcsCache.get(key), - loadLsp: () => { - void queryClient.fetchQuery(queryOptionsApi.lsp(key)) - }, - }) + if (existing) { + children.mark(key) + const [store, setStore] = existing + applyDirectoryEvent({ + event, + directory, + store, + setStore, + push: queue.push, + setSessionTodo, + retainedLimit: sessionMeta.get(key)?.limit, + vcsCache: children.vcsCache.get(key), + loadLsp: () => { + void queryClient.fetchQuery(queryOptionsApi.lsp(key)) + }, + }) + return + } + + // Fallback: the event's own instance directory has no registered store. Happens for a subagent + // child anchored in a subproject (directory=) that the UI never opened as its own + // project — the child session is loaded into and displayed from its PARENT project's store. + // Without this, the child's live events (message/part deltas, status, questions) are dropped and + // the child view stays frozen until it is re-entered (which re-fetches a snapshot). Route such + // session-scoped events to every registered store that already holds that session/message. + const identity = eventRoutingIdentity(event) + if (identity.sessionID === undefined && identity.messageID === undefined) return + for (const targetKey of Object.keys(children.children)) { + const target = children.children[targetKey] + if (!target) continue + const [store, setStore] = target + const holdsSession = + identity.sessionID !== undefined && + (store.message[identity.sessionID] !== undefined || store.session.some((s) => s.id === identity.sessionID)) + const holdsMessage = identity.messageID !== undefined && store.part[identity.messageID] !== undefined + if (!holdsSession && !holdsMessage) continue + const targetDirKey = directoryKey(targetKey) + children.mark(targetKey) + applyDirectoryEvent({ + event, + directory: targetKey, + store, + setStore, + push: queue.push, + setSessionTodo, + retainedLimit: sessionMeta.get(targetDirKey)?.limit, + vcsCache: children.vcsCache.get(targetDirKey), + loadLsp: () => { + void queryClient.fetchQuery(queryOptionsApi.lsp(targetDirKey)) + }, + }) + } }) onCleanup(unsub) diff --git a/packages/opencode/src/server/routes/instance/httpapi/handlers/event.ts b/packages/opencode/src/server/routes/instance/httpapi/handlers/event.ts index 4f24fbb4b694..b0990af27446 100644 --- a/packages/opencode/src/server/routes/instance/httpapi/handlers/event.ts +++ b/packages/opencode/src/server/routes/instance/httpapi/handlers/event.ts @@ -2,6 +2,8 @@ import { EventV2Bridge } from "@/event-v2-bridge" import { InstanceState } from "@/effect/instance-state" import { GlobalBus } from "@/bus/global" import { EventV2 } from "@opencode-ai/core/event" +import { Session } from "@/session/session" +import { SessionID } from "@/session/schema" import { Effect, Queue } from "effect" import * as Stream from "effect/Stream" import { HttpServerResponse } from "effect/unstable/http" @@ -31,13 +33,73 @@ function eventResponse(events: EventV2.Interface) { const queue = yield* Queue.unbounded() const unsubscribe = yield* events.listen((event) => Effect.sync(() => Queue.offerUnsafe(queue, event))) yield* Effect.addFinalizer(() => unsubscribe) + const sessions = yield* Session.Service + + // OPENCODE_EVENT_SUBTREE (default on): a subscription on directory D receives events from every + // session whose tree is ROOTED in D — the open session plus all of its (possibly cross-directory) + // descendant subagents. This is what lets a monorepo-root TUI observe a subagent launched in a + // subproject (live progress, child-session navigation, live questions). Matching is by session + // lineage (parentID), NOT by the child's own directory, so it is robust to subagents anchored + // anywhere — including worktrees outside the subscription directory. Events without an owning + // session (global/instance events) stay scoped to the exact instance directory. Set + // OPENCODE_EVENT_SUBTREE=0 to restore strict matching (events only for sessions whose own + // directory equals the subscription directory). + // TODO(PR anomalyco/opencode#29271): revisit this env gate before upstreaming (opt-in or drop it). + const subtreeEvents = !["0", "false", "off", "no"].includes( + (process.env["OPENCODE_EVENT_SUBTREE"] ?? "").toLowerCase(), + ) + + // Memoized lineage resolver: maps a session id to the directory of its root ancestor (the + // subscription anchor for the whole tree). Cached per connection; each session is walked at most + // once, so subsequent events for it are an O(1) map lookup. + const rootDirectoryCache = new Map() + const rootDirectoryOf = (sessionID: string): Effect.Effect => + Effect.gen(function* () { + const chain: string[] = [] + let current: string | undefined = sessionID + let root: string | undefined = undefined + while (current) { + const cached = rootDirectoryCache.get(current) + if (cached) { + root = cached + break + } + if (chain.includes(current)) break // cycle guard (should not happen) + chain.push(current) + const info: Session.Info | undefined = yield* sessions + .get(SessionID.make(current)) + .pipe(Effect.catchCause(() => Effect.succeed(undefined))) + if (!info) break + if (!info.parentID) { + root = info.directory + break + } + current = info.parentID + } + if (root) for (const id of chain) rootDirectoryCache.set(id, root) + return root + }) + + const sessionIDOf = (event: EventV2.Payload): string | undefined => { + const sid = (event.data as { sessionID?: unknown } | undefined)?.sessionID + return typeof sid === "string" ? sid : undefined + } + + const matches = (event: EventV2.Payload): Effect.Effect => + Effect.gen(function* () { + if (event.location?.workspaceID !== undefined && event.location.workspaceID !== workspaceID) return false + const sessionID = sessionIDOf(event) + // Global/instance events (no owning session) keep the historical exact-directory behaviour. + if (sessionID === undefined) return event.location?.directory === instance.directory + if (!subtreeEvents) return event.location?.directory === instance.directory + const root = yield* rootDirectoryOf(sessionID) + return root === instance.directory + }) + const stream = Stream.fromQueue(queue).pipe( - Stream.filter( - (event) => - event.location?.directory === instance.directory && - (event.location.workspaceID === undefined || event.location.workspaceID === workspaceID), - ), - Stream.map((event) => ({ id: event.id, type: event.type, properties: event.data })), + Stream.mapEffect((event) => matches(event).pipe(Effect.map((keep) => ({ event, keep })))), + Stream.filter(({ keep }) => keep), + Stream.map(({ event }) => ({ id: event.id, type: event.type, properties: event.data })), ) const disposed = Stream.callback<{ id: string; type: string; properties: unknown }>((queue) => { const listener = (event: { diff --git a/packages/opencode/src/tool/registry.ts b/packages/opencode/src/tool/registry.ts index 541d1f4bbbd0..df69c2e317f9 100644 --- a/packages/opencode/src/tool/registry.ts +++ b/packages/opencode/src/tool/registry.ts @@ -49,6 +49,8 @@ import { Agent } from "../agent/agent" import { Skill } from "../skill" import { Permission } from "@/permission" import { BackgroundJob } from "@/background/job" +import { InstanceStore } from "@/project/instance-store" +import { InstanceLayer } from "@/project/instance-layer" import { RuntimeFlags } from "@/effect/runtime-flags" import { ProviderV2 } from "@opencode-ai/core/provider" import { ModelV2 } from "@opencode-ai/core/model" @@ -335,6 +337,7 @@ export const defaultLayer = Layer.suspend(() => Layer.provide(Format.defaultLayer), Layer.provide(CrossSpawnSpawner.defaultLayer), Layer.provide(Truncate.defaultLayer), + Layer.provide(InstanceLayer.layer), ) .pipe(Layer.provide(Database.defaultLayer), Layer.provide(RuntimeFlags.defaultLayer)), ) @@ -435,6 +438,7 @@ export const node = LayerNode.make(layer.pipe(Layer.provide(Ripgrep.defaultLayer Truncate.node, RuntimeFlags.node, Database.node, + InstanceStore.node, ]) export * as ToolRegistry from "./registry" diff --git a/packages/opencode/src/tool/task.ts b/packages/opencode/src/tool/task.ts index b0a866c90e23..2b5980d0bb5c 100644 --- a/packages/opencode/src/tool/task.ts +++ b/packages/opencode/src/tool/task.ts @@ -14,6 +14,7 @@ import { Effect, Exit, Schema, Scope } from "effect" import { EffectBridge } from "@/effect/bridge" import { RuntimeFlags } from "@/effect/runtime-flags" import { Database } from "@opencode-ai/core/database/database" +import { InstanceStore } from "@/project/instance-store" export interface TaskPromptOps { cancel(sessionID: SessionID): Effect.Effect @@ -44,6 +45,10 @@ const BaseParameterFields = { description: Schema.String.annotate({ description: "A short (3-5 words) description of the task" }), prompt: Schema.String.annotate({ description: "The task for the agent to perform" }), subagent_type: Schema.String.annotate({ description: "The type of specialized agent to use for this task" }), + directory: Schema.optional(Schema.String).annotate({ + description: + "Optional absolute path to the working directory for the subagent. When set, the subagent runs rooted at this path: its tools, agent registry, AGENTS.md chain and skill walk-up start from there instead of inheriting the parent session's directory. Omit to inherit the parent session's directory.", + }), task_id: Schema.optional(Schema.String).annotate({ description: "This should only be set if you mean to resume a previous task (you can pass a prior task_id and the task will continue the same subagent session as before instead of creating a fresh one)", @@ -88,6 +93,7 @@ export const TaskTool = Tool.define( const scope = yield* Scope.Scope const flags = yield* RuntimeFlags.Service const database = yield* Database.Service + const instanceStore = yield* InstanceStore.Service const run = Effect.fn("TaskTool.execute")(function* ( params: Schema.Schema.Type, @@ -113,14 +119,23 @@ export const TaskTool = Tool.define( }) } - const next = yield* agent.get(params.subagent_type) + const session = params.task_id + ? yield* sessions.get(SessionID.make(params.task_id)).pipe(Effect.catchCause(() => Effect.succeed(undefined))) + : undefined + + // Directory anchoring: an explicit `directory` wins; on resume we fall back to the child + // session's stored directory. When set, the agent is resolved, the child session is created + // and its prompt loop runs inside that directory's instance (so project-local agents, skills + // and AGENTS.md resolve via walk-up from there). Omitted → inherit the parent's instance. + const directory = params.directory ?? session?.directory + const provideInstance = (effect: Effect.Effect): Effect.Effect => + directory ? instanceStore.provide({ directory }, effect) : effect + + const next = yield* provideInstance(agent.get(params.subagent_type)) if (!next) { return yield* Effect.fail(new Error(`Unknown agent type: ${params.subagent_type} is not a valid agent type`)) } - const session = params.task_id - ? yield* sessions.get(SessionID.make(params.task_id)).pipe(Effect.catchCause(() => Effect.succeed(undefined))) - : undefined const parent = yield* sessions.get(ctx.sessionID) const childPermission = deriveSubagentSessionPermission({ parentSessionPermission: parent.permission ?? [], @@ -141,21 +156,25 @@ export const TaskTool = Tool.define( ] const nextSession = session ?? - (yield* sessions.create({ - parentID: ctx.sessionID, - title: params.description + ` (@${next.name} subagent)`, - agent: next.name, - permission: [ - ...childPermission, - ...childToolDenies.filter( - (deny) => - !childPermission.some( - (rule) => - rule.permission === deny.permission && rule.pattern === deny.pattern && rule.action === deny.action, - ), - ), - ], - })) + (yield* provideInstance( + sessions.create({ + parentID: ctx.sessionID, + title: params.description + ` (@${next.name} subagent)`, + agent: next.name, + permission: [ + ...childPermission, + ...childToolDenies.filter( + (deny) => + !childPermission.some( + (rule) => + rule.permission === deny.permission && + rule.pattern === deny.pattern && + rule.action === deny.action, + ), + ), + ], + }), + )) const msg = yield* MessageV2.get({ sessionID: ctx.sessionID, messageID: ctx.messageID }).pipe( Effect.provideService(Database.Service, database), @@ -239,7 +258,7 @@ export const TaskTool = Tool.define( ) }) - if (yield* background.extend({ id: nextSession.id, run: runTask() })) { + if (yield* background.extend({ id: nextSession.id, run: provideInstance(runTask()) })) { return { title: params.description, metadata: { @@ -268,7 +287,7 @@ export const TaskTool = Tool.define( }), notify(nextSession.id), ]), - run: runTask().pipe(Effect.onInterrupt(() => ops.cancel(nextSession.id))), + run: provideInstance(runTask()).pipe(Effect.onInterrupt(() => ops.cancel(nextSession.id))), }) function backgroundResult() { diff --git a/packages/opencode/test/session/prompt.test.ts b/packages/opencode/test/session/prompt.test.ts index 08828018a4a3..119d894aef13 100644 --- a/packages/opencode/test/session/prompt.test.ts +++ b/packages/opencode/test/session/prompt.test.ts @@ -46,6 +46,7 @@ import { SystemPrompt } from "../../src/session/system" import { Shell } from "@opencode-ai/core/shell" import { Snapshot } from "../../src/snapshot" import { ToolRegistry } from "@/tool/registry" +import { InstanceLayer } from "@/project/instance-layer" import { Truncate } from "@/tool/truncate" import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner" import { Ripgrep } from "@opencode-ai/core/ripgrep" @@ -193,6 +194,7 @@ function makePrompt(input?: { processor?: "blocking" }) { Layer.provide(Ripgrep.defaultLayer), Layer.provide(Format.defaultLayer), Layer.provide(RuntimeFlags.layer({ experimentalEventSystem: true })), + Layer.provide(InstanceLayer.layer), Layer.provideMerge(todo), Layer.provideMerge(question), Layer.provideMerge(deps),