From b2648b3b3a1329fa44021f729be72a584a6d32c8 Mon Sep 17 00:00:00 2001 From: Karthik Kalyanaraman Date: Fri, 20 Mar 2026 15:51:43 -0700 Subject: [PATCH 1/3] fix stream tab for legacy runs --- .changeset/curvy-moles-end.md | 5 ++ .../web/app/lib/hooks/use-stream-reader.ts | 80 ++++++++++++++++--- 2 files changed, 74 insertions(+), 11 deletions(-) create mode 100644 .changeset/curvy-moles-end.md diff --git a/.changeset/curvy-moles-end.md b/.changeset/curvy-moles-end.md new file mode 100644 index 0000000000..881773781a --- /dev/null +++ b/.changeset/curvy-moles-end.md @@ -0,0 +1,5 @@ +--- +"@workflow/web": patch +--- + +Fix stream detection for pre 4.1.0-beta.56 runs diff --git a/packages/web/app/lib/hooks/use-stream-reader.ts b/packages/web/app/lib/hooks/use-stream-reader.ts index 26671d216b..d36747b417 100644 --- a/packages/web/app/lib/hooks/use-stream-reader.ts +++ b/packages/web/app/lib/hooks/use-stream-reader.ts @@ -17,6 +17,24 @@ export interface StreamChunk { const FRAME_HEADER_SIZE = 4; +/** + * Detect stream encoding from the first bytes. + * + * - **framed**: Current format (≥ 4.1.0-beta.56) — 4-byte big-endian length + + * format-prefixed payload (`devl`/`encr`). + * - **legacy**: Older SDK versions (≤ 4.1.0-beta.55) used newline-delimited + * devalue strings with no binary framing. + */ +type StreamEncoding = 'framed' | 'legacy'; + +function detectEncoding(data: Uint8Array): StreamEncoding { + if (data.length < FRAME_HEADER_SIZE) return 'framed'; + const view = new DataView(data.buffer, data.byteOffset, data.byteLength); + const len = view.getUint32(0, false); + if (len > 0 && len <= 10 * 1024 * 1024) return 'framed'; + return 'legacy'; +} + export function useStreamReader( env: EnvMap, streamId: string | null, @@ -48,7 +66,6 @@ export function useStreamReader( const readStreamData = async () => { try { - // Get raw binary stream from server (no deserialization on server) const rawStream = await readStream( env, streamId, @@ -56,15 +73,15 @@ export function useStreamReader( abortController.signal ); - // Import the CryptoKey if the user has clicked Decrypt const cryptoKey = encryptionKey ? await importKey(encryptionKey) : undefined; - // Process length-prefixed frames from the raw stream, deserializing - // and decrypting entirely client-side. const reader = (rawStream as ReadableStream).getReader(); + const decoder = new TextDecoder(); let buffer = new Uint8Array(0); + let encoding: StreamEncoding | null = null; + let textRemainder = ''; const appendToBuffer = (data: Uint8Array) => { const newBuffer = new Uint8Array(buffer.length + data.length); @@ -73,18 +90,59 @@ export function useStreamReader( buffer = newBuffer; }; + const addLegacyLine = (line: string) => { + if (!mounted || !line) return; + const chunkId = chunkIdRef.current++; + let text: string; + try { + const parsed = JSON.parse(line); + text = JSON.stringify(parsed, null, 2); + } catch { + text = line; + } + setChunks((prev) => [...prev, { id: chunkId, text }]); + }; + for (;;) { if (abortController.signal.aborted) break; const { value, done } = await reader.read(); if (done) { + if (encoding === 'legacy' && textRemainder.trim()) { + addLegacyLine(textRemainder.trim()); + textRemainder = ''; + } if (mounted) setIsLive(false); break; } - appendToBuffer(value); + // Detect encoding on first read with enough data + if (encoding === null) { + appendToBuffer(value); + if (buffer.length >= FRAME_HEADER_SIZE) { + encoding = detectEncoding(buffer); + } + if (encoding !== 'framed') { + textRemainder = decoder.decode(buffer, { stream: true }); + buffer = new Uint8Array(0); + } + } else if (encoding === 'legacy') { + textRemainder += decoder.decode(value, { stream: true }); + } else { + appendToBuffer(value); + } + + if (encoding === 'legacy') { + const lines = textRemainder.split('\n'); + textRemainder = lines.pop() ?? ''; + for (const line of lines) { + const trimmed = line.trim(); + if (trimmed) addLegacyLine(trimmed); + } + continue; + } - // Process complete frames + // Framed mode: process length-prefixed frames while (buffer.length >= FRAME_HEADER_SIZE) { const view = new DataView( buffer.buffer, @@ -93,8 +151,12 @@ export function useStreamReader( ); const frameLength = view.getUint32(0, false); + if (frameLength === 0 || frameLength > 10 * 1024 * 1024) { + break; + } + if (buffer.length < FRAME_HEADER_SIZE + frameLength) { - break; // Incomplete frame + break; } const frameData = buffer.slice( @@ -104,7 +166,6 @@ export function useStreamReader( buffer = buffer.slice(FRAME_HEADER_SIZE + frameLength); try { - // Check if the frame is encrypted const { format, payload } = decodeFormatPrefix(frameData); let dataToHydrate: Uint8Array; @@ -119,14 +180,11 @@ export function useStreamReader( reader.cancel().catch(() => {}); return; } - // Decrypt to get the inner format-prefixed bytes (e.g., devl+data) dataToHydrate = await aesGcmDecrypt(cryptoKey, payload); } else { - // Not encrypted — pass the original frame data (with format prefix) dataToHydrate = frameData; } - // hydrateData handles format prefix decoding + devalue parsing const hydrated = hydrateData(dataToHydrate, revivers); if (mounted && hydrated !== undefined && hydrated !== null) { const chunkId = chunkIdRef.current++; From 333e93202657a36d28b34502ad399efc948daeef Mon Sep 17 00:00:00 2001 From: Karthik Kalyanaraman Date: Fri, 20 Mar 2026 16:55:55 -0700 Subject: [PATCH 2/3] fix stream tab for legacy runs --- packages/web/app/lib/hooks/use-stream-reader.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/web/app/lib/hooks/use-stream-reader.ts b/packages/web/app/lib/hooks/use-stream-reader.ts index d36747b417..260933b4b4 100644 --- a/packages/web/app/lib/hooks/use-stream-reader.ts +++ b/packages/web/app/lib/hooks/use-stream-reader.ts @@ -122,7 +122,7 @@ export function useStreamReader( if (buffer.length >= FRAME_HEADER_SIZE) { encoding = detectEncoding(buffer); } - if (encoding !== 'framed') { + if (encoding === 'legacy') { textRemainder = decoder.decode(buffer, { stream: true }); buffer = new Uint8Array(0); } From defb6064bac1b9886ded61e5dc77bf0ea71cdd4b Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Fri, 20 Mar 2026 17:37:12 -0700 Subject: [PATCH 3/3] Apply suggestion from @VaguelySerious Signed-off-by: Peter Wielander --- .changeset/curvy-moles-end.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.changeset/curvy-moles-end.md b/.changeset/curvy-moles-end.md index 881773781a..1c79c8608e 100644 --- a/.changeset/curvy-moles-end.md +++ b/.changeset/curvy-moles-end.md @@ -2,4 +2,4 @@ "@workflow/web": patch --- -Fix stream detection for pre 4.1.0-beta.56 runs +Fix stream display for streams created before version `4.1.0-beta.56`