Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 50 additions & 4 deletions packages/llm/src/route/client.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { Cause, Context, Effect, Layer, Schema, Stream } from "effect"
import { Cause, Context, Effect, Layer, Queue, Schema, Stream } from "effect"
import type { Auth as AuthDef } from "./auth"
import type { Endpoint } from "./endpoint"
import { RequestExecutor } from "./executor"
import type { RetryInfo } from "./executor"
import type { Framing } from "./framing"
import { HttpTransport } from "./transport"
import type { Transport, TransportRuntime } from "./transport"
Expand All @@ -12,10 +13,11 @@ import { applyCachePolicy } from "../cache-policy"
import * as ProviderShared from "../protocols/shared"
import * as ToolRuntime from "../tool-runtime"
import type { Tools } from "../tool"
import type { LLMError, LLMEvent, PreparedRequestOf, ProtocolID } from "../schema"
import type { LLMError, PreparedRequestOf, ProtocolID } from "../schema"
import {
GenerationOptions,
HttpOptions,
LLMEvent,
LLMRequest,
LLMResponse,
ModelID,
Expand Down Expand Up @@ -439,6 +441,50 @@ const streamRequestWith = (runtime: TransportRuntime) => (request: LLMRequest) =
}),
)

/**
 * Converts the retry information reported by the request executor into an
 * `LLMEvent.retry` event.
 *
 * The event carries the attempt number, the delay and the error message, plus an
 * `opencode` provider-metadata entry with the failing module/method, the reason
 * tag and, when one can be found, a status.
 */
const retryEvent = (info: RetryInfo) =>
  Effect.sync(() => {
    const { error } = info
    const { reason } = error
    const http = "http" in reason ? reason.http : undefined
    // A status carried directly on the reason wins; otherwise fall back to the
    // status of the HTTP response attached to the reason, if any.
    const reasonStatus = "status" in reason ? reason.status : undefined
    const opencode = {
      module: error.module,
      method: error.method,
      reason: reason._tag,
      status: reasonStatus ?? http?.response?.status,
    }
    return LLMEvent.retry({
      attempt: info.attempt,
      delayMs: info.delayMs,
      message: reason.message,
      providerMetadata: { opencode },
    })
  })

/**
 * Wraps `streamRequestWith` so that executor retries show up in the event stream
 * as `LLMEvent.retry` events.
 *
 * The inner stream runs with an `onRetry` hook that offers a retry event to the
 * callback queue. Regular events are pulled chunk by chunk and offered to the same
 * queue. The queue is ended once the pull signals completion and failed when the
 * inner stream fails with an `LLM.Error`.
 */
const streamRequestObservableWith = (runtime: TransportRuntime) => (request: LLMRequest) =>
  Stream.callback<LLMEvent, LLMError>((queue) =>
    Effect.gen(function* () {
      const pull = yield* Stream.toPull(
        streamRequestWith({
          ...runtime,
          httpOptions: {
            onRetry: (info) => retryEvent(info).pipe(Effect.map((event) => Queue.offerUnsafe(queue, event))),
          },
        })(request),
      )

      // Pull in a plain loop under a single pair of handlers. Re-entering the
      // catch-wrapped effect once per chunk would stack one generator frame and
      // two handler frames for every chunk of the stream.
      const drain: Effect.Effect<void, never> = Effect.gen(function* () {
        while (true) {
          const chunk = yield* pull
          for (const event of chunk) Queue.offerUnsafe(queue, event)
        }
      }).pipe(
        // The loop only exits through a failure of `pull`: completion or an LLM error.
        Effect.catchIf(Cause.isDone, () => Effect.sync(() => Queue.endUnsafe(queue))),
        Effect.catchTag("LLM.Error", (error) => Queue.fail(queue, error)),
      )

      yield* drain
    }),
  )


/**
 * Type guard distinguishing tool-run options from a plain `LLMRequest`:
 * the input counts as run options when it has both a `request` and a `tools` key.
 */
const isToolRunOptions = (
  input: LLMRequest | ToolRuntime.RunOptions<Tools>,
): input is ToolRuntime.RunOptions<Tools> => {
  if (!("request" in input)) return false
  return "tools" in input
}

Expand Down Expand Up @@ -495,7 +541,7 @@ export const streamRequest = (request: LLMRequest) =>
/**
 * `Service` layer that depends only on the HTTP `RequestExecutor`.
 *
 * Requests are streamed through `streamRequestObservableWith`, so executor retries
 * are emitted as `LLMEvent.retry` events; `generate` is derived from the same stream.
 */
export const layer: Layer.Layer<Service, never, RequestExecutor.Service> = Layer.effect(
  Service,
  Effect.gen(function* () {
    // Single binding: the earlier `streamRequestWith`-based declaration of `stream`
    // redeclared the same const and bypassed retry events.
    const stream = streamWith(streamRequestObservableWith({ http: yield* RequestExecutor.Service }))
    return Service.of({ prepare: prepareWith as Interface["prepare"], stream, generate: generateWith(stream) })
  }),
)
Expand All @@ -505,7 +551,7 @@ export const layerWithWebSocket: Layer.Layer<Service, never, RequestExecutor.Ser
Service,
Effect.gen(function* () {
const stream = streamWith(
streamRequestWith({
streamRequestObservableWith({
http: yield* RequestExecutor.Service,
webSocket: yield* WebSocketExecutor.Service,
}),
Expand Down
17 changes: 15 additions & 2 deletions packages/llm/src/route/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,20 @@ import {
/** Contract of the request executor service. */
export interface Interface {
/**
 * Executes an HTTP client request, producing the response or failing with an `LLMError`.
 * `options` is optional per-call configuration; see `ExecuteOptions`.
 */
readonly execute: (
request: HttpClientRequest.HttpClientRequest,
options?: ExecuteOptions,
) => Effect.Effect<HttpClientResponse.HttpClientResponse, LLMError>
}

/** Details handed to an `onRetry` hook describing one retry. */
export type RetryInfo = {
/** Number of the retry attempt. */
readonly attempt: number
/** Delay associated with the retry; the field name indicates milliseconds. */
readonly delayMs: number
/** The error that triggered the retry. */
readonly error: LLMError
}

/** Optional per-call settings accepted by `Interface.execute`. */
export type ExecuteOptions = {
/** Optional hook that receives a `RetryInfo` and returns an effect to run for that retry. */
readonly onRetry?: (info: RetryInfo) => Effect.Effect<void>
}

export class Service extends Context.Service<Service, Interface>()("@opencode/LLM/RequestExecutor") {}

const BODY_LIMIT = 16_384
Expand Down Expand Up @@ -341,14 +352,16 @@ const retryDelay = (error: LLMError, attempt: number) => {

/**
 * Runs `effect` and retries it while it fails with a retryable `LLM.Error` and
 * the retry budget is not exhausted.
 *
 * Before each retry the delay is computed by `retryDelay`, reported to
 * `options.onRetry` (when provided) together with a 1-based attempt number and
 * the triggering error, and then slept.
 *
 * @param effect - the effect to run; it is executed again for every retry
 * @param options - optional hooks; `onRetry` runs before the delay is slept
 * @param retries - remaining retry budget, defaults to `MAX_RETRIES`
 * @param attempt - number of retries already performed, 0 on the first call
 * @returns the first successful result, or fails with the non-retryable error
 *          or with the error seen once the budget is used up
 */
const retryStatusFailures = <A, R>(
  effect: Effect.Effect<A, LLMError, R>,
  options: ExecuteOptions | undefined,
  retries = MAX_RETRIES,
  attempt = 0,
): Effect.Effect<A, LLMError, R> =>
  Effect.catchTag(effect, "LLM.Error", (error): Effect.Effect<A, LLMError, R> => {
    if (!error.retryable || retries <= 0) return Effect.fail(error)
    return retryDelay(error, attempt).pipe(
      Effect.tap((delay) => options?.onRetry?.({ attempt: attempt + 1, delayMs: delay, error }) ?? Effect.void),
      Effect.flatMap((delay) => Effect.sleep(delay)),
      // Exactly one recursive step, and it must forward `options` so later
      // retries keep reporting through the same hook.
      Effect.flatMap(() => retryStatusFailures(effect, options, retries - 1, attempt + 1)),
    )
  })

Expand All @@ -364,7 +377,7 @@ export const layer: Layer.Layer<Service, never, HttpClient.HttpClient> = Layer.e
.pipe(Effect.mapError(toHttpError(redactedNames)), Effect.flatMap(statusError(request, redactedNames)))
})
return Service.of({
execute: (request) => retryStatusFailures(executeOnce(request)),
execute: (request, options) => retryStatusFailures(executeOnce(request), options),
})
}),
)
Expand Down
2 changes: 1 addition & 1 deletion packages/llm/src/route/transport/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ export const httpJson = <Body, Frame>(input: HttpJsonInput<Body, Frame>): HttpJs
frames: (prepared, request, runtime) =>
Stream.unwrap(
runtime.http
.execute(prepared.request)
.execute(prepared.request, runtime.httpOptions)
.pipe(
Effect.map((response) =>
prepared.framing.frame(
Expand Down
3 changes: 2 additions & 1 deletion packages/llm/src/route/transport/index.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import type { Effect, Stream } from "effect"
import type { Interface as RequestExecutorInterface } from "../executor"
import type { ExecuteOptions, Interface as RequestExecutorInterface } from "../executor"
import type { Interface as WebSocketExecutorInterface } from "./websocket"
import type { LLMError, LLMRequest } from "../../schema"

/** Executors and options made available to a transport when it runs. */
export interface TransportRuntime {
/** HTTP request executor. */
readonly http: RequestExecutorInterface
/** Optional WebSocket executor. */
readonly webSocket?: WebSocketExecutorInterface
/** Optional options for HTTP execution (for example an `onRetry` hook). */
readonly httpOptions?: ExecuteOptions
}

export interface Transport<Body, Prepared, Frame> {
Expand Down
2 changes: 1 addition & 1 deletion packages/llm/src/schema/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ export class TransportReason extends Schema.Class<TransportReason>("LLM.Error.Tr
http: Schema.optional(HttpContext),
}) {
get retryable() {
return false
return this.kind === "TransportError" || this.kind === "Timeout"
}
}

Expand Down
12 changes: 12 additions & 0 deletions packages/llm/src/schema/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,15 @@ export const ProviderErrorEvent = Schema.Struct({
}).annotate({ identifier: "LLM.Event.ProviderError" })
export type ProviderErrorEvent = Schema.Schema.Type<typeof ProviderErrorEvent>

/**
 * Schema of the `retry` event: an attempt number, a delay (named as milliseconds),
 * a message and optional provider metadata.
 */
export const RetryEvent = Schema.Struct({
type: Schema.tag("retry"),
attempt: Schema.Number,
delayMs: Schema.Number,
message: Schema.String,
providerMetadata: Schema.optional(ProviderMetadata),
}).annotate({ identifier: "LLM.Event.Retry" })
/** Decoded type of `RetryEvent`. */
export type RetryEvent = Schema.Schema.Type<typeof RetryEvent>

const llmEventTagged = Schema.Union([
StepStart,
TextStart,
Expand All @@ -220,6 +229,7 @@ const llmEventTagged = Schema.Union([
StepFinish,
Finish,
ProviderErrorEvent,
RetryEvent,
]).pipe(Schema.toTaggedUnion("type"))

type WithID<Event extends { readonly id: unknown }, ID> = Omit<Event, "type" | "id"> & { readonly id: ID | string }
Expand Down Expand Up @@ -265,6 +275,7 @@ export const LLMEvent = Object.assign(llmEventTagged, {
usage: input.usage === undefined ? undefined : Usage.from(input.usage),
}),
providerError: ProviderErrorEvent.make,
retry: RetryEvent.make,
is: {
stepStart: llmEventTagged.guards["step-start"],
textStart: llmEventTagged.guards["text-start"],
Expand All @@ -282,6 +293,7 @@ export const LLMEvent = Object.assign(llmEventTagged, {
stepFinish: llmEventTagged.guards["step-finish"],
finish: llmEventTagged.guards.finish,
providerError: llmEventTagged.guards["provider-error"],
retry: llmEventTagged.guards.retry,
},
})
export type LLMEvent = Schema.Schema.Type<typeof llmEventTagged>
Expand Down
Loading
Loading