From 3a12dd1764ec9aa3dc88268a6c95fd726657c462 Mon Sep 17 00:00:00 2001 From: Shoubhit Dash Date: Mon, 25 May 2026 22:06:06 +0530 Subject: [PATCH] feat(acp-next): add usage service --- packages/opencode/src/acp-next/usage.ts | 251 ++++++++++++++ packages/opencode/test/acp-next/usage.test.ts | 314 ++++++++++++++++++ 2 files changed, 565 insertions(+) create mode 100644 packages/opencode/src/acp-next/usage.ts create mode 100644 packages/opencode/test/acp-next/usage.test.ts diff --git a/packages/opencode/src/acp-next/usage.ts b/packages/opencode/src/acp-next/usage.ts new file mode 100644 index 000000000000..bcf86709f9c2 --- /dev/null +++ b/packages/opencode/src/acp-next/usage.ts @@ -0,0 +1,251 @@ +import type { AgentSideConnection, Usage } from "@agentclientprotocol/sdk" +import * as Log from "@opencode-ai/core/util/log" +import { InstanceRef } from "@/effect/instance-ref" +import { InstanceStore } from "@/project/instance-store" +import { ModelID, ProviderID } from "@/provider/schema" +import { Provider } from "@/provider/provider" +import { Context, Effect, Layer, SynchronizedRef } from "effect" + +const log = Log.create({ service: "acp-next-usage" }) + +export type AssistantTokenCost = { + readonly cost: number + readonly tokens: { + readonly input: number + readonly output: number + readonly reasoning: number + readonly cache: { + readonly read: number + readonly write: number + } + } +} + +export type AssistantMessage = AssistantTokenCost & { + readonly role: "assistant" + readonly providerID?: string + readonly modelID?: string +} + +export type SessionMessage = { + readonly info: { readonly role: string } | AssistantMessage +} + +export type MessagesInput = { + readonly sessionID: string + readonly directory: string +} + +export type SDK = { + readonly session: { + readonly messages: ( + parameters: { readonly sessionID: string; readonly directory: string }, + options: { readonly throwOnError: true }, + ) => Promise<{ readonly data?: readonly SessionMessage[] | null }> + } +} + +export interface MessageLoaderInterface { + readonly messages: (input: MessagesInput) => Effect.Effect +} + +export interface ContextLimitLoaderInterface { + readonly providers: (directory: string) => Effect.Effect, unknown> +} + +export type UsageConnection = Pick + +export interface Interface { + readonly buildUsage: (message: AssistantTokenCost) => Usage + readonly latestAssistantMessage: (messages: readonly SessionMessage[]) => AssistantMessage | undefined + readonly totalSessionCost: (messages: readonly SessionMessage[]) => number + readonly contextLimit: (input: { + readonly directory: string + readonly providerID: ProviderID + readonly modelID: ModelID + }) => Effect.Effect + readonly sendUpdate: (input: { + readonly connection: UsageConnection + readonly sessionID: string + readonly directory: string + }) => Effect.Effect +} + +export class MessageLoader extends Context.Service()( + "@opencode/ACPNextUsageMessageLoader", +) {} + +export class ContextLimitLoader extends Context.Service()( + "@opencode/ACPNextUsageContextLimitLoader", +) {} + +export class Service extends Context.Service()("@opencode/ACPNextUsage") {} + +export function messageLoaderFromSDK(sdk: SDK): MessageLoaderInterface { + return MessageLoader.of({ + messages: (input) => + Effect.promise(() => + sdk.session.messages({ sessionID: input.sessionID, directory: input.directory }, { throwOnError: true }).then( + (response) => response.data ?? [], + ), + ), + }) +} + +export const messageLoaderLayer = (sdk: SDK) => Layer.succeed(MessageLoader, messageLoaderFromSDK(sdk)) + +export function buildUsage(message: AssistantTokenCost): Usage { + const cachedReadTokens = message.tokens.cache.read + const cachedWriteTokens = message.tokens.cache.write + const thoughtTokens = message.tokens.reasoning + + return { + inputTokens: message.tokens.input, + outputTokens: message.tokens.output, + totalTokens: + message.tokens.input + message.tokens.output + thoughtTokens + cachedReadTokens + cachedWriteTokens, + ...(thoughtTokens > 0 ? { thoughtTokens } : {}), + ...(cachedReadTokens > 0 ? { cachedReadTokens } : {}), + ...(cachedWriteTokens > 0 ? { cachedWriteTokens } : {}), + } +} + +export function latestAssistantMessage(messages: readonly SessionMessage[]): AssistantMessage | undefined { + return messages + .filter((message): message is { readonly info: AssistantMessage } => message.info.role === "assistant") + .at(-1)?.info +} + +export function totalSessionCost(messages: readonly SessionMessage[]): number { + return messages + .filter((message): message is { readonly info: AssistantMessage } => message.info.role === "assistant") + .reduce((sum, message) => sum + message.info.cost, 0) +} + +export function findContextLimit( + providers: Record, + providerID: ProviderID, + modelID: ModelID, +): number | undefined { + return providers[providerID]?.models[modelID]?.limit.context +} + +export const contextLimitLoaderLayer = Layer.effect( + ContextLimitLoader, + Effect.gen(function* () { + const store = yield* InstanceStore.Service + const provider = yield* Provider.Service + + return ContextLimitLoader.of({ + providers: Effect.fn("ACPNextUsageContextLimitLoader.providers")(function* (directory) { + const ctx = yield* store.load({ directory }) + return yield* Effect.gen(function* () { + return yield* provider.list() + }).pipe(Effect.provideService(InstanceRef, ctx)) + }), + }) + }), +) + +export const layer = Layer.effect( + Service, + Effect.gen(function* () { + const messageLoader = yield* MessageLoader + const contextLimitLoader = yield* ContextLimitLoader + const limits = yield* SynchronizedRef.make(new Map>()) + + const cachedLimit = Effect.fnUntraced(function* (input: { + readonly directory: string + readonly providerID: ProviderID + readonly modelID: ModelID + }) { + return yield* SynchronizedRef.modifyEffect( + limits, + Effect.fnUntraced(function* (items) { + const key = `${input.directory}\u0000${input.providerID}\u0000${input.modelID}` + const current = items.get(key) + if (current) return [current, items] as const + const next = yield* Effect.cached( + contextLimitLoader.providers(input.directory).pipe( + Effect.map((providers) => findContextLimit(providers, input.providerID, input.modelID)), + Effect.catch((error) => + Effect.sync(() => { + log.error("failed to get providers for usage context limit", { error }) + return undefined + }), + ), + ), + ) + return [next, new Map(items).set(key, next)] as const + }), + ) + }) + + const contextLimit = Effect.fn("ACPNextUsage.contextLimit")(function* (input: { + readonly directory: string + readonly providerID: ProviderID + readonly modelID: ModelID + }) { + return yield* (yield* cachedLimit(input)) + }) + + const sendUpdate = Effect.fn("ACPNextUsage.sendUpdate")(function* (input: { + readonly connection: UsageConnection + readonly sessionID: string + readonly directory: string + }) { + const messages = yield* messageLoader.messages({ sessionID: input.sessionID, directory: input.directory }).pipe( + Effect.catch((error) => + Effect.sync(() => { + log.error("failed to fetch messages for usage update", { error }) + return undefined + }), + ), + ) + if (!messages) return + + const message = latestAssistantMessage(messages) + if (!message) return + if (!message.providerID || !message.modelID) return + + const size = yield* contextLimit({ + directory: input.directory, + providerID: ProviderID.make(message.providerID), + modelID: ModelID.make(message.modelID), + }) + if (!size) return + + yield* Effect.promise(() => + input.connection + .sessionUpdate({ + sessionId: input.sessionID, + update: { + sessionUpdate: "usage_update", + used: message.tokens.input + message.tokens.cache.read, + size, + cost: { amount: totalSessionCost(messages), currency: "USD" }, + }, + }) + .catch((error) => { + log.error("failed to send usage update", { error }) + }), + ) + }) + + return Service.of({ + buildUsage, + latestAssistantMessage, + totalSessionCost, + contextLimit, + sendUpdate, + }) + }), +) + +export const defaultLayer = layer.pipe( + Layer.provide(contextLimitLoaderLayer), + Layer.provide(Provider.defaultLayer), + Layer.provide(InstanceStore.defaultLayer), +) + +export * as UsageService from "./usage" diff --git a/packages/opencode/test/acp-next/usage.test.ts b/packages/opencode/test/acp-next/usage.test.ts new file mode 100644 index 000000000000..77c17d4f72ea --- /dev/null +++ b/packages/opencode/test/acp-next/usage.test.ts @@ -0,0 +1,314 @@ +import { describe, expect, test } from "bun:test" +import type { SessionNotification } from "@agentclientprotocol/sdk" +import { UsageService } from "@/acp-next/usage" +import { ModelID, ProviderID } from "@/provider/schema" +import { Provider } from "@/provider/provider" +import { Effect, Layer } from "effect" +import { it } from "../lib/effect" + +const assistant = ( + input: Partial & Pick, +): UsageService.SessionMessage => ({ + info: { + role: "assistant", + providerID: "anthropic", + modelID: "claude-sonnet", + tokens: { + input: 10, + output: 20, + reasoning: 0, + cache: { read: 0, write: 0 }, + }, + ...input, + }, +}) + +const user = (): UsageService.SessionMessage => ({ + info: { role: "user" }, +}) + +const assistantWithoutProvider = (): UsageService.SessionMessage => ({ + info: { + role: "assistant", + modelID: "claude-sonnet", + cost: 1, + tokens: { + input: 10, + output: 20, + reasoning: 0, + cache: { read: 0, write: 0 }, + }, + }, +}) + +const model = (providerID: ProviderID, modelID: ModelID, context: number): Provider.Model => ({ + id: modelID, + providerID, + api: { + id: modelID, + url: "https://example.com", + npm: "@ai-sdk/openai-compatible", + }, + name: modelID, + family: "test", + capabilities: { + temperature: true, + reasoning: false, + attachment: false, + toolcall: true, + input: { text: true, audio: false, image: false, video: false, pdf: false }, + output: { text: true, audio: false, image: false, video: false, pdf: false }, + interleaved: false, + }, + cost: { + input: 0, + output: 0, + cache: { read: 0, write: 0 }, + }, + limit: { + context, + output: 4096, + }, + status: "active", + options: {}, + headers: {}, + release_date: "2026-01-01", +}) + +const providers = (context = 128_000): Record => { + const providerID = ProviderID.make("anthropic") + const modelID = ModelID.make("claude-sonnet") + return { + [providerID]: { + id: providerID, + name: "Anthropic", + source: "config", + env: [], + options: {}, + models: { + [modelID]: model(providerID, modelID, context), + }, + }, + } +} + +const fakeLayer = (input: { + readonly messages?: Effect.Effect + readonly providers?: (directory: string) => Effect.Effect, unknown> +}) => + UsageService.layer.pipe( + Layer.provide( + Layer.mergeAll( + Layer.succeed( + UsageService.MessageLoader, + UsageService.MessageLoader.of({ + messages: () => input.messages ?? Effect.succeed([]), + }), + ), + Layer.succeed( + UsageService.ContextLimitLoader, + UsageService.ContextLimitLoader.of({ + providers: input.providers ?? (() => Effect.succeed(providers())), + }), + ), + ), + ), + ) + +const connection = (updates: SessionNotification[]) => ({ + sessionUpdate(params: SessionNotification) { + updates.push(params) + return Promise.resolve() + }, +}) + +describe("acp-next usage", () => { + test("builds ACP Usage from assistant token shape", () => { + expect( + UsageService.buildUsage({ + cost: 0.02, + tokens: { + input: 100, + output: 40, + reasoning: 7, + cache: { read: 11, write: 13 }, + }, + }), + ).toEqual({ + inputTokens: 100, + outputTokens: 40, + thoughtTokens: 7, + cachedReadTokens: 11, + cachedWriteTokens: 13, + totalTokens: 171, + }) + }) + + test("omits optional token fields when they are zero", () => { + expect( + UsageService.buildUsage({ + cost: 0, + tokens: { + input: 3, + output: 4, + reasoning: 0, + cache: { read: 0, write: 0 }, + }, + }), + ).toEqual({ + inputTokens: 3, + outputTokens: 4, + totalTokens: 7, + }) + }) + + test("finds the latest assistant message", () => { + expect( + UsageService.latestAssistantMessage([assistant({ cost: 1, modelID: "older" }), user(), assistant({ cost: 2 })]), + ).toMatchObject({ cost: 2 }) + }) + + test("calculates total session cost from assistant messages", () => { + expect(UsageService.totalSessionCost([assistant({ cost: 1.25 }), user(), assistant({ cost: 2.5 })])).toBe(3.75) + }) + + it.effect("loads context limits from providers and caches by directory/provider/model", () => { + const calls: string[] = [] + return Effect.gen(function* () { + const usage = yield* UsageService.Service + const first = yield* usage.contextLimit({ + directory: "/workspace", + providerID: ProviderID.make("anthropic"), + modelID: ModelID.make("claude-sonnet"), + }) + const second = yield* usage.contextLimit({ + directory: "/workspace", + providerID: ProviderID.make("anthropic"), + modelID: ModelID.make("claude-sonnet"), + }) + + expect(first).toBe(200_000) + expect(second).toBe(200_000) + expect(calls).toEqual(["/workspace"]) + }).pipe( + Effect.provide( + fakeLayer({ + providers: (directory) => + Effect.sync(() => { + calls.push(directory) + return providers(200_000) + }), + }), + ), + ) + }) + + it.effect("sends ACP usage_update with context size and cumulative assistant cost", () => { + const updates: SessionNotification[] = [] + return Effect.gen(function* () { + const usage = yield* UsageService.Service + yield* usage.sendUpdate({ + connection: connection(updates), + sessionID: "ses_1", + directory: "/workspace", + }) + + expect(updates).toEqual([ + { + sessionId: "ses_1", + update: { + sessionUpdate: "usage_update", + used: 15, + size: 128_000, + cost: { amount: 3, currency: "USD" }, + }, + }, + ]) + }).pipe( + Effect.provide( + fakeLayer({ + messages: Effect.succeed([ + assistant({ cost: 1 }), + assistant({ + cost: 2, + tokens: { + input: 10, + output: 20, + reasoning: 0, + cache: { read: 5, write: 0 }, + }, + }), + ]), + }), + ), + ) + }) + + it.effect("skips usage update when messages cannot be fetched", () => { + const updates: SessionNotification[] = [] + return Effect.gen(function* () { + const usage = yield* UsageService.Service + yield* usage.sendUpdate({ + connection: connection(updates), + sessionID: "ses_1", + directory: "/workspace", + }) + + expect(updates).toEqual([]) + }).pipe(Effect.provide(fakeLayer({ messages: Effect.fail(new Error("boom")) }))) + }) + + it.effect("skips usage update when no assistant message exists", () => { + const updates: SessionNotification[] = [] + return Effect.gen(function* () { + const usage = yield* UsageService.Service + yield* usage.sendUpdate({ + connection: connection(updates), + sessionID: "ses_1", + directory: "/workspace", + }) + + expect(updates).toEqual([]) + }).pipe(Effect.provide(fakeLayer({ messages: Effect.succeed([user()]) }))) + }) + + it.effect("skips usage update when assistant message has no provider or model", () => { + const updates: SessionNotification[] = [] + return Effect.gen(function* () { + const usage = yield* UsageService.Service + yield* usage.sendUpdate({ + connection: connection(updates), + sessionID: "ses_1", + directory: "/workspace", + }) + + expect(updates).toEqual([]) + }).pipe( + Effect.provide( + fakeLayer({ + messages: Effect.succeed([assistantWithoutProvider()]), + }), + ), + ) + }) + + it.effect("skips usage update when context size is unknown", () => { + const updates: SessionNotification[] = [] + return Effect.gen(function* () { + const usage = yield* UsageService.Service + yield* usage.sendUpdate({ + connection: connection(updates), + sessionID: "ses_1", + directory: "/workspace", + }) + + expect(updates).toEqual([]) + }).pipe( + Effect.provide( + fakeLayer({ + messages: Effect.succeed([assistant({ cost: 1, providerID: "missing" })]), + }), + ), + ) + }) +})