Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 96 additions & 16 deletions packages/app/src/context/server-sync.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,50 @@ import type { ServerScope } from "@/utils/server-scope"
import { persisted } from "@/utils/persist"
import { toggleMcp } from "./global-sync/mcp"

// Map a directory-scoped event to the session/message it concerns, so the listener can route it to
// the store that actually holds that session even when the event's own instance directory has no
// registered store. This is the cross-instance subagent case: a child anchored in a monorepo
// subproject (directory=<subproject>) the UI never opened as its own project — the child is loaded
// into and displayed from its PARENT project's store (via childSessions). Directory-level events
// (server.instance.disposed / vcs / lsp) return no ids and must NOT use the fallback.
function eventRoutingIdentity(event: { type: string; properties?: unknown }): {
sessionID?: string
messageID?: string
} {
const p = (event.properties ?? {}) as {
info?: { id?: string; sessionID?: string }
part?: { sessionID?: string; messageID?: string }
sessionID?: string
messageID?: string
}
switch (event.type) {
case "session.created":
case "session.updated":
case "session.deleted":
return { sessionID: p.info?.id }
case "message.updated":
return { sessionID: p.info?.sessionID, messageID: p.info?.id }
case "message.part.updated":
return { sessionID: p.part?.sessionID, messageID: p.part?.messageID }
case "message.removed":
return { sessionID: p.sessionID, messageID: p.messageID }
case "message.part.removed":
case "message.part.delta":
return { messageID: p.messageID }
case "session.diff":
case "todo.updated":
case "session.status":
case "permission.asked":
case "permission.replied":
case "question.asked":
case "question.replied":
case "question.rejected":
return { sessionID: p.sessionID }
default:
return {}
}
}

type GlobalStore = {
ready: boolean
error?: InitError
Expand Down Expand Up @@ -394,22 +438,58 @@ export function createServerSyncContextInner(serverSDK: ServerSDK) {
}

const existing = children.children[key]
if (!existing) return
children.mark(key)
const [store, setStore] = existing
applyDirectoryEvent({
event,
directory,
store,
setStore,
push: queue.push,
setSessionTodo,
retainedLimit: sessionMeta.get(key)?.limit,
vcsCache: children.vcsCache.get(key),
loadLsp: () => {
void queryClient.fetchQuery(queryOptionsApi.lsp(key))
},
})
if (existing) {
children.mark(key)
const [store, setStore] = existing
applyDirectoryEvent({
event,
directory,
store,
setStore,
push: queue.push,
setSessionTodo,
retainedLimit: sessionMeta.get(key)?.limit,
vcsCache: children.vcsCache.get(key),
loadLsp: () => {
void queryClient.fetchQuery(queryOptionsApi.lsp(key))
},
})
return
}

// Fallback: the event's own instance directory has no registered store. Happens for a subagent
// child anchored in a subproject (directory=<subproject>) that the UI never opened as its own
// project — the child session is loaded into and displayed from its PARENT project's store.
// Without this, the child's live events (message/part deltas, status, questions) are dropped and
// the child view stays frozen until it is re-entered (which re-fetches a snapshot). Route such
// session-scoped events to every registered store that already holds that session/message.
const identity = eventRoutingIdentity(event)
if (identity.sessionID === undefined && identity.messageID === undefined) return
for (const targetKey of Object.keys(children.children)) {
const target = children.children[targetKey]
if (!target) continue
const [store, setStore] = target
const holdsSession =
identity.sessionID !== undefined &&
(store.message[identity.sessionID] !== undefined || store.session.some((s) => s.id === identity.sessionID))
const holdsMessage = identity.messageID !== undefined && store.part[identity.messageID] !== undefined
if (!holdsSession && !holdsMessage) continue
const targetDirKey = directoryKey(targetKey)
children.mark(targetKey)
applyDirectoryEvent({
event,
directory: targetKey,
store,
setStore,
push: queue.push,
setSessionTodo,
retainedLimit: sessionMeta.get(targetDirKey)?.limit,
vcsCache: children.vcsCache.get(targetDirKey),
loadLsp: () => {
void queryClient.fetchQuery(queryOptionsApi.lsp(targetDirKey))
},
})
}
})

onCleanup(unsub)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import { EventV2Bridge } from "@/event-v2-bridge"
import { InstanceState } from "@/effect/instance-state"
import { GlobalBus } from "@/bus/global"
import { EventV2 } from "@opencode-ai/core/event"
import { Session } from "@/session/session"
import { SessionID } from "@/session/schema"
import { Effect, Queue } from "effect"
import * as Stream from "effect/Stream"
import { HttpServerResponse } from "effect/unstable/http"
Expand Down Expand Up @@ -31,13 +33,73 @@ function eventResponse(events: EventV2.Interface) {
const queue = yield* Queue.unbounded<EventV2.Payload>()
const unsubscribe = yield* events.listen((event) => Effect.sync(() => Queue.offerUnsafe(queue, event)))
yield* Effect.addFinalizer(() => unsubscribe)
const sessions = yield* Session.Service

// OPENCODE_EVENT_SUBTREE (default on): a subscription on directory D receives events from every
// session whose tree is ROOTED in D — the open session plus all of its (possibly cross-directory)
// descendant subagents. This is what lets a monorepo-root TUI observe a subagent launched in a
// subproject (live progress, child-session navigation, live questions). Matching is by session
// lineage (parentID), NOT by the child's own directory, so it is robust to subagents anchored
// anywhere — including worktrees outside the subscription directory. Events without an owning
// session (global/instance events) stay scoped to the exact instance directory. Set
// OPENCODE_EVENT_SUBTREE=0 to restore strict matching (events only for sessions whose own
// directory equals the subscription directory).
// TODO(PR anomalyco/opencode#29271): revisit this env gate before upstreaming (opt-in or drop it).
const subtreeEvents = !["0", "false", "off", "no"].includes(
(process.env["OPENCODE_EVENT_SUBTREE"] ?? "").toLowerCase(),
)

// Memoized lineage resolver: maps a session id to the directory of its root ancestor (the
// subscription anchor for the whole tree). Cached per connection; each session is walked at most
// once, so subsequent events for it are an O(1) map lookup.
const rootDirectoryCache = new Map<string, string>()
const rootDirectoryOf = (sessionID: string): Effect.Effect<string | undefined> =>
Effect.gen(function* () {
const chain: string[] = []
let current: string | undefined = sessionID
let root: string | undefined = undefined
while (current) {
const cached = rootDirectoryCache.get(current)
if (cached) {
root = cached
break
}
if (chain.includes(current)) break // cycle guard (should not happen)
chain.push(current)
const info: Session.Info | undefined = yield* sessions
.get(SessionID.make(current))
.pipe(Effect.catchCause(() => Effect.succeed(undefined)))
if (!info) break
if (!info.parentID) {
root = info.directory
break
}
current = info.parentID
}
if (root) for (const id of chain) rootDirectoryCache.set(id, root)
return root
})

const sessionIDOf = (event: EventV2.Payload): string | undefined => {
const sid = (event.data as { sessionID?: unknown } | undefined)?.sessionID
return typeof sid === "string" ? sid : undefined
}

const matches = (event: EventV2.Payload): Effect.Effect<boolean> =>
Effect.gen(function* () {
if (event.location?.workspaceID !== undefined && event.location.workspaceID !== workspaceID) return false
const sessionID = sessionIDOf(event)
// Global/instance events (no owning session) keep the historical exact-directory behaviour.
if (sessionID === undefined) return event.location?.directory === instance.directory
if (!subtreeEvents) return event.location?.directory === instance.directory
const root = yield* rootDirectoryOf(sessionID)
return root === instance.directory
})

const stream = Stream.fromQueue(queue).pipe(
Stream.filter(
(event) =>
event.location?.directory === instance.directory &&
(event.location.workspaceID === undefined || event.location.workspaceID === workspaceID),
),
Stream.map((event) => ({ id: event.id, type: event.type, properties: event.data })),
Stream.mapEffect((event) => matches(event).pipe(Effect.map((keep) => ({ event, keep })))),
Stream.filter(({ keep }) => keep),
Stream.map(({ event }) => ({ id: event.id, type: event.type, properties: event.data })),
)
const disposed = Stream.callback<{ id: string; type: string; properties: unknown }>((queue) => {
const listener = (event: {
Expand Down
4 changes: 4 additions & 0 deletions packages/opencode/src/tool/registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ import { Agent } from "../agent/agent"
import { Skill } from "../skill"
import { Permission } from "@/permission"
import { BackgroundJob } from "@/background/job"
import { InstanceStore } from "@/project/instance-store"
import { InstanceLayer } from "@/project/instance-layer"
import { RuntimeFlags } from "@/effect/runtime-flags"
import { ProviderV2 } from "@opencode-ai/core/provider"
import { ModelV2 } from "@opencode-ai/core/model"
Expand Down Expand Up @@ -335,6 +337,7 @@ export const defaultLayer = Layer.suspend(() =>
Layer.provide(Format.defaultLayer),
Layer.provide(CrossSpawnSpawner.defaultLayer),
Layer.provide(Truncate.defaultLayer),
Layer.provide(InstanceLayer.layer),
)
.pipe(Layer.provide(Database.defaultLayer), Layer.provide(RuntimeFlags.defaultLayer)),
)
Expand Down Expand Up @@ -435,6 +438,7 @@ export const node = LayerNode.make(layer.pipe(Layer.provide(Ripgrep.defaultLayer
Truncate.node,
RuntimeFlags.node,
Database.node,
InstanceStore.node,
])

export * as ToolRegistry from "./registry"
61 changes: 40 additions & 21 deletions packages/opencode/src/tool/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { Effect, Exit, Schema, Scope } from "effect"
import { EffectBridge } from "@/effect/bridge"
import { RuntimeFlags } from "@/effect/runtime-flags"
import { Database } from "@opencode-ai/core/database/database"
import { InstanceStore } from "@/project/instance-store"

export interface TaskPromptOps {
cancel(sessionID: SessionID): Effect.Effect<void>
Expand Down Expand Up @@ -44,6 +45,10 @@ const BaseParameterFields = {
description: Schema.String.annotate({ description: "A short (3-5 words) description of the task" }),
prompt: Schema.String.annotate({ description: "The task for the agent to perform" }),
subagent_type: Schema.String.annotate({ description: "The type of specialized agent to use for this task" }),
directory: Schema.optional(Schema.String).annotate({
description:
"Optional absolute path to the working directory for the subagent. When set, the subagent runs rooted at this path: its tools, agent registry, AGENTS.md chain and skill walk-up start from there instead of inheriting the parent session's directory. Omit to inherit the parent session's directory.",
}),
task_id: Schema.optional(Schema.String).annotate({
description:
"This should only be set if you mean to resume a previous task (you can pass a prior task_id and the task will continue the same subagent session as before instead of creating a fresh one)",
Expand Down Expand Up @@ -88,6 +93,7 @@ export const TaskTool = Tool.define(
const scope = yield* Scope.Scope
const flags = yield* RuntimeFlags.Service
const database = yield* Database.Service
const instanceStore = yield* InstanceStore.Service

const run = Effect.fn("TaskTool.execute")(function* (
params: Schema.Schema.Type<typeof Parameters>,
Expand All @@ -113,14 +119,23 @@ export const TaskTool = Tool.define(
})
}

const next = yield* agent.get(params.subagent_type)
const session = params.task_id
? yield* sessions.get(SessionID.make(params.task_id)).pipe(Effect.catchCause(() => Effect.succeed(undefined)))
: undefined

// Directory anchoring: an explicit `directory` wins; on resume we fall back to the child
// session's stored directory. When set, the agent is resolved, the child session is created
// and its prompt loop runs inside that directory's instance (so project-local agents, skills
// and AGENTS.md resolve via walk-up from there). Omitted → inherit the parent's instance.
const directory = params.directory ?? session?.directory
const provideInstance = <A, E, R>(effect: Effect.Effect<A, E, R>): Effect.Effect<A, E, R> =>
directory ? instanceStore.provide({ directory }, effect) : effect

const next = yield* provideInstance(agent.get(params.subagent_type))
if (!next) {
return yield* Effect.fail(new Error(`Unknown agent type: ${params.subagent_type} is not a valid agent type`))
}

const session = params.task_id
? yield* sessions.get(SessionID.make(params.task_id)).pipe(Effect.catchCause(() => Effect.succeed(undefined)))
: undefined
const parent = yield* sessions.get(ctx.sessionID)
const childPermission = deriveSubagentSessionPermission({
parentSessionPermission: parent.permission ?? [],
Expand All @@ -141,21 +156,25 @@ export const TaskTool = Tool.define(
]
const nextSession =
session ??
(yield* sessions.create({
parentID: ctx.sessionID,
title: params.description + ` (@${next.name} subagent)`,
agent: next.name,
permission: [
...childPermission,
...childToolDenies.filter(
(deny) =>
!childPermission.some(
(rule) =>
rule.permission === deny.permission && rule.pattern === deny.pattern && rule.action === deny.action,
),
),
],
}))
(yield* provideInstance(
sessions.create({
parentID: ctx.sessionID,
title: params.description + ` (@${next.name} subagent)`,
agent: next.name,
permission: [
...childPermission,
...childToolDenies.filter(
(deny) =>
!childPermission.some(
(rule) =>
rule.permission === deny.permission &&
rule.pattern === deny.pattern &&
rule.action === deny.action,
),
),
],
}),
))

const msg = yield* MessageV2.get({ sessionID: ctx.sessionID, messageID: ctx.messageID }).pipe(
Effect.provideService(Database.Service, database),
Expand Down Expand Up @@ -239,7 +258,7 @@ export const TaskTool = Tool.define(
)
})

if (yield* background.extend({ id: nextSession.id, run: runTask() })) {
if (yield* background.extend({ id: nextSession.id, run: provideInstance(runTask()) })) {
return {
title: params.description,
metadata: {
Expand Down Expand Up @@ -268,7 +287,7 @@ export const TaskTool = Tool.define(
}),
notify(nextSession.id),
]),
run: runTask().pipe(Effect.onInterrupt(() => ops.cancel(nextSession.id))),
run: provideInstance(runTask()).pipe(Effect.onInterrupt(() => ops.cancel(nextSession.id))),
})

function backgroundResult() {
Expand Down
2 changes: 2 additions & 0 deletions packages/opencode/test/session/prompt.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import { SystemPrompt } from "../../src/session/system"
import { Shell } from "@opencode-ai/core/shell"
import { Snapshot } from "../../src/snapshot"
import { ToolRegistry } from "@/tool/registry"
import { InstanceLayer } from "@/project/instance-layer"
import { Truncate } from "@/tool/truncate"
import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner"
import { Ripgrep } from "@opencode-ai/core/ripgrep"
Expand Down Expand Up @@ -193,6 +194,7 @@ function makePrompt(input?: { processor?: "blocking" }) {
Layer.provide(Ripgrep.defaultLayer),
Layer.provide(Format.defaultLayer),
Layer.provide(RuntimeFlags.layer({ experimentalEventSystem: true })),
Layer.provide(InstanceLayer.layer),
Layer.provideMerge(todo),
Layer.provideMerge(question),
Layer.provideMerge(deps),
Expand Down
Loading