Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions packages/core/src/session/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,22 @@ export namespace Tool {
})
export type Progress = typeof Progress.Type

export const ProgressReport = EventV2.define({
type: "session.next.tool.progress.report",
...options,
schema: {
...ToolBase,
report: Schema.Struct({
progress: Schema.Number,
total: Schema.Number.pipe(Schema.optional),
message: Schema.String.pipe(Schema.optional),
source: Schema.String.pipe(Schema.optional),
}),
structured: Schema.Record(Schema.String, Schema.Any),
},
})
export type ProgressReport = typeof ProgressReport.Type

export const Success = EventV2.define({
type: "session.next.tool.success",
...options,
Expand Down Expand Up @@ -489,6 +505,7 @@ const DurableDefinitions = [
Tool.Input.Ended,
Tool.Called,
Tool.Progress,
Tool.ProgressReport,
Tool.Success,
Tool.Failed,
Reasoning.Started,
Expand Down
8 changes: 8 additions & 0 deletions packages/core/src/session/message-updater.ts
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,14 @@ export function update(adapter: Adapter, event: SessionEvent.Event) {
}
})
},
"session.next.tool.progress.report": (event) => {
return updateOwnedAssistant(event.data.assistantMessageID, (draft) => {
const match = latestTool(draft, event.data.callID)
if (match && match.state.status === "running") {
match.state.structured = event.data.structured
}
})
},
"session.next.tool.success": (event) => {
return updateOwnedAssistant(event.data.assistantMessageID, (draft) => {
const match = latestTool(draft, event.data.callID)
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/session/projector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ export const layer = Layer.effectDiscard(
yield* events.project(SessionEvent.Tool.Input.Ended, (event) => run(db, event))
yield* events.project(SessionEvent.Tool.Called, (event) => run(db, event))
yield* events.project(SessionEvent.Tool.Progress, (event) => run(db, event))
yield* events.project(SessionEvent.Tool.ProgressReport, (event) => run(db, event))
yield* events.project(SessionEvent.Tool.Success, (event) => run(db, event))
yield* events.project(SessionEvent.Tool.Failed, (event) => run(db, event))
yield* events.project(SessionEvent.Reasoning.Started, (event) => run(db, event))
Expand Down
17 changes: 17 additions & 0 deletions packages/core/test/session-tool-progress.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,22 @@ describe("Tool.Progress", () => {
state: { status: "running", structured: { phase: "checkpoint" }, content: content("saved") },
})

yield* service.publish(SessionEvent.Tool.ProgressReport, {
sessionID,
timestamp,
assistantMessageID,
callID: "call-success",
report: { progress: 2, total: 4, message: "halfway", source: "mcp" },
structured: { source: "mcp", progress: 2, total: 4, message: "halfway" },
})
expect((yield* readAssistant).content[0]).toMatchObject({
state: {
status: "running",
structured: { source: "mcp", progress: 2, total: 4, message: "halfway" },
content: content("saved"),
},
})

const success = yield* service.publish(SessionEvent.Tool.Success, {
sessionID,
timestamp,
Expand Down Expand Up @@ -153,6 +169,7 @@ describe("Tool.Progress", () => {
.all()
.pipe(Effect.orDie)
expect(rows.map((row) => row.type)).toContain(EventV2.versionedType(SessionEvent.Tool.Progress.type, 1))
expect(rows.map((row) => row.type)).toContain(EventV2.versionedType(SessionEvent.Tool.ProgressReport.type, 1))
expect(rows.map((row) => row.type)).toContain(EventV2.versionedType(SessionEvent.Tool.Success.type, 1))
expect(rows.map((row) => row.type)).toContain(EventV2.versionedType(SessionEvent.Tool.Failed.type, 1))
}),
Expand Down
5 changes: 5 additions & 0 deletions packages/opencode/src/mcp/catalog.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Client } from "@modelcontextprotocol/sdk/client/index.js"
import {
CallToolResultSchema,
ListToolsResultSchema,
type Progress,
ToolSchema,
type Tool as MCPToolDef,
} from "@modelcontextprotocol/sdk/types.js"
Expand All @@ -11,6 +12,9 @@ import { Effect } from "effect"
const DEFAULT_TIMEOUT = 30_000
const MAX_LIST_PAGES = 1_000

export type ProgressCallback = (progress: Progress) => void
type ToolExecutionOptionsWithProgress = { onprogress?: ProgressCallback }

const TolerantListToolsResultSchema = ListToolsResultSchema.extend({
tools: ToolSchema.omit({ outputSchema: true }).array(),
})
Expand Down Expand Up @@ -61,6 +65,7 @@ export function convertTool(mcpTool: MCPToolDef, client: Client, timeout?: numbe
resetTimeoutOnProgress: true,
signal: options.abortSignal,
timeout,
onprogress: (progress) => (options as ToolExecutionOptionsWithProgress).onprogress?.(progress),
},
)
if (result.isError) throw new Error(formatToolErrorContent(result.content))
Expand Down
13 changes: 8 additions & 5 deletions packages/opencode/src/session/prompt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1131,8 +1131,8 @@ export const layer = Layer.effect(
throw new Error("Impossible")
})

const runLoop: (sessionID: SessionID) => Effect.Effect<SessionV1.WithParts> = Effect.fn("SessionPrompt.run")(
function* (sessionID: SessionID) {
const runLoop: (sessionID: SessionID) => Effect.Effect<SessionV1.WithParts, never, EventV2Bridge.Service> =
Effect.fn("SessionPrompt.run")(function* (sessionID: SessionID) {
const ctx = yield* InstanceState.context
let structured: unknown
let step = 0
Expand Down Expand Up @@ -1398,13 +1398,16 @@ export const layer = Layer.effect(

yield* compaction.prune({ sessionID }).pipe(Effect.ignore, Effect.forkIn(scope))
return yield* lastAssistant(sessionID)
},
)
})

const loop: (input: LoopInput) => Effect.Effect<SessionV1.WithParts> = Effect.fn("SessionPrompt.loop")(function* (
input: LoopInput,
) {
return yield* state.ensureRunning(input.sessionID, lastAssistant(input.sessionID), runLoop(input.sessionID))
return yield* state.ensureRunning(
input.sessionID,
lastAssistant(input.sessionID),
runLoop(input.sessionID).pipe(Effect.provideService(EventV2Bridge.Service, events)),
)
})

const shell: (input: ShellInput) => Effect.Effect<SessionV1.WithParts, Session.BusyError> = Effect.fn(
Expand Down
41 changes: 39 additions & 2 deletions packages/opencode/src/session/tools.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { SessionV1 } from "@opencode-ai/core/v1/session"
import { Provider } from "@/provider/provider"
import { ProviderTransform } from "@/provider/transform"
import { MCP } from "@/mcp"
import type { ProgressCallback } from "@/mcp/catalog"
import { Permission } from "@/permission"
import { Tool } from "@/tool/tool"
import { ToolJsonSchema } from "@/tool/json-schema"
Expand All @@ -12,14 +13,35 @@ import { Truncate } from "@/tool/truncate"
import { Plugin } from "@/plugin"
import type { TaskPromptOps } from "@/tool/task"
import { type Tool as AITool, tool, jsonSchema, type ToolExecutionOptions, asSchema } from "ai"
import { Effect } from "effect"
import { DateTime, Effect } from "effect"
import { MessageV2 } from "./message-v2"
import { Session } from "./session"
import { SessionProcessor } from "./processor"
import { PartID } from "./schema"
import { EffectBridge } from "@/effect/bridge"
import { ProviderV2 } from "@opencode-ai/core/provider"
import { ModelV2 } from "@opencode-ai/core/model"
import { EventV2Bridge } from "@/event-v2-bridge"
import { SessionEvent } from "@opencode-ai/core/session/event"
import { SessionMessage } from "@opencode-ai/core/session/message"

export function mcpProgressToToolProgressReport(progress: Parameters<ProgressCallback>[0]) {
const structured: Record<string, unknown> = {
source: "mcp",
progress: progress.progress,
...(progress.total !== undefined ? { total: progress.total } : {}),
...(progress.message !== undefined ? { message: progress.message } : {}),
}
return {
report: {
progress: progress.progress,
...(progress.total !== undefined ? { total: progress.total } : {}),
...(progress.message !== undefined ? { message: progress.message } : {}),
source: "mcp",
},
structured,
}
}

export const resolve = Effect.fn("SessionTools.resolve")(function* (input: {
agent: Agent.Info
Expand All @@ -37,6 +59,7 @@ export const resolve = Effect.fn("SessionTools.resolve")(function* (input: {
const registry = yield* ToolRegistry.Service
const mcp = yield* MCP.Service
const truncate = yield* Truncate.Service
const events = yield* EventV2Bridge.Service

const context = (args: Record<string, unknown>, options: ToolExecutionOptions): Tool.Context => ({
sessionID: input.session.id,
Expand Down Expand Up @@ -132,7 +155,21 @@ export const resolve = Effect.fn("SessionTools.resolve")(function* (input: {
)
const result: Awaited<ReturnType<NonNullable<typeof execute>>> = yield* Effect.gen(function* () {
yield* ctx.ask({ permission: key, metadata: {}, patterns: ["*"], always: ["*"] })
return yield* Effect.promise(() => execute(args, opts))
const onprogress: ProgressCallback = (progress) => {
const state = mcpProgressToToolProgressReport(progress)
void run.promise(
events
.publish(SessionEvent.Tool.ProgressReport, {
sessionID: ctx.sessionID,
assistantMessageID: SessionMessage.ID.make(input.processor.message.id),
callID: opts.toolCallId,
timestamp: DateTime.makeUnsafe(Date.now()),
...state,
})
.pipe(Effect.ignore),
)
}
return yield* Effect.promise(() => execute(args, { ...opts, onprogress } as never))
}).pipe(
Effect.withSpan("Tool.execute", {
attributes: {
Expand Down
54 changes: 54 additions & 0 deletions packages/opencode/test/mcp/lifecycle.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ interface MockClientState {
listResourcesCalls: number
getPromptTimeout?: number
readResourceTimeout?: number
callToolOptions?: {
resetTimeoutOnProgress?: boolean
signal?: AbortSignal
timeout?: number
onprogress?: (progress: { progress: number; total?: number; message?: string }) => void
}
requestCalls: number
listToolsShouldFail: boolean
listToolsError: string
Expand Down Expand Up @@ -234,6 +240,12 @@ void mock.module("@modelcontextprotocol/sdk/client/index.js", () => ({
return { contents: [{ uri: params.uri, text: "test" }] }
}

async callTool(_params: unknown, _schema: unknown, options?: MockClientState["callToolOptions"]) {
if (this._state) this._state.callToolOptions = options
options?.onprogress?.({ progress: 2, total: 4, message: "halfway" })
return { content: [{ type: "text", text: "ok" }] }
}

async close() {
if (this._state) this._state.closed = true
}
Expand Down Expand Up @@ -923,6 +935,48 @@ it.instance(
{ config: { mcp: {} } },
)

it.instance(
"MCP tool calls include a progress handler so progress can reset timeouts",
() =>
MCP.Service.use((mcp: MCPNS.Interface) =>
Effect.gen(function* () {
lastCreatedClientName = "progress-server"
const serverState = getOrCreateClientState("progress-server")
serverState.capabilities = { tools: {} }

yield* mcp.add("progress-server", {
type: "local",
command: ["echo", "test"],
timeout: 2500,
})

const tools = yield* mcp.tools()
const tool = tools["progress-server_test_tool"]
expect(tool).toBeDefined()

const abort = new AbortController()
let progress: unknown
yield* Effect.promise(() =>
tool.execute?.({}, {
toolCallId: "call_1",
messages: [],
abortSignal: abort.signal,
onprogress: (value: unknown) => {
progress = value
},
} as never),
)

expect(serverState.callToolOptions?.resetTimeoutOnProgress).toBe(true)
expect(serverState.callToolOptions?.signal).toBe(abort.signal)
expect(serverState.callToolOptions?.timeout).toBe(2500)
expect(typeof serverState.callToolOptions?.onprogress).toBe("function")
expect(progress).toEqual({ progress: 2, total: 4, message: "halfway" })
}),
),
{ config: { mcp: {} } },
)

it.instance(
"prompts() skips disconnected servers",
() =>
Expand Down
21 changes: 21 additions & 0 deletions packages/opencode/test/session/tools.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { describe, expect, it } from "bun:test"
import { mcpProgressToToolProgressReport } from "../../src/session/tools"

describe("SessionTools MCP progress", () => {
it("maps MCP progress into a status report, not partial tool output", () => {
expect(mcpProgressToToolProgressReport({ progress: 2, total: 4, message: "halfway" })).toEqual({
report: {
progress: 2,
total: 4,
message: "halfway",
source: "mcp",
},
structured: {
source: "mcp",
progress: 2,
total: 4,
message: "halfway",
},
})
})
})
Loading
Loading