Skip to content

Commit 2647483

Browse files
committed
🤖 fix: avoid stream replay blocking caught-up
1 parent 98e35c2 commit 2647483

File tree

2 files changed

+92
-3
lines changed

2 files changed

+92
-3
lines changed

src/node/services/streamManager.test.ts

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,90 @@ describe("StreamManager - Concurrent Stream Prevention", () => {
320320
});
321321
});
322322

323+
describe("StreamManager - replayStream", () => {
324+
let streamManager: StreamManager;
325+
let mockHistoryService: HistoryService;
326+
let mockPartialService: PartialService;
327+
328+
beforeEach(() => {
329+
mockHistoryService = createMockHistoryService();
330+
mockPartialService = createMockPartialService();
331+
streamManager = new StreamManager(mockHistoryService, mockPartialService);
332+
// Suppress error events from bubbling up as uncaught exceptions during tests
333+
streamManager.on("error", () => undefined);
334+
});
335+
336+
test("does not chase newly appended parts during replay", async () => {
337+
const workspaceId = "test-workspace-replay";
338+
339+
// Mock token tracker so we can pause inside replay.
340+
let releaseFirstCountTokens: (() => void) | undefined;
341+
let onFirstCountTokensCalled: (() => void) | undefined;
342+
343+
const firstCountTokensCalled = new Promise<void>((resolve) => {
344+
onFirstCountTokensCalled = resolve;
345+
});
346+
347+
let countTokensCalls = 0;
348+
const tokenTracker = {
349+
setModel: mock(() => Promise.resolve()),
350+
countTokens: mock(() => {
351+
countTokensCalls += 1;
352+
if (countTokensCalls === 1) {
353+
onFirstCountTokensCalled?.();
354+
return new Promise<number>((resolve) => {
355+
releaseFirstCountTokens = () => resolve(1);
356+
});
357+
}
358+
return Promise.resolve(1);
359+
}),
360+
};
361+
362+
const replaced = Reflect.set(streamManager, "tokenTracker", tokenTracker);
363+
if (!replaced) {
364+
throw new Error("Failed to mock StreamManager.tokenTracker");
365+
}
366+
367+
const workspaceStreamsValue = Reflect.get(streamManager, "workspaceStreams") as unknown;
368+
if (!(workspaceStreamsValue instanceof Map)) {
369+
throw new Error("StreamManager.workspaceStreams is not a Map");
370+
}
371+
372+
const workspaceStreams = workspaceStreamsValue as Map<string, unknown>;
373+
374+
const streamInfo = {
375+
state: "streaming",
376+
messageId: "msg-1",
377+
model: "anthropic:claude-3-5-sonnet",
378+
historySequence: 1,
379+
startTime: Date.now(),
380+
parts: [{ type: "text", text: "hello", timestamp: Date.now() }],
381+
initialMetadata: {},
382+
};
383+
384+
workspaceStreams.set(workspaceId, streamInfo);
385+
386+
const deltas: string[] = [];
387+
streamManager.on("stream-delta", (data: { delta: string }) => {
388+
deltas.push(data.delta);
389+
});
390+
391+
const replayPromise = streamManager.replayStream(workspaceId);
392+
393+
// Wait until replay is actively tokenizing the first part.
394+
await firstCountTokensCalled;
395+
396+
// Mutate parts while replay is in progress. replayStream should snapshot, so this part should
397+
// not be emitted by replay.
398+
streamInfo.parts.push({ type: "text", text: "world", timestamp: Date.now() });
399+
400+
releaseFirstCountTokens?.();
401+
await replayPromise;
402+
403+
expect(deltas).toEqual(["hello"]);
404+
});
405+
});
406+
323407
describe("StreamManager - Unavailable Tool Handling", () => {
324408
let streamManager: StreamManager;
325409
let mockHistoryService: HistoryService;

src/node/services/streamManager.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1754,9 +1754,14 @@ export class StreamManager extends EventEmitter {
17541754
...(replayMode && { mode: replayMode }),
17551755
});
17561756

1757-
// Replay accumulated parts as events using shared emission logic
1758-
// This guarantees replay produces identical events to the original stream
1759-
for (const part of streamInfo.parts) {
1757+
// Replay accumulated parts as events using shared emission logic.
1758+
//
1759+
// IMPORTANT: Snapshot the parts array. During active streaming, `streamInfo.parts` grows as new
1760+
// deltas arrive. Iterating the live array would "chase" new parts and can prevent this replay
1761+
// from completing (which delays the "caught-up" signal and leaves the UI stuck on
1762+
// "Loading workspace..." during refresh).
1763+
const partsSnapshot = streamInfo.parts.slice();
1764+
for (const part of partsSnapshot) {
17601765
await this.emitPartAsEvent(typedWorkspaceId, streamInfo.messageId, part);
17611766
}
17621767
}

0 commit comments

Comments
 (0)