From 8993944b3d92d438a70a297873c4bfc22b71efdb Mon Sep 17 00:00:00 2001 From: Ammar Date: Sun, 5 Apr 2026 17:04:42 -0500 Subject: [PATCH] =?UTF-8?q?=F0=9F=A4=96=20fix:=20replay=20SSH=20startup=20?= =?UTF-8?q?status=20across=20workspace=20switches?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/browser/stores/WorkspaceStore.test.ts | 92 +++++++++++++++++++ src/browser/stores/WorkspaceStore.ts | 14 ++- .../agentSession.preStreamError.test.ts | 49 ++++++++++ src/node/services/agentSession.ts | 92 ++++++++++++++----- 4 files changed, 222 insertions(+), 25 deletions(-) diff --git a/src/browser/stores/WorkspaceStore.test.ts b/src/browser/stores/WorkspaceStore.test.ts index 7b88319e63..c8994b7aad 100644 --- a/src/browser/stores/WorkspaceStore.test.ts +++ b/src/browser/stores/WorkspaceStore.test.ts @@ -1558,6 +1558,98 @@ describe("WorkspaceStore", () => { expect(store.getWorkspaceState(workspaceId).isStreamStarting).toBe(false); }); + it("replays runtime-status before caught-up when switching back to a preparing workspace", async () => { + const workspaceId = "stream-starting-runtime-status-replay"; + const otherWorkspaceId = "stream-starting-runtime-status-other"; + const startupDetail = "Checking workspace runtime..."; + let subscriptionCount = 0; + let releaseSecondCaughtUp: (() => void) | undefined; + + mockOnChat.mockImplementation(async function* ( + input?: { workspaceId: string; mode?: unknown }, + options?: { signal?: AbortSignal } + ): AsyncGenerator { + if (input?.workspaceId !== workspaceId) { + await waitForAbortSignal(options?.signal); + return; + } + + subscriptionCount += 1; + + if (subscriptionCount === 1) { + yield { type: "caught-up" }; + await Promise.resolve(); + yield { + type: "stream-lifecycle", + workspaceId, + phase: "preparing", + hadAnyOutput: false, + }; + await Promise.resolve(); + yield { + type: "runtime-status", + workspaceId, + phase: "starting", + runtimeType: "ssh", + detail: startupDetail, + }; + await waitForAbortSignal(options?.signal); + return; + } + + yield { + type: "stream-lifecycle", + workspaceId, + phase: "preparing", + hadAnyOutput: false, + }; + await Promise.resolve(); + yield { + type: "runtime-status", + workspaceId, + phase: "starting", + runtimeType: "ssh", + detail: startupDetail, + }; + await new Promise((resolve) => { + releaseSecondCaughtUp = resolve; + }); + yield { type: "caught-up", replay: "full" }; + await waitForAbortSignal(options?.signal); + }); + + createAndAddWorkspace(store, workspaceId); + + const sawInitialStartup = await waitUntil(() => { + const state = store.getWorkspaceState(workspaceId); + return state.isStreamStarting && state.runtimeStatus?.detail === startupDetail; + }); + expect(sawInitialStartup).toBe(true); + + createAndAddWorkspace(store, otherWorkspaceId); + store.setActiveWorkspaceId(workspaceId); + + const replayedStartupBeforeCaughtUp = await waitUntil(() => { + const state = store.getWorkspaceState(workspaceId); + return ( + subscriptionCount >= 2 && + state.isStreamStarting && + state.runtimeStatus?.detail === startupDetail + ); + }); + expect(replayedStartupBeforeCaughtUp).toBe(true); + + releaseSecondCaughtUp?.(); + + const stayedVisibleAfterCaughtUp = await waitUntil(() => { + const state = store.getWorkspaceState(workspaceId); + return ( + !state.loading && state.isStreamStarting && state.runtimeStatus?.detail === startupDetail + ); + }); + expect(stayedVisibleAfterCaughtUp).toBe(true); + }); + it("active workspace still shows starting during legitimate startup gap", async () => { const workspaceId = "stream-starting-active-workspace"; diff --git a/src/browser/stores/WorkspaceStore.ts b/src/browser/stores/WorkspaceStore.ts index 34800653ed..6a6d96aa7d 100644 --- a/src/browser/stores/WorkspaceStore.ts +++ b/src/browser/stores/WorkspaceStore.ts @@ -28,13 +28,16 @@ import { CUSTOM_EVENTS, createCustomEvent } from "@/common/constants/events"; import { useCallback, useSyncExternalStore } from "react"; import { isCaughtUpMessage, + isStreamAbort, isStreamError, + isStreamLifecycle, isDeleteMessage, isBashOutputEvent, isTaskCreatedEvent, isMuxMessage, isQueuedMessageChanged, isRestoreToInput, + isRuntimeStatus, } from "@/common/orpc/types"; import { type StreamAbortEvent, @@ -1551,6 +1554,8 @@ export class WorkspaceStore { : (activity?.lastThinkingLevel ?? aggregator.getCurrentThinkingLevel() ?? null); const hasAuthoritativeStreamLifecycle = streamLifecycle !== null && streamLifecycle.phase !== "idle"; + const hasReplayPreparingLifecycle = + isActiveWorkspace && !transient.caughtUp && streamLifecycle?.phase === "preparing"; const aggregatorRecency = aggregator.getRecencyTimestamp(); const recencyTimestamp = aggregatorRecency === null @@ -1558,10 +1563,11 @@ export class WorkspaceStore { : Math.max(aggregatorRecency, activity?.recency ?? aggregatorRecency); // Treat the backend lifecycle as authoritative, but keep any optimistic // pre-stream "starting" state scoped to the active, caught-up workspace. - // Inactive or replaying workspaces should derive status from authoritative - // activity instead of a sticky local pending-start timestamp. + // Reconnect replay is the one exception: if the backend has already re-emitted + // a PREPARING lifecycle snapshot, keep showing startup instead of briefly + // hiding the barrier until caught-up lands. const isStreamStarting = - useAggregatorState && + (useAggregatorState || hasReplayPreparingLifecycle) && (streamLifecycle?.phase === "preparing" || (!hasAuthoritativeStreamLifecycle && pendingStreamStartTime !== null)) && !canInterrupt; @@ -3566,7 +3572,7 @@ export class WorkspaceStore { } if (!transient.caughtUp && this.isBufferedEvent(data)) { - if ("type" in data && (data.type === "stream-lifecycle" || data.type === "stream-abort")) { + if (isStreamLifecycle(data) || isStreamAbort(data) || isRuntimeStatus(data)) { applyWorkspaceChatEventToAggregator(aggregator, data, { allowSideEffects: false }); this.states.bump(workspaceId); } diff --git a/src/node/services/agentSession.preStreamError.test.ts b/src/node/services/agentSession.preStreamError.test.ts index c828de5029..925dad5027 100644 --- a/src/node/services/agentSession.preStreamError.test.ts +++ b/src/node/services/agentSession.preStreamError.test.ts @@ -298,6 +298,55 @@ describe("AgentSession pre-stream errors", () => { expect(replayInit).toHaveBeenCalledWith(workspaceId); }); + it("replays preparing runtime-status before caught-up so reconnects keep startup details", async () => { + const workspaceId = "ws-replay-runtime-status"; + const { session, cleanup, replayInit, aiEmitter } = + await createReplaySessionHarness(workspaceId); + historyCleanup = cleanup; + + const privateSession = session as unknown as { + setTurnPhase(next: "idle" | "preparing" | "streaming" | "completing"): void; + }; + + privateSession.setTurnPhase("preparing"); + aiEmitter.emit("runtime-status", { + type: "runtime-status", + workspaceId, + phase: "starting", + runtimeType: "ssh", + source: "runtime", + detail: "Checking workspace runtime...", + }); + + const replayedEvents: WorkspaceChatMessage[] = []; + await session.replayHistory(({ message }) => replayedEvents.push(message)); + + const replayedLifecycleIndex = replayedEvents.findIndex( + (event) => event.type === "stream-lifecycle" + ); + const replayedRuntimeStatusIndex = replayedEvents.findIndex( + (event) => event.type === "runtime-status" + ); + const caughtUpIndex = replayedEvents.findIndex((event) => event.type === "caught-up"); + const replayedRuntimeStatus = replayedEvents.find( + (event): event is Extract => + event.type === "runtime-status" + ); + + expect(replayedRuntimeStatus).toEqual({ + type: "runtime-status", + workspaceId, + phase: "starting", + runtimeType: "ssh", + source: "runtime", + detail: "Checking workspace runtime...", + }); + expect(replayedLifecycleIndex).toBeGreaterThanOrEqual(0); + expect(replayedRuntimeStatusIndex).toBeGreaterThan(replayedLifecycleIndex); + expect(caughtUpIndex).toBeGreaterThan(replayedRuntimeStatusIndex); + expect(replayInit).toHaveBeenCalledWith(workspaceId); + }); + it("schedules auto-retry when runtime startup fails before stream events", async () => { const workspaceId = "ws-runtime-start-failed"; diff --git a/src/node/services/agentSession.ts b/src/node/services/agentSession.ts index 1b883dcadb..754c5afd7f 100644 --- a/src/node/services/agentSession.ts +++ b/src/node/services/agentSession.ts @@ -76,6 +76,7 @@ import { resolveAgentInheritanceChain } from "@/node/services/agentDefinitions/r import { MessageQueue } from "./messageQueue"; import { copyStreamLifecycleSnapshot, + type RuntimeStatusEvent, type StreamAbortReason, type StreamEndEvent, type StreamLifecycleSnapshot, @@ -398,6 +399,16 @@ export class AgentSession { */ private terminalStreamError: StreamErrorMessage | null = null; + /** + * Latest pre-stream runtime-status breadcrumb for the in-flight PREPARING turn. + * + * This used to live only in the renderer, which meant switching away from and back to an + * SSH/Coder workspace could drop the startup detail text until a brand-new event arrived. + * Keeping the latest breadcrumb in the session lets replay restore the same status UI that + * live subscribers saw. + */ + private preparingRuntimeStatus: RuntimeStatusEvent | null = null; + /** Last lifecycle snapshot emitted to live subscribers (used for change detection only). */ private lastEmittedStreamLifecycle: StreamLifecycleSnapshot | null = null; @@ -609,19 +620,38 @@ export class AgentSession { return this.terminalStreamLifecycle ?? { phase: "idle", hadAnyOutput: false }; } + private hasSameStreamLifecycle( + left: StreamLifecycleSnapshot | null, + right: StreamLifecycleSnapshot + ): boolean { + return ( + left !== null && + left.phase === right.phase && + left.hadAnyOutput === right.hadAnyOutput && + (left.abortReason ?? null) === (right.abortReason ?? null) + ); + } + + private hasSameRuntimeStatus( + left: RuntimeStatusEvent | null, + right: RuntimeStatusEvent + ): boolean { + return ( + left !== null && + left.phase === right.phase && + left.runtimeType === right.runtimeType && + (left.source ?? null) === (right.source ?? null) && + (left.detail ?? null) === (right.detail ?? null) + ); + } + private emitStreamLifecycleIfChanged(): void { if (this.disposed) { return; } const snapshot = this.getCurrentStreamLifecycleSnapshot(); - const lastSnapshot = this.lastEmittedStreamLifecycle; - if ( - lastSnapshot && - lastSnapshot.phase === snapshot.phase && - lastSnapshot.hadAnyOutput === snapshot.hadAnyOutput && - (lastSnapshot.abortReason ?? null) === (snapshot.abortReason ?? null) - ) { + if (this.hasSameStreamLifecycle(this.lastEmittedStreamLifecycle, snapshot)) { return; } @@ -653,6 +683,19 @@ export class AgentSession { }); } + private updatePreparingRuntimeStatus(status: RuntimeStatusEvent): void { + if (status.phase === "ready" || status.phase === "error") { + this.clearPreparingRuntimeStatus(); + return; + } + + this.preparingRuntimeStatus = status; + } + + private clearPreparingRuntimeStatus(): void { + this.preparingRuntimeStatus = null; + } + private emitRetryEvent(event: RetryStatusEvent): void { if (this.disposed) { return; @@ -1550,6 +1593,7 @@ export class AgentSession { let replayedTerminalStreamError = false; let replayedStreamLifecycle: StreamLifecycleSnapshot | null = null; + let replayedRuntimeStatus: RuntimeStatusEvent | null = null; const emitReplayStatusMessage = (message: WorkspaceChatMessage): void => { listener({ workspaceId: this.workspaceId, message }); }; @@ -1563,21 +1607,20 @@ export class AgentSession { } const lifecycle = this.getCurrentStreamLifecycleSnapshot(); - if ( - replayedStreamLifecycle && - replayedStreamLifecycle.phase === lifecycle.phase && - replayedStreamLifecycle.hadAnyOutput === lifecycle.hadAnyOutput && - (replayedStreamLifecycle.abortReason ?? null) === (lifecycle.abortReason ?? null) - ) { - return; + if (!this.hasSameStreamLifecycle(replayedStreamLifecycle, lifecycle)) { + replayedStreamLifecycle = copyStreamLifecycleSnapshot(lifecycle); + emitReplayStatusMessage({ + type: "stream-lifecycle", + workspaceId: this.workspaceId, + ...lifecycle, + }); } - replayedStreamLifecycle = copyStreamLifecycleSnapshot(lifecycle); - emitReplayStatusMessage({ - type: "stream-lifecycle", - workspaceId: this.workspaceId, - ...lifecycle, - }); + const runtimeStatus = this.preparingRuntimeStatus; + if (runtimeStatus && !this.hasSameRuntimeStatus(replayedRuntimeStatus, runtimeStatus)) { + replayedRuntimeStatus = { ...runtimeStatus }; + emitReplayStatusMessage(runtimeStatus); + } }; let emittedReplayStreamEvents = false; @@ -3902,6 +3945,7 @@ export class AgentSession { const preStreamAbortReason = "abortReason" in payload ? payload.abortReason : undefined; if (this.turnPhase === TurnPhase.PREPARING) { + this.clearPreparingRuntimeStatus(); this.setTerminalStreamLifecycle("interrupted", { abortReason: preStreamAbortReason, hadAnyOutput: false, @@ -3946,7 +3990,12 @@ export class AgentSession { this.emitChatEvent(payload); this.setTurnPhase(TurnPhase.IDLE); }); - forward("runtime-status", (payload) => this.emitChatEvent(payload)); + forward("runtime-status", (payload) => { + if (payload.type === "runtime-status") { + this.updatePreparingRuntimeStatus(payload); + } + this.emitChatEvent(payload); + }); forward("stream-end", async (payload) => { if (payload.type !== "stream-end") { @@ -4151,6 +4200,7 @@ export class AgentSession { private setTurnPhase(next: TurnPhase): void { this.turnPhase = next; + this.clearPreparingRuntimeStatus(); if (next !== TurnPhase.IDLE) { this.terminalStreamLifecycle = null;