From 47ef2f940acc13d1dd46c23920d49f90134c1b42 Mon Sep 17 00:00:00 2001 From: ualtinok Date: Mon, 1 Jun 2026 06:38:34 +0200 Subject: [PATCH] fix(session): fire system transform before messages transform --- packages/opencode/src/agent/agent.ts | 5 - packages/opencode/src/session/compaction.ts | 89 +++- packages/opencode/src/session/llm.ts | 31 ++ packages/opencode/src/session/llm/request.ts | 23 +- packages/opencode/src/session/prompt.ts | 90 +++- .../opencode/test/session/compaction.test.ts | 190 +++++++- packages/opencode/test/session/prompt.test.ts | 413 +++++++++++++++++- packages/plugin/src/index.ts | 2 +- 8 files changed, 757 insertions(+), 86 deletions(-) diff --git a/packages/opencode/src/agent/agent.ts b/packages/opencode/src/agent/agent.ts index b1430314fffe..c815504135b3 100644 --- a/packages/opencode/src/agent/agent.ts +++ b/packages/opencode/src/agent/agent.ts @@ -18,7 +18,6 @@ import { Permission } from "@/permission" import { mergeDeep, pipe, sortBy, values } from "remeda" import { Global } from "@opencode-ai/core/global" import path from "path" -import { Plugin } from "@/plugin" import { Skill } from "../skill" import { Effect, Context, Layer, Schema } from "effect" import { InstanceState } from "@/effect/instance-state" @@ -90,7 +89,6 @@ export const layer = Layer.effect( Effect.gen(function* () { const config = yield* Config.Service const auth = yield* Auth.Service - const plugin = yield* Plugin.Service const skill = yield* Skill.Service const provider = yield* Provider.Service const locations = yield* LocationServiceMap @@ -376,7 +374,6 @@ export const layer = Layer.effect( : undefined const system = [PROMPT_GENERATE] - yield* plugin.trigger("experimental.chat.system.transform", { model: resolved }, { system }) const existing = yield* InstanceState.useEffect(state, (s) => s.list()) // TODO: clean this up so provider specific logic doesnt bleed over @@ -437,7 +434,6 @@ export const layer = Layer.effect( ) export const defaultLayer = layer.pipe( - Layer.provide(Plugin.defaultLayer), Layer.provide(Provider.defaultLayer), Layer.provide(Auth.defaultLayer), Layer.provide(Config.defaultLayer), @@ -450,7 +446,6 @@ const locationServiceMapNode = LayerNode.make(LocationServiceMap.layer, []) export const node = LayerNode.make(layer, [ Config.node, Auth.node, - Plugin.node, Skill.node, Provider.node, locationServiceMapNode, diff --git a/packages/opencode/src/session/compaction.ts b/packages/opencode/src/session/compaction.ts index c7ac963c690e..0dc256621f95 100644 --- a/packages/opencode/src/session/compaction.ts +++ b/packages/opencode/src/session/compaction.ts @@ -5,6 +5,7 @@ import { Session } from "./session" import { SessionID, MessageID, PartID } from "./schema" import { Provider } from "@/provider/provider" import { MessageV2 } from "./message-v2" +import { LLM } from "./llm" import { Token } from "@/util/token" import { SessionProcessor } from "./processor" import { Agent } from "@/agent/agent" @@ -12,7 +13,7 @@ import { Plugin } from "@/plugin" import { Config } from "@/config/config" import { NotFoundError } from "@/storage/storage" -import { Effect, Layer, Context } from "effect" +import { Cause, Effect, Layer, Context } from "effect" import * as DateTime from "effect/DateTime" import { InstanceState } from "@/effect/instance-state" import { isOverflow as overflow, usable } from "./overflow" @@ -25,6 +26,7 @@ import { ProviderV2 } from "@opencode-ai/core/provider" import { ModelV2 } from "@opencode-ai/core/model" import { EventV2 } from "@opencode-ai/core/event" import { buildPrompt } from "@opencode-ai/core/session/compaction" +import { SessionStatus } from "./status" export const Event = { Compacted: EventV2.define({ @@ -174,6 +176,7 @@ export const layer = Layer.effect( const provider = yield* Provider.Service const events = yield* EventV2Bridge.Service const flags = yield* RuntimeFlags.Service + const status = yield* SessionStatus.Service const isOverflow = Effect.fn("SessionCompaction.isOverflow")(function* (input: { tokens: SessionV1.Assistant["tokens"] @@ -349,19 +352,7 @@ export const layer = Layer.effect( cfg, model, }) - // Allow plugins to inject context or replace compaction prompt. - const compacting = yield* plugin.trigger( - "experimental.session.compacting", - { sessionID: input.sessionID }, - { context: [], prompt: undefined }, - ) - const nextPrompt = compacting.prompt ?? buildPrompt({ previousSummary, context: compacting.context }) const msgs = structuredClone(selected.head) - yield* plugin.trigger("experimental.chat.messages.transform", {}, { messages: msgs }) - const modelMessages = yield* MessageV2.toModelMessagesEffect(msgs, model, { - stripMedia: true, - toolOutputMaxChars: TOOL_OUTPUT_MAX_CHARS, - }) const tailIndex = selected.tail_start_id ? history.findIndex((message) => message.info.id === selected.tail_start_id) : -1 @@ -402,22 +393,78 @@ export const layer = Layer.effect( }, } yield* session.updateMessage(msg) - const processor = yield* processors.create({ - assistantMessage: msg, - sessionID: input.sessionID, - model, + const system = LLM.buildSystem({ agent, model, parts: [], user: userMessage }) + const removeInterruptedSummary = Effect.gen(function* () { + yield* session.removeMessage({ sessionID: input.sessionID, messageID: msg.id }).pipe(Effect.ignore) + yield* status.set(input.sessionID, { type: "idle" }) }) + const finalizeFailedSummary = (error: unknown) => + Effect.gen(function* () { + msg.error = MessageV2.fromError(error, { providerID: model.providerID }) + msg.finish = "error" + msg.time.completed = Date.now() + yield* session.updateMessage(msg) + yield* events.publish(Session.Event.Error, { sessionID: input.sessionID, error: msg.error }) + yield* status.set(input.sessionID, { type: "idle" }) + }) + const prepared = yield* Effect.gen(function* () { + // Allow plugins to inject context or replace compaction prompt. + const compacting = yield* plugin.trigger( + "experimental.session.compacting", + { sessionID: input.sessionID }, + { context: [], prompt: undefined }, + ) + const systemHeader = system[0] + yield* plugin.trigger( + "experimental.chat.system.transform", + { sessionID: input.sessionID, model }, + { system }, + ) + LLM.rejoinSystemForCaching(system, systemHeader) + yield* plugin.trigger( + "experimental.chat.messages.transform", + { sessionID: input.sessionID, model: { providerID: model.providerID, modelID: model.id } }, + { messages: msgs }, + ) + const messages = yield* MessageV2.toModelMessagesEffect(msgs, model, { + stripMedia: true, + toolOutputMaxChars: TOOL_OUTPUT_MAX_CHARS, + }) + return { + messages, + nextPrompt: compacting.prompt ?? buildPrompt({ previousSummary, context: compacting.context }), + } + }).pipe( + Effect.catchCauseIf((cause) => !Cause.hasInterruptsOnly(cause), (cause) => + finalizeFailedSummary(Cause.squash(cause)).pipe(Effect.as(undefined)), + ), + Effect.onInterrupt(() => removeInterruptedSummary), + ) + if (!prepared) return "stop" + const processor = yield* processors + .create({ + assistantMessage: msg, + sessionID: input.sessionID, + model, + }) + .pipe( + Effect.catchCauseIf((cause) => !Cause.hasInterruptsOnly(cause), (cause) => + finalizeFailedSummary(Cause.squash(cause)).pipe(Effect.as(undefined)), + ), + Effect.onInterrupt(() => removeInterruptedSummary), + ) + if (!processor) return "stop" const result = yield* processor.process({ user: userMessage, agent, sessionID: input.sessionID, tools: {}, - system: [], + system, messages: [ - ...modelMessages, + ...prepared.messages, { role: "user", - content: [{ type: "text", text: nextPrompt }], + content: [{ type: "text", text: prepared.nextPrompt }], }, ], model, @@ -603,6 +650,7 @@ export const defaultLayer = Layer.suspend(() => Layer.provide(Config.defaultLayer), Layer.provide(RuntimeFlags.defaultLayer), Layer.provide(EventV2Bridge.defaultLayer), + Layer.provide(SessionStatus.defaultLayer), ), ) @@ -615,6 +663,7 @@ export const node = LayerNode.make(layer, [ Provider.node, EventV2Bridge.node, RuntimeFlags.node, + SessionStatus.node, ]) export * as SessionCompaction from "./compaction" diff --git a/packages/opencode/src/session/llm.ts b/packages/opencode/src/session/llm.ts index adacfc431549..625da2b6a6d9 100644 --- a/packages/opencode/src/session/llm.ts +++ b/packages/opencode/src/session/llm.ts @@ -29,6 +29,7 @@ import * as OtelTracer from "@effect/opentelemetry/Tracer" import { LLMAISDK } from "./llm/ai-sdk" import { LLMNativeRuntime } from "./llm/native-runtime" import { LLMRequestPrep } from "./llm/request" +import { SystemPrompt } from "./system" export const OUTPUT_TOKEN_MAX = ProviderTransform.OUTPUT_TOKEN_MAX @@ -39,6 +40,7 @@ export type StreamInput = { model: Provider.Model agent: Agent.Info permission?: PermissionV1.Ruleset + /** Final system prompt for this request. Callers own system assembly and chat system-transform hooks. */ system: string[] messages: ModelMessage[] small?: boolean @@ -51,6 +53,35 @@ export type StreamRequest = StreamInput & { abort: AbortSignal } + +/** Assemble final system prompt from agent/provider prompt, custom parts, and user override. */ +export function buildSystem(input: { + agent: Agent.Info + model: Provider.Model + parts: string[] + user?: { system?: string } +}): string[] { + return [ + [ + ...(input.agent.prompt ? [input.agent.prompt] : SystemPrompt.provider(input.model)), + ...input.parts, + ...(input.user?.system ? [input.user.system] : []), + ] + .filter((x) => x) + .join("\n"), + ] +} + +/** Rejoin system prompt into 2-part structure for prompt-prefix caching if header unchanged. */ +export function rejoinSystemForCaching(system: string[], header: string) { + if (system.length > 2 && system[0] === header) { + const rest = system.slice(1) + system.length = 0 + system.push(header, rest.join("\n")) + } +} + + export interface Interface { readonly stream: (input: StreamInput) => Stream.Stream } diff --git a/packages/opencode/src/session/llm/request.ts b/packages/opencode/src/session/llm/request.ts index 2785d9852631..6b1be47848a9 100644 --- a/packages/opencode/src/session/llm/request.ts +++ b/packages/opencode/src/session/llm/request.ts @@ -8,7 +8,6 @@ import type { Agent } from "@/agent/agent" import type { MessageV2 } from "../message-v2" import type { Provider } from "@/provider/provider" import { ProviderTransform } from "@/provider/transform" -import { SystemPrompt } from "../system" import { InstallationVersion } from "@opencode-ai/core/installation/version" import { Effect, Record } from "effect" import { jsonSchema, tool as aiTool, type ModelMessage, type Tool } from "ai" @@ -55,27 +54,7 @@ const mergeOptions = (target: Record, source: Record | export const prepare = Effect.fn("LLMRequestPrep.prepare")(function* (input: PrepareInput) { const isOpenaiOauth = input.provider.id === "openai" && input.auth?.type === "oauth" - const system = [ - [ - ...(input.agent.prompt ? [input.agent.prompt] : SystemPrompt.provider(input.model)), - ...input.system, - ...(input.user.system ? [input.user.system] : []), - ] - .filter((x) => x) - .join("\n"), - ] - - const header = system[0] - yield* input.plugin.trigger( - "experimental.chat.system.transform", - { sessionID: input.sessionID, model: input.model }, - { system }, - ) - if (system.length > 2 && system[0] === header) { - const rest = system.slice(1) - system.length = 0 - system.push(header, rest.join("\n")) - } + const system = input.system const variant = !input.small && input.model.variants && input.user.model.variant diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index dad796c998ad..e2c14ed2d63f 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -57,10 +57,12 @@ import { AgentAttachment, FileAttachment, Prompt, Source } from "@opencode-ai/co import * as DateTime from "effect/DateTime" import { eq } from "drizzle-orm" import { SessionTable } from "@opencode-ai/core/session/sql" +import { errorMessage } from "@/util/error" import { SessionReminders } from "./reminders" import { SessionTools } from "./tools" import { LLMEvent } from "@opencode-ai/llm" + // @ts-ignore globalThis.AI_SDK_LOG_WARNINGS = false @@ -206,11 +208,14 @@ export const layer = Layer.effect( const msgs = onlySubtasks ? [{ role: "user" as const, content: subtasks.map((p) => p.prompt).join("\n") }] : yield* MessageV2.toModelMessagesEffect(context, mdl) + // Title generation is an auxiliary request without a matching messages.transform + // hook, so prebuild the system prompt without firing chat system hooks. + const system = LLM.buildSystem({ agent: ag, model: mdl, parts: [], user: firstInfo }) const text = yield* llm .stream({ agent: ag, user: firstInfo, - system: [], + system, small: true, tools: {}, model: mdl, @@ -1188,7 +1193,7 @@ export const layer = Layer.effect( session, modelID: lastUser.model.modelID, providerID: lastUser.model.providerID, - history: msgs, + history: structuredClone(msgs), }).pipe(Effect.ignore, Effect.forkIn(scope)) const model = yield* getModel(lastUser.model.providerID, lastUser.model.modelID, sessionID) @@ -1263,13 +1268,53 @@ export const layer = Layer.effect( yield* sessions.updateMessage(msg) }) + const finalizeFailedAssistant = (error: unknown) => + Effect.gen(function* () { + msg.error = MessageV2.fromError(error, { providerID: model.providerID }) + msg.finish = "error" + msg.time.completed = Date.now() + yield* sessions.updateMessage(msg) + if (flags.experimentalEventSystem) { + const assistantMessageID = SessionMessage.ID.create() + yield* events.publish(SessionEvent.Step.Started, { + sessionID, + assistantMessageID, + agent: msg.agent, + model: { + id: ModelV2.ID.make(msg.modelID), + providerID: ProviderV2.ID.make(msg.providerID), + variant: ModelV2.VariantID.make(msg.variant ?? "default"), + }, + timestamp: DateTime.makeUnsafe(msg.time.created), + }) + yield* events.publish(SessionEvent.Step.Failed, { + sessionID, + assistantMessageID, + error: { + type: "unknown", + message: errorMessage(error), + }, + timestamp: DateTime.makeUnsafe(Date.now()), + }) + } + yield* events.publish(Session.Event.Error, { sessionID, error: msg.error }) + yield* status.set(sessionID, { type: "idle" }) + }) + const handle = yield* processor .create({ assistantMessage: msg, sessionID, model, }) - .pipe(Effect.onInterrupt(() => finalizeInterruptedAssistant)) + .pipe( + Effect.catchCauseIf( + (cause) => !Cause.hasInterruptsOnly(cause), + (cause) => finalizeFailedAssistant(Cause.squash(cause)).pipe(Effect.as(undefined)), + ), + Effect.onInterrupt(() => finalizeInterruptedAssistant), + ) + if (!handle) break const outcome: "break" | "continue" = yield* Effect.gen(function* () { const lastUserMsg = msgs.findLast((m) => m.info.role === "user") @@ -1304,17 +1349,41 @@ export const layer = Layer.effect( if (step === 1) yield* summary.summarize({ sessionID, messageID: lastUser.id }).pipe(Effect.ignore, Effect.forkIn(scope)) - yield* plugin.trigger("experimental.chat.messages.transform", {}, { messages: msgs }) - - const [skills, env, instructions, modelMsgs] = yield* Effect.all([ + // Build complete system prompt + const [skills, env, instructions] = yield* Effect.all([ sys.skills(agent), sys.environment(model), instruction.system().pipe(Effect.orDie), - MessageV2.toModelMessagesEffect(msgs, model), ]) - const system = [...env, ...instructions, ...(skills ? [skills] : [])] const format = lastUser.format ?? { type: "text" as const } - if (format.type === "json_schema") system.push(STRUCTURED_OUTPUT_SYSTEM_PROMPT) + const system = LLM.buildSystem({ + agent, + model, + parts: [ + ...env, + ...instructions, + ...(skills ? [skills] : []), + ...(format.type === "json_schema" ? [STRUCTURED_OUTPUT_SYSTEM_PROMPT] : []), + ], + user: lastUser, + }) + + // system.transform fires first so plugins can inspect final system state + const systemHeader = system[0] + yield* plugin.trigger( + "experimental.chat.system.transform", + { sessionID, model }, + { system }, + ) + LLM.rejoinSystemForCaching(system, systemHeader) + + // messages.transform fires after so plugins can react to system prompt state + yield* plugin.trigger( + "experimental.chat.messages.transform", + { sessionID, model: { providerID: model.providerID, modelID: model.id } }, + { messages: msgs }, + ) + const modelMsgs = yield* MessageV2.toModelMessagesEffect(msgs, model) const result = yield* handle.process({ user: lastUser, agent, @@ -1374,6 +1443,9 @@ export const layer = Layer.effect( } return "continue" as const }).pipe( + Effect.catchCauseIf((cause) => !Cause.hasInterruptsOnly(cause), (cause) => + finalizeFailedAssistant(Cause.squash(cause)).pipe(Effect.as("break" as const)), + ), Effect.ensuring(instruction.clear(handle.message.id)), Effect.onInterrupt(() => finalizeInterruptedAssistant), ) diff --git a/packages/opencode/test/session/compaction.test.ts b/packages/opencode/test/session/compaction.test.ts index 63276bfe197d..1ffdafc9469b 100644 --- a/packages/opencode/test/session/compaction.test.ts +++ b/packages/opencode/test/session/compaction.test.ts @@ -218,6 +218,15 @@ function layer(result: "continue" | "compact") { ) } +function blockingProcessor(ready: Deferred.Deferred) { + return Layer.succeed( + SessionProcessorModule.SessionProcessor.Service, + SessionProcessorModule.SessionProcessor.Service.of({ + create: () => Effect.sync(() => Deferred.doneUnsafe(ready, Effect.void)).pipe(Effect.andThen(Effect.never)), + }), + ) +} + function cfg(compaction?: ConfigV1.Info["compaction"]) { const base = Schema.decodeUnknownSync(ConfigV1.Info)({}) as ConfigV1.Info return TestConfig.layer({ @@ -235,6 +244,7 @@ const deps = Layer.mergeAll( RuntimeFlags.layer({ experimentalEventSystem: true }), Database.defaultLayer, EventV2Bridge.defaultLayer, + SessionStatus.layer.pipe(Layer.provide(EventV2Bridge.defaultLayer)), ) const env = Layer.mergeAll( @@ -259,6 +269,7 @@ type CompactionProcessOptions = { result?: "continue" | "compact" llm?: Layer.Layer plugin?: Layer.Layer + processor?: Layer.Layer provider?: ReturnType config?: Layer.Layer } @@ -270,8 +281,10 @@ function withCompaction(options?: CompactionProcessOptions) { function compactionProcessLayer(options?: CompactionProcessOptions) { const events = EventV2Bridge.defaultLayer const status = SessionStatus.layer.pipe(Layer.provide(events)) - const processor = options?.llm - ? SessionProcessorModule.SessionProcessor.layer.pipe( + const processor = options?.processor + ? options.processor + : options?.llm + ? SessionProcessorModule.SessionProcessor.layer.pipe( Layer.provide(summary), Layer.provide(Image.defaultLayer), Layer.provide(RuntimeFlags.layer({ experimentalEventSystem: true })), @@ -381,6 +394,37 @@ function autocontinue(enabled: boolean) { }) } +function chatHookOrder(order: string[], inputs: Array<{ name: string; input: unknown }>) { + return Layer.mock(Plugin.Service)({ + trigger: (name: Name, input: Input, output: Output) => + Effect.sync(() => { + if (name === "experimental.chat.system.transform") { + order.push("system") + inputs.push({ name, input }) + } + if (name === "experimental.chat.messages.transform") { + order.push("messages") + inputs.push({ name, input }) + } + return output + }), + list: () => Effect.succeed([]), + init: () => Effect.void, + }) +} + +function failingChatHook(name: string) { + return Layer.mock(Plugin.Service)({ + trigger: (hook: Name, _input: Input, output: Output) => + Effect.sync(() => { + if (hook === name) throw new Error(`${name} failed`) + return output + }), + list: () => Effect.succeed([]), + init: () => Effect.void, + }) +} + describe("session.compaction.isOverflow", () => { it.live( "returns true when token count exceeds usable context", @@ -879,31 +923,93 @@ describe("session.compaction.process", () => { ) itCompaction.instance( - "marks summary message as errored on compact result", - Effect.gen(function* () { - const ssn = yield* SessionNs.Service - const session = yield* ssn.create({}) - const msg = yield* createUserMessage(session.id, "hello") - const msgs = yield* ssn.messages({ sessionID: session.id }) + "fires system transform before messages transform during compaction", + () => { + const order: string[] = [] + const inputs: Array<{ name: string; input: unknown }> = [] + return Effect.gen(function* () { + const ssn = yield* SessionNs.Service + const session = yield* ssn.create({}) + const msg = yield* createUserMessage(session.id, "hello") + const msgs = yield* ssn.messages({ sessionID: session.id }) - const result = yield* SessionCompaction.use.process({ - parentID: msg.id, - messages: msgs, - sessionID: session.id, - auto: false, - }) + const result = yield* SessionCompaction.use.process({ + parentID: msg.id, + messages: msgs, + sessionID: session.id, + auto: false, + }) - const summary = (yield* ssn.messages({ sessionID: session.id })).find( - (msg) => msg.info.role === "assistant" && msg.info.summary, - ) + expect(result).toBe("continue") + expect(order).toEqual(["system", "messages"]) + expect(inputs.find((item) => item.name === "experimental.chat.messages.transform")?.input).toMatchObject({ + sessionID: session.id, + model: { providerID: ProviderV2.ID.make("test"), modelID: ModelV2.ID.make("test-model") }, + }) + }).pipe(withCompaction({ plugin: chatHookOrder(order, inputs) })) + }, + ) - expect(result).toBe("stop") - expect(summary?.info.role).toBe("assistant") - if (summary?.info.role === "assistant") { - expect(summary.info.finish).toBe("error") - expect(JSON.stringify(summary.info.error)).toContain("Session too large to compact") - } - }).pipe(withCompaction({ result: "compact" })), + itCompaction.instance( + "records summary error when chat transform fails before compaction stream", + () => + Effect.gen(function* () { + const ssn = yield* SessionNs.Service + const status = yield* SessionStatus.Service + const session = yield* ssn.create({}) + const msg = yield* createUserMessage(session.id, "hello") + const msgs = yield* ssn.messages({ sessionID: session.id }) + + const result = yield* SessionCompaction.use.process({ + parentID: msg.id, + messages: msgs, + sessionID: session.id, + auto: false, + }) + + const summary = (yield* ssn.messages({ sessionID: session.id })).find( + (item) => item.info.role === "assistant" && item.info.summary, + )?.info + expect(result).toBe("stop") + expect(summary?.role).toBe("assistant") + if (summary?.role === "assistant") { + expect(JSON.stringify(summary.error)).toContain("experimental.chat.system.transform failed") + expect(summary.finish).toBe("error") + expect(summary.time.completed).toBeNumber() + } + expect((yield* status.get(session.id)).type).toBe("idle") + }).pipe(withCompaction({ plugin: failingChatHook("experimental.chat.system.transform") })), + ) + + itCompaction.instance( + "records summary error when messages transform fails before compaction stream", + () => + Effect.gen(function* () { + const ssn = yield* SessionNs.Service + const status = yield* SessionStatus.Service + const session = yield* ssn.create({}) + const msg = yield* createUserMessage(session.id, "hello") + const msgs = yield* ssn.messages({ sessionID: session.id }) + + const result = yield* SessionCompaction.use.process({ + parentID: msg.id, + messages: msgs, + sessionID: session.id, + auto: false, + }) + + const summary = (yield* ssn.messages({ sessionID: session.id })).find( + (item) => item.info.role === "assistant" && item.info.summary, + )?.info + expect(result).toBe("stop") + expect(summary?.role).toBe("assistant") + if (summary?.role === "assistant") { + expect(JSON.stringify(summary.error)).toContain("experimental.chat.messages.transform failed") + expect(summary.finish).toBe("error") + expect(summary.time.completed).toBeNumber() + } + expect((yield* status.get(session.id)).type).toBe("idle") + }).pipe(withCompaction({ plugin: failingChatHook("experimental.chat.messages.transform") })), ) it.instance( @@ -1272,6 +1378,7 @@ describe("session.compaction.process", () => { const ready = yield* Deferred.make() return yield* Effect.gen(function* () { const ssn = yield* SessionNs.Service + const status = yield* SessionStatus.Service const session = yield* ssn.create({}) const msg = yield* createUserMessage(session.id, "hello") const msgs = yield* ssn.messages({ sessionID: session.id }) @@ -1292,11 +1399,46 @@ describe("session.compaction.process", () => { expect(Exit.isFailure(exit)).toBe(true) if (Exit.isFailure(exit)) expect(Cause.hasInterrupts(exit.cause)).toBe(true) expect(all.some((msg) => msg.info.role === "assistant" && msg.info.summary)).toBe(false) + expect((yield* status.get(session.id)).type).toBe("idle") }).pipe(withCompaction({ plugin: plugin(ready) })) }), { git: true }, ) + itCompaction.instance( + "does not leave a summary assistant when interrupted during processor creation", + () => + Effect.gen(function* () { + const ready = yield* Deferred.make() + return yield* Effect.gen(function* () { + const ssn = yield* SessionNs.Service + const status = yield* SessionStatus.Service + const session = yield* ssn.create({}) + const msg = yield* createUserMessage(session.id, "hello") + const msgs = yield* ssn.messages({ sessionID: session.id }) + const fiber = yield* SessionCompaction.use + .process({ + parentID: msg.id, + messages: msgs, + sessionID: session.id, + auto: false, + }) + .pipe(Effect.forkChild) + + yield* Deferred.await(ready).pipe(Effect.timeout("1 second")) + yield* Fiber.interrupt(fiber) + const exit = yield* Fiber.await(fiber).pipe(Effect.timeout("250 millis")) + const all = yield* ssn.messages({ sessionID: session.id }) + + expect(Exit.isFailure(exit)).toBe(true) + if (Exit.isFailure(exit)) expect(Cause.hasInterrupts(exit.cause)).toBe(true) + expect(all.some((msg) => msg.info.role === "assistant" && msg.info.summary)).toBe(false) + expect((yield* status.get(session.id)).type).toBe("idle") + }).pipe(withCompaction({ processor: blockingProcessor(ready) })) + }), + { git: true }, + ) + itCompaction.instance( "silently drops reasoning-delta arriving without prior reasoning-start", () => { diff --git a/packages/opencode/test/session/prompt.test.ts b/packages/opencode/test/session/prompt.test.ts index 5cd97f78e88a..6793d9d67301 100644 --- a/packages/opencode/test/session/prompt.test.ts +++ b/packages/opencode/test/session/prompt.test.ts @@ -131,6 +131,84 @@ const mcp = Layer.succeed( }), ) +function isMessageTransformOutput(output: unknown): output is { messages: SessionV1.WithParts[] } { + if (!output || typeof output !== "object" || !("messages" in output)) return false + return Array.isArray(output.messages) +} + +function messagesTransformPlugin(marker: string) { + return Layer.succeed( + Plugin.Service, + Plugin.Service.of({ + init: () => Effect.void, + list: () => Effect.succeed([]), + trigger: (name, _input, output) => + Effect.sync(() => { + if (name !== "experimental.chat.messages.transform" || !isMessageTransformOutput(output)) return output + const part = output.messages.flatMap((message) => message.parts).find((part) => part.type === "text") + if (part) part.text = `${part.text}\n${marker}` + return output + }), + }), + ) +} + +function hookOrderPlugin(order: string[], inputs?: Array<{ name: string; input: unknown }>) { + return Layer.succeed( + Plugin.Service, + Plugin.Service.of({ + init: () => Effect.void, + list: () => Effect.succeed([]), + trigger: (name, input, output) => + Effect.sync(() => { + if (name === "experimental.chat.system.transform") order.push("system") + if (name === "experimental.chat.messages.transform") order.push("messages") + inputs?.push({ name, input }) + return output + }), + }), + ) +} + +function failingSystemTransformPlugin() { + return Layer.mock(Plugin.Service)({ + trigger: (name: Name, _input: Input, output: Output) => + Effect.sync(() => { + if (name === "experimental.chat.system.transform") throw new Error("system transform failed") + return output + }), + list: () => Effect.succeed([]), + init: () => Effect.void, + }) +} + +function failingMessagesTransformPlugin() { + return Layer.mock(Plugin.Service)({ + trigger: (name: Name, _input: Input, output: Output) => + Effect.sync(() => { + if (name === "experimental.chat.messages.transform") throw new Error("messages transform failed") + return output + }), + list: () => Effect.succeed([]), + init: () => Effect.void, + }) +} + +function failingCompactingPlugin() { + return Layer.mock(Plugin.Service)({ + trigger: (name: Name, _input: Input, output: Output) => + Effect.sync(() => { + if (name === "experimental.session.compacting") throw new Error("session compacting failed") + return output + }), + list: () => Effect.succeed([]), + init: () => Effect.void, + }) +} + +const hookOrder: string[] = [] +const hookOrderInputs: Array<{ name: string; input: unknown }> = [] + const lsp = Layer.succeed( LSP.Service, LSP.Service.of({ @@ -154,7 +232,6 @@ const lsp = Layer.succeed( const status = SessionStatus.layer.pipe(Layer.provideMerge(EventV2Bridge.defaultLayer)) const run = SessionRunState.layer.pipe(Layer.provide(status)) const infra = Layer.mergeAll(NodeFileSystem.layer, CrossSpawnSpawner.defaultLayer) - const processorCreateStarted: Array<() => void> = [] const blockingProcessor = Layer.succeed( SessionProcessor.Service, @@ -162,8 +239,16 @@ const blockingProcessor = Layer.succeed( create: () => Effect.sync(() => processorCreateStarted.shift()?.()).pipe(Effect.andThen(Effect.never)), }), ) +const failingProcessor = Layer.succeed( + SessionProcessor.Service, + SessionProcessor.Service.of({ + create: () => Effect.sync(() => { + throw new Error("processor create failed") + }), + }), +) -function makePrompt(input?: { processor?: "blocking" }) { +function makePrompt(input?: { processor?: "blocking" | "failing"; plugin?: Layer.Layer }) { const deps = Layer.mergeAll( Session.defaultLayer, Snapshot.defaultLayer, @@ -172,7 +257,7 @@ function makePrompt(input?: { processor?: "blocking" }) { AgentSvc.defaultLayer, Command.defaultLayer, Permission.defaultLayer, - Plugin.defaultLayer, + input?.plugin ?? Plugin.defaultLayer, Config.defaultLayer, ProviderSvc.defaultLayer, lsp, @@ -201,6 +286,8 @@ function makePrompt(input?: { processor?: "blocking" }) { const proc = input?.processor === "blocking" ? blockingProcessor + : input?.processor === "failing" + ? failingProcessor : SessionProcessor.layer.pipe( Layer.provide(summary), Layer.provide(Image.defaultLayer), @@ -229,11 +316,11 @@ function makePrompt(input?: { processor?: "blocking" }) { ) } -function makeHttp(input?: { processor?: "blocking" }) { +function makeHttp(input?: { processor?: "blocking" | "failing"; plugin?: Layer.Layer }) { return Layer.mergeAll(TestLLMServer.layer, makePrompt(input)) } -function makeHttpNoLLMServer(input?: { processor?: "blocking" }) { +function makeHttpNoLLMServer(input?: { processor?: "blocking" | "failing"; plugin?: Layer.Layer }) { return makePrompt(input) } @@ -242,6 +329,12 @@ const noLLMServer = testEffect(makeHttpNoLLMServer()) const raceNoLLMServer = testEffect(makeHttpNoLLMServer({ processor: "blocking" })) const unix = process.platform !== "win32" ? it.instance : it.instance.skip const unixNoLLMServer = process.platform !== "win32" ? noLLMServer.instance : noLLMServer.instance.skip +const transformIt = testEffect(makeHttp({ plugin: messagesTransformPlugin("plugin-injected-context") })) +const hookOrderIt = testEffect(makeHttp({ plugin: hookOrderPlugin(hookOrder, hookOrderInputs) })) +const failingSystemIt = testEffect(makeHttp({ plugin: failingSystemTransformPlugin() })) +const failingMessagesIt = testEffect(makeHttp({ plugin: failingMessagesTransformPlugin() })) +const failingCompactingIt = testEffect(makeHttp({ plugin: failingCompactingPlugin() })) +const failingProcessorIt = testEffect(makeHttp({ processor: "failing" })) // Config that registers a custom "test" provider with a "test-model" model // so provider model lookup succeeds inside the loop. @@ -582,6 +675,316 @@ it.instance("loop stops provider overflow instead of auto-compacting when disabl }), ) +transformIt.instance( + "serializes messages after plugin transform mutations", + () => + Effect.gen(function* () { + const { llm } = yield* useServerConfig(providerCfg) + const prompt = yield* SessionPrompt.Service + const sessions = yield* Session.Service + const chat = yield* sessions.create({ + title: "Plugin transform", + permission: [{ permission: "*", pattern: "*", action: "allow" }], + }) + yield* prompt.prompt({ + sessionID: chat.id, + agent: "build", + noReply: true, + parts: [{ type: "text", text: "hello" }], + }) + yield* llm.text("world") + + yield* prompt.loop({ sessionID: chat.id }) + + const inputs = yield* llm.inputs + expect(JSON.stringify(inputs.at(-1)?.messages)).toContain("plugin-injected-context") + }), + { git: true }, +) + +hookOrderIt.instance( + "fires system transform before messages transform in prompt loop", + () => + Effect.gen(function* () { + hookOrder.length = 0 + hookOrderInputs.length = 0 + const { llm } = yield* useServerConfig(providerCfg) + const prompt = yield* SessionPrompt.Service + const sessions = yield* Session.Service + const chat = yield* sessions.create({ + permission: [{ permission: "*", pattern: "*", action: "allow" }], + }) + yield* prompt.prompt({ + sessionID: chat.id, + agent: "build", + noReply: true, + parts: [{ type: "text", text: "hello" }], + }) + yield* llm.text("world") + + yield* prompt.loop({ sessionID: chat.id }) + + expect(hookOrder).toEqual(["system", "messages"]) + expect(hookOrderInputs.find((item) => item.name === "experimental.chat.messages.transform")?.input).toMatchObject({ + sessionID: chat.id, + model: { providerID: ProviderV2.ID.make("test"), modelID: ModelV2.ID.make("test-model") }, + }) + }), + { git: true }, +) + +failingSystemIt.instance( + "records assistant error when system transform fails before stream", + () => + Effect.gen(function* () { + yield* useServerConfig(providerCfg) + const prompt = yield* SessionPrompt.Service + const sessions = yield* Session.Service + const status = yield* SessionStatus.Service + const chat = yield* sessions.create({ + title: "System transform failure", + permission: [{ permission: "*", pattern: "*", action: "allow" }], + }) + yield* prompt.prompt({ + sessionID: chat.id, + agent: "build", + noReply: true, + parts: [{ type: "text", text: "hello" }], + }) + + const result = yield* prompt.loop({ sessionID: chat.id }) + + expect(result.info.role).toBe("assistant") + if (result.info.role === "assistant") { + expect(JSON.stringify(result.info.error)).toContain("system transform failed") + expect(result.info.finish).toBe("error") + expect(result.info.time.completed).toBeNumber() + } + const retry = yield* prompt.loop({ sessionID: chat.id }) + expect(retry.info.id).toBe(result.info.id) + expect((yield* sessions.messages({ sessionID: chat.id })).filter((msg) => msg.info.role === "assistant")).toHaveLength( + 1, + ) + expect((yield* status.get(chat.id)).type).toBe("idle") + }), + { git: true }, +) + +failingMessagesIt.instance( + "records assistant error when messages transform fails before stream", + () => + Effect.gen(function* () { + yield* useServerConfig(providerCfg) + const prompt = yield* SessionPrompt.Service + const sessions = yield* Session.Service + const status = yield* SessionStatus.Service + const chat = yield* sessions.create({ + title: "Messages transform failure", + permission: [{ permission: "*", pattern: "*", action: "allow" }], + }) + yield* prompt.prompt({ + sessionID: chat.id, + agent: "build", + noReply: true, + parts: [{ type: "text", text: "hello" }], + }) + + const result = yield* prompt.loop({ sessionID: chat.id }) + + expect(result.info.role).toBe("assistant") + if (result.info.role === "assistant") { + expect(JSON.stringify(result.info.error)).toContain("messages transform failed") + expect(result.info.finish).toBe("error") + expect(result.info.time.completed).toBeNumber() + } + const retry = yield* prompt.loop({ sessionID: chat.id }) + expect(retry.info.id).toBe(result.info.id) + expect((yield* sessions.messages({ sessionID: chat.id })).filter((msg) => msg.info.role === "assistant")).toHaveLength( + 1, + ) + expect((yield* status.get(chat.id)).type).toBe("idle") + }), + { git: true }, +) + +failingProcessorIt.instance( + "records assistant error when processor creation fails before stream", + () => + Effect.gen(function* () { + yield* useServerConfig(providerCfg) + const prompt = yield* SessionPrompt.Service + const sessions = yield* Session.Service + const status = yield* SessionStatus.Service + const chat = yield* sessions.create({ + title: "Processor create failure", + permission: [{ permission: "*", pattern: "*", action: "allow" }], + }) + yield* prompt.prompt({ + sessionID: chat.id, + agent: "build", + noReply: true, + parts: [{ type: "text", text: "hello" }], + }) + + const result = yield* prompt.loop({ sessionID: chat.id }) + + expect(result.info.role).toBe("assistant") + if (result.info.role === "assistant") { + expect(JSON.stringify(result.info.error)).toContain("processor create failed") + expect(result.info.finish).toBe("error") + expect(result.info.time.completed).toBeNumber() + } + const retry = yield* prompt.loop({ sessionID: chat.id }) + expect(retry.info.id).toBe(result.info.id) + expect((yield* sessions.messages({ sessionID: chat.id })).filter((msg) => msg.info.role === "assistant")).toHaveLength( + 1, + ) + expect((yield* status.get(chat.id)).type).toBe("idle") + }), + { git: true }, +) + +failingCompactingIt.instance( + "does not retry failed pre-stream compaction summaries", + () => + Effect.gen(function* () { + yield* useServerConfig(providerCfg) + const compact = yield* SessionCompaction.Service + const prompt = yield* SessionPrompt.Service + const sessions = yield* Session.Service + const status = yield* SessionStatus.Service + const chat = yield* sessions.create({ + title: "Compaction pre-stream failure", + permission: [{ permission: "*", pattern: "*", action: "allow" }], + }) + yield* prompt.prompt({ + sessionID: chat.id, + agent: "build", + noReply: true, + parts: [{ type: "text", text: "hello" }], + }) + yield* compact.create({ sessionID: chat.id, agent: "build", model: ref, auto: false }) + + const result = yield* prompt.loop({ sessionID: chat.id }) + + expect(result.info.role).toBe("assistant") + if (result.info.role === "assistant") { + expect(result.info.summary).toBe(true) + expect(JSON.stringify(result.info.error)).toContain("session compacting failed") + expect(result.info.finish).toBe("error") + } + const retry = yield* prompt.loop({ sessionID: chat.id }) + const summaries = (yield* sessions.messages({ sessionID: chat.id })).filter( + (msg) => msg.info.role === "assistant" && msg.info.summary, + ) + expect(retry.info.id).toBe(result.info.id) + expect(summaries).toHaveLength(1) + expect((yield* status.get(chat.id)).type).toBe("idle") + }), + { git: true }, +) + +it.instance( + "keeps structured output instructions before user system override", + () => + Effect.gen(function* () { + const { llm } = yield* useServerConfig(providerCfg) + const prompt = yield* SessionPrompt.Service + const sessions = yield* Session.Service + const chat = yield* sessions.create({ + title: "Structured output system order", + permission: [{ permission: "*", pattern: "*", action: "allow" }], + }) + yield* prompt.prompt({ + sessionID: chat.id, + agent: "build", + noReply: true, + system: "USER SYSTEM OVERRIDE", + format: new SessionV1.OutputFormatJsonSchema({ + type: "json_schema", + schema: { + type: "object", + properties: { answer: { type: "string" } }, + required: ["answer"], + }, + retryCount: 0, + }), + parts: [{ type: "text", text: "hello" }], + }) + yield* llm.tool("StructuredOutput", { answer: "ok" }) + + yield* prompt.loop({ sessionID: chat.id }) + + const request = (yield* llm.inputs).find((input) => JSON.stringify(input).includes("StructuredOutput tool")) + expect(request).toBeDefined() + const serialized = JSON.stringify(request ?? {}) + expect(serialized.indexOf("StructuredOutput tool")).toBeGreaterThanOrEqual(0) + expect(serialized.indexOf("USER SYSTEM OVERRIDE")).toBeGreaterThan(serialized.indexOf("StructuredOutput tool")) + }), + { git: true }, +) + +hookOrderIt.instance( + "builds system prompt for direct title generation streams without extra chat hooks", + () => + Effect.gen(function* () { + hookOrder.length = 0 + const { llm } = yield* useServerConfig(providerCfg) + const prompt = yield* SessionPrompt.Service + const sessions = yield* Session.Service + const chat = yield* sessions.create({ + permission: [{ permission: "*", pattern: "*", action: "allow" }], + }) + yield* prompt.prompt({ + sessionID: chat.id, + agent: "build", + noReply: true, + system: "TITLE USER SYSTEM", + parts: [{ type: "text", text: "hello" }], + }) + yield* llm.text("world") + + yield* prompt.loop({ sessionID: chat.id }) + + const titleRequest = yield* pollWithTimeout( + llm.inputs.pipe(Effect.map((inputs) => inputs.find((input) => JSON.stringify(input).includes("Generate a title")))), + "title request was not sent", + ) + expect(JSON.stringify(titleRequest)).toContain("TITLE USER SYSTEM") + expect(hookOrder).toEqual(["system", "messages"]) + }), + { git: true }, +) + +transformIt.instance( + "does not leak prompt message-transform mutations into title generation streams", + () => + Effect.gen(function* () { + const { llm } = yield* useServerConfig(providerCfg) + const prompt = yield* SessionPrompt.Service + const sessions = yield* Session.Service + const chat = yield* sessions.create({ + permission: [{ permission: "*", pattern: "*", action: "allow" }], + }) + yield* prompt.prompt({ + sessionID: chat.id, + agent: "build", + noReply: true, + parts: [{ type: "text", text: "hello" }], + }) + yield* llm.text("world") + + yield* prompt.loop({ sessionID: chat.id }) + + const titleRequest = yield* pollWithTimeout( + llm.inputs.pipe(Effect.map((inputs) => inputs.find((input) => JSON.stringify(input).includes("Generate a title")))), + "title request was not sent", + ) + expect(JSON.stringify(titleRequest)).not.toContain("plugin-injected-context") + }), + { git: true }, +) + noLLMServer.instance.skip( "prompt emits v2 prompted and synthetic events (v2 projector disabled)", () => diff --git a/packages/plugin/src/index.ts b/packages/plugin/src/index.ts index edfa0139dfca..14242ea014b8 100644 --- a/packages/plugin/src/index.ts +++ b/packages/plugin/src/index.ts @@ -280,7 +280,7 @@ export interface Hooks { }, ) => Promise "experimental.chat.messages.transform"?: ( - input: {}, + input: { sessionID?: string; model?: { providerID: string; modelID: string } }, output: { messages: { info: Message