fix(session): retry empty stream truncations and discard partial parts #26167
base: dev
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -86,6 +86,10 @@ interface ProcessorContext extends Input { | |
| currentTextID: string | undefined | ||
| reasoningMap: Record<string, SessionV1.ReasoningPart> | ||
| v2AssistantMessageID: SessionMessage.ID | undefined | ||
| // Part id created just before the current attempt begins; parts with a | ||
| // greater id were produced by the attempt and are discarded when it is | ||
| // retried after a stream truncation. | ||
| partFloor: PartID | ||
| } | ||
|
|
||
| type StreamEvent = LLMEvent | ||
|
|
@@ -128,6 +132,7 @@ export const layer = Layer.effect( | |
| currentTextID: undefined, | ||
| reasoningMap: {}, | ||
| v2AssistantMessageID: undefined, | ||
| partFloor: PartID.ascending(), | ||
| } | ||
| const mirrorAssistant = flags.experimentalEventSystem && !input.assistantMessage.summary | ||
| let aborted = false | ||
|
|
@@ -395,7 +400,7 @@ export const layer = Layer.effect( | |
| time: { start: Date.now() }, | ||
| metadata: value.providerMetadata, | ||
| } | ||
| yield* session.updatePart(ctx.reasoningMap[value.id]) | ||
| yield* session.updatePart(ctx.reasoningMap[value.id]) | ||
| return | ||
|
|
||
| case "reasoning-delta": | ||
|
|
@@ -701,6 +706,20 @@ export const layer = Layer.effect( | |
| usage: value.usage ?? new Usage({}), | ||
| metadata: value.providerMetadata, | ||
| }) | ||
| // Detect stream truncation: the AI SDK reports the unmapped | ||
| // fallback reason when the upstream provider stream ends without a | ||
| // proper stop_reason. No usage and no output means the connection | ||
| // was cut mid-generation, which is a transient failure that should | ||
| // be retried. | ||
| if (value.reason === "unknown" && usage.tokens.output === 0) { | ||
| return yield* Effect.fail( | ||
|
Contributor
Small style-guide suggestion, optional for the human to decide: in
Contributor
Author
The All 14 existing Keeping the
||
| new SessionV1.APIError({ | ||
| message: "Provider stream ended without a stop reason", | ||
| isRetryable: true, | ||
| metadata: { code: "EmptyOther" }, | ||
| }), | ||
| ) | ||
| } | ||
|
Comment on lines +714 to +722
Suggest extending this check to also catch
Hit the same failure shape on Azure-served gpt-5.5 via the OpenAI-compatible adapter:
Why the current check misses it: the PR guards on
Suggested extension (reuses
if (
  usage.tokens.output === 0 &&
  ctx.attemptParts.length === 0 &&
  (value.reason === "unknown" || value.reason === "stop")
) {
  return yield* Effect.fail(
    new MessageV2.APIError({
      message: "Provider returned empty stream",
      isRetryable: true,
      metadata: { code: "EmptyStream" },
    }),
  )
}
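The condition suggested in this comment can be read as a small pure predicate. The sketch below is only an illustration: `FinishInfo`, `FinishReason`, and `isEmptyStream` are hypothetical names, and the fields are simplified stand-ins for `value.reason`, `usage.tokens.output`, and `ctx.attemptParts.length` in the PR.

```typescript
// Hypothetical, simplified shapes; the real event and usage types live in the PR's codebase.
type FinishReason = "stop" | "length" | "tool-calls" | "unknown" | "error"

interface FinishInfo {
  reason: FinishReason      // stand-in for value.reason
  outputTokens: number      // stand-in for usage.tokens.output
  partsThisAttempt: number  // stand-in for ctx.attemptParts.length
}

// True when the step finished without producing anything, under a finish
// reason that the review comment treats as a possible silent truncation.
function isEmptyStream(info: FinishInfo): boolean {
  const producedNothing = info.outputTokens === 0 && info.partsThisAttempt === 0
  const suspiciousReason = info.reason === "unknown" || info.reason === "stop"
  return producedNothing && suspiciousReason
}
```

Requiring both zero output tokens and zero parts keeps a legitimate short "stop" reply from being retried; whether that trade-off suits every provider is left to the maintainers.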
||
| if (!ctx.assistantMessage.summary) { | ||
| // TODO(v2): Temporary dual-write while migrating session messages to v2 events. | ||
| if (mirrorAssistant) { | ||
|
|
@@ -846,6 +865,27 @@ export const layer = Layer.effect( | |
| } | ||
| }) | ||
|
|
||
| // Discards every part the failed attempt persisted (anything created | ||
| // after partFloor) so a successful retry replaces rather than appends to | ||
| // the truncated content. The assistant message is created fresh per | ||
| // process() call, so the floor scopes removal to this attempt's output. | ||
| const discardAttempt = Effect.fn("SessionProcessor.discardAttempt")(function* () { | ||
| const existing = yield* MessageV2.parts(ctx.assistantMessage.id).pipe( | ||
| Effect.provideService(Database.Service, database), | ||
| ) | ||
| for (const part of existing) { | ||
| if (part.id <= ctx.partFloor) continue | ||
| yield* session.removePart({ | ||
| sessionID: ctx.sessionID, | ||
| messageID: ctx.assistantMessage.id, | ||
| partID: part.id, | ||
| }) | ||
| } | ||
| ctx.currentText = undefined | ||
| ctx.reasoningMap = {} | ||
| ctx.toolcalls = {} | ||
| }) | ||
|
|
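The floor-based discard in `discardAttempt` relies on part ids sorting in creation order, which is what the `part.id <= ctx.partFloor` comparison implies. A minimal in-memory sketch of that idea follows, assuming zero-padded string ids; `ascendingId`, `Part`, and `discardAfterFloor` are hypothetical stand-ins, not the project's `PartID` or `session.removePart` APIs.

```typescript
// Hypothetical id generator: zero-padded so string comparison matches creation order.
let counter = 0
function ascendingId(): string {
  counter += 1
  return "prt_" + ("00000000" + counter).slice(-8)
}

interface Part {
  id: string
  text: string
}

// Keeps only parts created at or before the floor; everything newer is
// treated as output of the failed attempt and dropped.
function discardAfterFloor(parts: Part[], floor: string): Part[] {
  return parts.filter((part) => part.id <= floor)
}

// Usage: one part exists before the attempt, two are written during it.
const before: Part = { id: ascendingId(), text: "earlier part" }
const floor = ascendingId() // recorded before the attempt starts
const during: Part[] = [
  { id: ascendingId(), text: "partial te" },
  { id: ascendingId(), text: "partial reasoning" },
]
const kept = discardAfterFloor([before, ...during], floor)
```

Recording a floor avoids tracking each created part individually, at the cost of depending on the id ordering guarantee.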
||
| const cleanup = Effect.fn("SessionProcessor.cleanup")(function* () { | ||
| if (ctx.snapshot) { | ||
| const patch = yield* snapshot.patch(ctx.snapshot) | ||
|
|
@@ -933,6 +973,11 @@ export const layer = Layer.effect( | |
| yield* events.publish(Session.Event.Error, { sessionID: ctx.sessionID, error }) | ||
| return | ||
| } | ||
| // Retries are exhausted: drop the truncated attempt's partial parts so | ||
| // the failed message doesn't keep an orphan step-start / partial text. | ||
| if (SessionV1.APIError.isInstance(error) && error.data.metadata?.code === "EmptyOther") { | ||
| yield* discardAttempt() | ||
| } | ||
| if (!ctx.assistantMessage.summary) { | ||
| // TODO(v2): Temporary dual-write while migrating session messages to v2 events. | ||
| if (mirrorAssistant) { | ||
|
|
@@ -959,6 +1004,9 @@ export const layer = Layer.effect( | |
| slog.info("process") | ||
| ctx.needsCompaction = false | ||
| ctx.shouldBreak = (yield* config.get()).experimental?.continue_loop_on_deny !== true | ||
| // Record the high-water mark before any attempt persists parts so a | ||
| // truncation retry can discard exactly this call's output. | ||
| ctx.partFloor = PartID.ascending() | ||
|
|
||
| return yield* Effect.gen(function* () { | ||
| yield* Effect.gen(function* () { | ||
|
|
@@ -1003,7 +1051,12 @@ export const layer = Layer.effect( | |
| timestamp: DateTime.makeUnsafe(Date.now()), | ||
| }) | ||
| : Effect.void | ||
| return flushV2Fragments().pipe( | ||
| // Only stream truncations leave partial parts worth discarding; | ||
| // other retryable errors (rate limits, 5xx) retry untouched. | ||
| const truncated = | ||
| SessionV1.APIError.isInstance(info.error) && info.error.data.metadata?.code === "EmptyOther" | ||
| return (truncated ? discardAttempt() : Effect.void).pipe( | ||
| Effect.andThen(flushV2Fragments()), | ||
| Effect.andThen(event), | ||
| Effect.andThen( | ||
| status.set(ctx.sessionID, { | ||
|
|
||
Suggestion for the human to decide: this failure happens after stream parts may already have been persisted on the current assistant message. Because Effect.retry(...) wraps the stream before cleanup() runs, a retry will start a new stream on the same message without removing the partial text/reasoning parts from the truncated attempt, so a successful retry can leave the original truncated content plus the retried response in the final assistant message. Consider clearing the in-flight attempt parts before retrying, or moving this detection earlier to a place where no partial parts have been committed.

Good catch. Verified the concern is real:
- Effect.ensuring(cleanup()) wraps the retry, so cleanup() only runs at the very end of the whole chain.
- ctx.currentText / ctx.reasoningMap persist across retry attempts (closure-captured).
- text-start and reasoning-start call session.updatePart(...) immediately, so partial parts are already in SQLite by the time finish-step fires.

Pushed a fix in 0a09591b2:
- ctx.attemptParts (pushed in text-start / reasoning-start).
- discardAttempt() helper deletes those parts via session.removePart(...) and resets currentText / reasoningMap / snapshot.
- Called from the set callback so it fires only when a retry will actually happen. Terminal failures (no retry) route through halt and keep the partial content as user-visible context.

Note this is a pre-existing issue affecting all retryable mid-stream errors (ECONNRESET, ZlibError, SSE timeout, etc.); the EmptyOther path just makes it more frequent. The fix applies uniformly to all of them.

Added an it.instance regression test (retry discards in-flight parts from the failed attempt) that pushes a truncated reply followed by a clean success and asserts the final message contains only the retried text.
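The scenario that the regression test describes (a truncated reply followed by a clean success) can be modelled without Effect or a database. The sketch below is a simplified assumption-laden model, not the project's test: `Attempt`, `Outcome`, and `runWithRetry` are hypothetical, and an in-memory array stands in for the persisted parts.

```typescript
// Each attempt emits text parts and reports whether the stream ended cleanly.
type Outcome = "ok" | "truncated"
type Attempt = (emit: (text: string) => void) => Outcome

// Runs attempts in order; after a truncated attempt, everything it emitted
// is discarded so the next attempt replaces rather than appends.
function runWithRetry(attempts: Attempt[]): string[] {
  const parts: string[] = []
  for (const attempt of attempts) {
    const floor = parts.length // high-water mark before this attempt
    const outcome = attempt((text) => {
      parts.push(text)
    })
    if (outcome === "ok") return parts
    parts.length = floor // drop the truncated attempt's output
  }
  return parts // all attempts truncated: nothing partial is kept
}

// Usage: first attempt is cut off mid-word, second completes.
const finalParts = runWithRetry([
  (emit) => {
    emit("Hel")
    return "truncated"
  },
  (emit) => {
    emit("Hello")
    return "ok"
  },
])
// finalParts is ["Hello"]
```

Without the `parts.length = floor` line the result would contain both "Hel" and "Hello", which is the duplication the first review comment warns about.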