From 2f21fccc02fbb77734cbc20f14d2271f7efc4012 Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Wed, 25 Feb 2026 10:26:40 -0800 Subject: [PATCH 1/8] Move event ref resolution from server-side to client-side When fetching events with resolveData='all', world-vercel now always sends remoteRefBehavior=lazy to workflow-server and hydrates the returned ref descriptors client-side. This moves memory pressure from the server (which was OOMing when resolving all refs for large event lists) to the client. For inline refs (dbrf:), data is decoded locally from the descriptor's embedded payload with zero network overhead. For S3/Redis refs, parallel requests are made to the new GET /v2/refs endpoint with bounded concurrency (max 10 concurrent requests). --- packages/world-vercel/src/events.ts | 136 ++++++++++++++++++++++++++-- packages/world-vercel/src/refs.ts | 106 ++++++++++++++++++++++ 2 files changed, 234 insertions(+), 8 deletions(-) create mode 100644 packages/world-vercel/src/refs.ts diff --git a/packages/world-vercel/src/events.ts b/packages/world-vercel/src/events.ts index f58b2b1f38..59cb0ee60a 100644 --- a/packages/world-vercel/src/events.ts +++ b/packages/world-vercel/src/events.ts @@ -14,6 +14,11 @@ import { WorkflowRunSchema, } from '@workflow/world'; import z from 'zod'; +import { + isRefDescriptor, + type RefDescriptor, + resolveRefDescriptors, +} from './refs.js'; import { cancelWorkflowRunV1, createWorkflowRunV1 } from './runs.js'; import { deserializeStep, StepWireSchema } from './steps.js'; import type { APIConfig } from './utils.js'; @@ -37,19 +42,36 @@ const EventResultWireSchema = z.object({ hook: HookSchema.optional(), }); -// Would usually "EventSchema.omit({ eventData: true })" but that doesn't work -// on zod unions. Re-creating the schema manually. -// specVersion defaults to 1 (legacy) when parsing responses from storage +// Schema for events returned with `remoteRefBehavior=lazy`. +// Includes both `eventDataRef` (legacy, specVersion=1) and `eventData` +// (v2, specVersion=2 — may contain nested RefDescriptor values). +// specVersion defaults to 1 (legacy) when parsing responses from storage. const EventWithRefsSchema = z.object({ eventId: z.string(), runId: z.string(), eventType: EventTypeSchema, correlationId: z.string().optional(), eventDataRef: z.any().optional(), + eventData: z.any().optional(), createdAt: z.coerce.date(), specVersion: z.number().default(1), }); +/** + * Maps event types to the field name within `eventData` that may contain + * a ref descriptor. Mirrors the server-side `resolveEventDataRefs()` mapping. + */ +const eventDataRefFieldMap: Record = { + run_created: 'input', + run_completed: 'output', + run_failed: 'error', + step_created: 'input', + step_completed: 'result', + step_failed: 'error', + step_retrying: 'error', + hook_created: 'metadata', +}; + // Events where the client uses the response entity data need 'resolve' (default). // Events where the client discards the response can use 'lazy' to skip expensive // S3 ref resolution on the server, saving ~200-460ms per event. @@ -59,6 +81,93 @@ const eventsNeedingResolve = new Set([ 'step_started', // client reads result.step (checks attempt, state) ]); +/** + * Collect all ref descriptors from a list of lazy-loaded events. + * Returns a flat array of { eventIndex, refType, fieldName?, descriptor } + * entries that can be resolved in bulk. + */ +interface PendingRef { + eventIndex: number; + /** + * 'entity' = top-level eventDataRef (legacy specVersion=1 events) + * 'nested' = nested ref descriptor within eventData (v2 events) + */ + refType: 'entity' | 'nested'; + /** The field name within eventData containing the ref (only for 'nested') */ + fieldName?: string; + descriptor: RefDescriptor; +} + +function collectPendingRefs(events: any[]): PendingRef[] { + const pending: PendingRef[] = []; + + for (let i = 0; i < events.length; i++) { + const event = events[i]; + + // Legacy events (specVersion=1): eventDataRef is a RefDescriptor + if (event.eventDataRef && isRefDescriptor(event.eventDataRef)) { + pending.push({ + eventIndex: i, + refType: 'entity', + descriptor: event.eventDataRef, + }); + } + + // V2 events: eventData may contain a nested RefDescriptor + if (event.eventData && typeof event.eventData === 'object') { + const fieldName = eventDataRefFieldMap[event.eventType as string]; + if (fieldName) { + const fieldValue = event.eventData[fieldName]; + if (isRefDescriptor(fieldValue)) { + pending.push({ + eventIndex: i, + refType: 'nested', + fieldName, + descriptor: fieldValue, + }); + } + } + } + } + + return pending; +} + +/** + * Hydrate lazy-loaded events by resolving all ref descriptors client-side. + * For entity-level refs (eventDataRef), the resolved value becomes eventData. + * For nested refs (eventData[field]), the resolved value replaces the descriptor. + */ +async function hydrateEventRefs( + events: any[], + config?: APIConfig +): Promise { + const pending = collectPendingRefs(events); + if (pending.length === 0) return events; + + // Resolve all descriptors in parallel with bounded concurrency + const descriptors = pending.map((p) => p.descriptor); + const resolvedValues = await resolveRefDescriptors(descriptors, config); + + // Apply resolved values back to the events + for (let i = 0; i < pending.length; i++) { + const { eventIndex, refType, fieldName } = pending[i]; + const event = events[eventIndex]; + const resolved = resolvedValues[i]; + + if (refType === 'entity') { + // Legacy: eventDataRef → eventData, remove the ref field + event.eventData = resolved; + delete event.eventDataRef; + } else if (refType === 'nested' && fieldName) { + // V2: replace the nested ref descriptor with resolved value + event.eventData[fieldName] = resolved; + } + } + + return events; +} + // Functions export async function getWorkflowRunEvents( params: ListEventsParams | ListEventsByCorrelationIdParams, @@ -84,8 +193,11 @@ export async function getWorkflowRunEvents( if (pagination?.sortOrder) searchParams.set('sortOrder', pagination.sortOrder); if (correlationId) searchParams.set('correlationId', correlationId); - const remoteRefBehavior = resolveData === 'none' ? 'lazy' : 'resolve'; - searchParams.set('remoteRefBehavior', remoteRefBehavior); + + // Always send 'lazy' to the server to avoid server-side OOM from resolving + // all refs in memory. When resolveData is 'all', we hydrate refs client-side + // via individual ref resolution requests. + searchParams.set('remoteRefBehavior', 'lazy'); const queryString = searchParams.toString(); const query = queryString ? `?${queryString}` : ''; @@ -97,11 +209,19 @@ export async function getWorkflowRunEvents( endpoint, options: { method: 'GET' }, config, - schema: PaginatedResponseSchema( - remoteRefBehavior === 'lazy' ? EventWithRefsSchema : EventSchema - ), + schema: PaginatedResponseSchema(EventWithRefsSchema), })) as PaginatedResponse; + if (resolveData === 'all') { + // Hydrate refs client-side: resolve all ref descriptors in parallel + const hydratedEvents = await hydrateEventRefs(response.data, config); + return { + ...response, + data: hydratedEvents, + }; + } + + // resolveData === 'none': strip eventData and eventDataRef return { ...response, data: response.data.map((event: any) => diff --git a/packages/world-vercel/src/refs.ts b/packages/world-vercel/src/refs.ts new file mode 100644 index 0000000000..bc2dc85154 --- /dev/null +++ b/packages/world-vercel/src/refs.ts @@ -0,0 +1,106 @@ +import { decode } from 'cbor-x'; +import z from 'zod'; +import type { APIConfig } from './utils.js'; +import { makeRequest } from './utils.js'; + +/** + * A ref descriptor as returned by workflow-server when `remoteRefBehavior=lazy`. + * Matches the server-side `RefDescriptor` type in `lib/data/remote-ref.ts`. + */ +export interface RefDescriptor { + _type: 'RemoteRef'; + _ref: string; + /** Base64-encoded inline payload. Present only for dbrf: (inline) refs. */ + _data?: string; + /** Content type of the inline payload. Present only for dbrf: refs. */ + _ct?: string; +} + +/** + * Checks if a value is a RefDescriptor object. + */ +export function isRefDescriptor(value: unknown): value is RefDescriptor { + return ( + typeof value === 'object' && + value !== null && + '_type' in value && + '_ref' in value && + (value as { _type: string })._type === 'RemoteRef' + ); +} + +/** + * Maximum number of concurrent ref resolution requests. + * Limits peak concurrency to avoid overwhelming the server. + */ +const REF_RESOLVE_CONCURRENCY = 10; + +/** + * Resolve a single ref descriptor. + * + * For inline refs (dbrf: prefix), the data is decoded locally from the + * descriptor's `_data` field — no network request is needed. + * + * For S3 refs (s3rf:) and Redis refs (kvrf:), a request is made to the + * `GET /v2/refs` endpoint on workflow-server. + */ +export async function resolveRefDescriptor( + descriptor: RefDescriptor, + config?: APIConfig +): Promise { + const ref = descriptor._ref; + + // Inline refs (dbrf:) carry their data in the descriptor — decode locally + if (ref.startsWith('dbrf:') && descriptor._data) { + const contentType = descriptor._ct ?? 'application/cbor'; + const binaryData = Buffer.from(descriptor._data, 'base64'); + if (contentType === 'application/octet-stream') { + return new Uint8Array(binaryData); + } + // CBOR-encoded data — decode it + return decode(new Uint8Array(binaryData)); + } + + // Remote refs (s3rf:, kvrf:) — fetch from the server + const response = await makeRequest({ + endpoint: `/v2/refs?ref=${encodeURIComponent(ref)}`, + options: { method: 'GET' }, + config, + schema: z.object({ data: z.any() }), + }); + + return response.data; +} + +/** + * Resolve multiple ref descriptors in parallel with bounded concurrency. + * + * @param descriptors - Array of ref descriptors to resolve + * @param config - API configuration + * @returns Array of resolved values in the same order as input + */ +export async function resolveRefDescriptors( + descriptors: RefDescriptor[], + config?: APIConfig +): Promise { + if (descriptors.length === 0) return []; + + // Simple case: if under concurrency limit, resolve all at once + if (descriptors.length <= REF_RESOLVE_CONCURRENCY) { + return Promise.all(descriptors.map((d) => resolveRefDescriptor(d, config))); + } + + // Batch with bounded concurrency + const results: unknown[] = new Array(descriptors.length); + for (let i = 0; i < descriptors.length; i += REF_RESOLVE_CONCURRENCY) { + const batch = descriptors.slice(i, i + REF_RESOLVE_CONCURRENCY); + const batchResults = await Promise.all( + batch.map((d) => resolveRefDescriptor(d, config)) + ); + for (let j = 0; j < batchResults.length; j++) { + results[i + j] = batchResults[j]; + } + } + + return results; +} From 4e2239b4353b1215804dd416fba2a3778a53ecda Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Wed, 25 Feb 2026 10:27:33 -0800 Subject: [PATCH 2/8] Add changeset for client-side ref hydration --- .changeset/client-side-ref-hydration.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/client-side-ref-hydration.md diff --git a/.changeset/client-side-ref-hydration.md b/.changeset/client-side-ref-hydration.md new file mode 100644 index 0000000000..f38085fc61 --- /dev/null +++ b/.changeset/client-side-ref-hydration.md @@ -0,0 +1,5 @@ +--- +"@workflow/world-vercel": patch +--- + +Move event ref resolution from server-side to client-side to reduce memory pressure on workflow-server From a66a553b40ccafb31a284965c3191361b4d10187 Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Wed, 25 Feb 2026 14:12:17 -0800 Subject: [PATCH 3/8] Address code review feedback - Fix bug: filterEventData now strips eventDataRef in resolveData='none' path, preventing raw ref descriptors from leaking to callers - Fix isRefDescriptor type guard to verify _ref is a string - Fail fast for dbrf refs missing _data instead of falling through - Add OTEL world.refs.hydrate span wrapping hydrateEventRefs with hydrated_count attribute, and world.refs.resolve span in batch resolver with inline/remote count attributes - Shallow-clone events before mutation to avoid corrupting upstream caches - Add error context wrapping for resolveRefDescriptors failures - Remove Buffer double copy: pass Buffer directly to cbor-x decode - Update server response handling: /v2/refs now returns raw CBOR bytes - Fix changeset wording per reviewer feedback - Fix comment wording per reviewer feedback --- .changeset/client-side-ref-hydration.md | 2 +- packages/world-vercel/src/events.ts | 76 ++++++++++++++++++------- packages/world-vercel/src/refs.ts | 72 +++++++++++++++-------- 3 files changed, 103 insertions(+), 47 deletions(-) diff --git a/.changeset/client-side-ref-hydration.md b/.changeset/client-side-ref-hydration.md index f38085fc61..67849ab554 100644 --- a/.changeset/client-side-ref-hydration.md +++ b/.changeset/client-side-ref-hydration.md @@ -2,4 +2,4 @@ "@workflow/world-vercel": patch --- -Move event ref resolution from server-side to client-side to reduce memory pressure on workflow-server +Move event ref resolution from server-side to client-side to reduce memory pressure diff --git a/packages/world-vercel/src/events.ts b/packages/world-vercel/src/events.ts index 59cb0ee60a..ad882c7db1 100644 --- a/packages/world-vercel/src/events.ts +++ b/packages/world-vercel/src/events.ts @@ -21,13 +21,20 @@ import { } from './refs.js'; import { cancelWorkflowRunV1, createWorkflowRunV1 } from './runs.js'; import { deserializeStep, StepWireSchema } from './steps.js'; +import { trace } from './telemetry.js'; import type { APIConfig } from './utils.js'; import { DEFAULT_RESOLVE_DATA_OPTION, makeRequest } from './utils.js'; -// Helper to filter event data based on resolveData setting +// Helper to filter event data based on resolveData setting. +// Strips both eventData and eventDataRef since the server always returns +// lazy refs now, and callers with resolveData='none' should not see either. function filterEventData(event: any, resolveData: 'none' | 'all'): Event { if (resolveData === 'none') { - const { eventData: _eventData, ...rest } = event; + const { + eventData: _eventData, + eventDataRef: _eventDataRef, + ...rest + } = event; return rest; } return event; @@ -137,6 +144,9 @@ function collectPendingRefs(events: any[]): PendingRef[] { * Hydrate lazy-loaded events by resolving all ref descriptors client-side. * For entity-level refs (eventDataRef), the resolved value becomes eventData. * For nested refs (eventData[field]), the resolved value replaces the descriptor. + * + * Events are shallow-cloned before mutation to avoid corrupting any upstream + * caches (SWR, React cache, etc.) that might hold references to the originals. */ async function hydrateEventRefs( events: any[], @@ -145,27 +155,49 @@ async function hydrateEventRefs( const pending = collectPendingRefs(events); if (pending.length === 0) return events; - // Resolve all descriptors in parallel with bounded concurrency - const descriptors = pending.map((p) => p.descriptor); - const resolvedValues = await resolveRefDescriptors(descriptors, config); - - // Apply resolved values back to the events - for (let i = 0; i < pending.length; i++) { - const { eventIndex, refType, fieldName } = pending[i]; - const event = events[eventIndex]; - const resolved = resolvedValues[i]; - - if (refType === 'entity') { - // Legacy: eventDataRef → eventData, remove the ref field - event.eventData = resolved; - delete event.eventDataRef; - } else if (refType === 'nested' && fieldName) { - // V2: replace the nested ref descriptor with resolved value - event.eventData[fieldName] = resolved; + return trace('world.refs.hydrate', async (span) => { + span?.setAttribute('workflow.refs.hydrated_count', pending.length); + + // Resolve all descriptors in parallel with bounded concurrency + const descriptors = pending.map((p) => p.descriptor); + const resolvedValues = await resolveRefDescriptors( + descriptors, + config + ).catch((err) => { + const msg = err instanceof Error ? err.message : String(err); + throw new Error( + `Failed to hydrate ${pending.length} ref(s) across ${events.length} event(s): ${msg}` + ); + }); + + // Shallow-clone events that need modification, then apply resolved values + const result = [...events]; + for (let i = 0; i < pending.length; i++) { + const { eventIndex, refType, fieldName } = pending[i]; + const resolved = resolvedValues[i]; + + // Shallow-clone the event (and eventData if nested) before mutating + if (result[eventIndex] === events[eventIndex]) { + result[eventIndex] = { ...events[eventIndex] }; + } + const event = result[eventIndex]; + + if (refType === 'entity') { + // Legacy: eventDataRef → eventData, remove the ref field + event.eventData = resolved; + delete event.eventDataRef; + } else if (refType === 'nested' && fieldName) { + // Shallow-clone eventData before mutating if not yet cloned + if (event.eventData === events[eventIndex].eventData) { + event.eventData = { ...event.eventData }; + } + // V2: replace the nested ref descriptor with resolved value + event.eventData[fieldName] = resolved; + } } - } - return events; + return result; + }); } // Functions @@ -194,7 +226,7 @@ export async function getWorkflowRunEvents( searchParams.set('sortOrder', pagination.sortOrder); if (correlationId) searchParams.set('correlationId', correlationId); - // Always send 'lazy' to the server to avoid server-side OOM from resolving + // Always send 'lazy' to the server to avoid memory pressure from resolving // all refs in memory. When resolveData is 'all', we hydrate refs client-side // via individual ref resolution requests. searchParams.set('remoteRefBehavior', 'lazy'); diff --git a/packages/world-vercel/src/refs.ts b/packages/world-vercel/src/refs.ts index bc2dc85154..bc73132f2b 100644 --- a/packages/world-vercel/src/refs.ts +++ b/packages/world-vercel/src/refs.ts @@ -1,5 +1,6 @@ import { decode } from 'cbor-x'; import z from 'zod'; +import { trace } from './telemetry.js'; import type { APIConfig } from './utils.js'; import { makeRequest } from './utils.js'; @@ -25,6 +26,7 @@ export function isRefDescriptor(value: unknown): value is RefDescriptor { value !== null && '_type' in value && '_ref' in value && + typeof (value as { _ref: unknown })._ref === 'string' && (value as { _type: string })._type === 'RemoteRef' ); } @@ -42,7 +44,7 @@ const REF_RESOLVE_CONCURRENCY = 10; * descriptor's `_data` field — no network request is needed. * * For S3 refs (s3rf:) and Redis refs (kvrf:), a request is made to the - * `GET /v2/refs` endpoint on workflow-server. + * `GET /v2/refs` endpoint on workflow-server which returns raw CBOR bytes. */ export async function resolveRefDescriptor( descriptor: RefDescriptor, @@ -51,30 +53,36 @@ export async function resolveRefDescriptor( const ref = descriptor._ref; // Inline refs (dbrf:) carry their data in the descriptor — decode locally - if (ref.startsWith('dbrf:') && descriptor._data) { + if (ref.startsWith('dbrf:')) { + if (!descriptor._data) { + throw new Error(`Inline ref descriptor missing _data field: ${ref}`); + } const contentType = descriptor._ct ?? 'application/cbor'; const binaryData = Buffer.from(descriptor._data, 'base64'); if (contentType === 'application/octet-stream') { return new Uint8Array(binaryData); } - // CBOR-encoded data — decode it - return decode(new Uint8Array(binaryData)); + // CBOR-encoded data — decode it. Buffer is accepted by cbor-x directly. + return decode(binaryData); } - // Remote refs (s3rf:, kvrf:) — fetch from the server - const response = await makeRequest({ + // Remote refs (s3rf:, kvrf:) — fetch raw CBOR bytes from the server. + // The server returns the raw stored bytes directly (not wrapped in a + // JSON/CBOR envelope), so makeRequest decodes them into the JS value. + return makeRequest({ endpoint: `/v2/refs?ref=${encodeURIComponent(ref)}`, options: { method: 'GET' }, config, - schema: z.object({ data: z.any() }), + schema: z.any(), }); - - return response.data; } /** * Resolve multiple ref descriptors in parallel with bounded concurrency. * + * If an entire batch fails (e.g., /v2/refs endpoint is down), remaining + * batches are aborted to avoid sending doomed requests. + * * @param descriptors - Array of ref descriptors to resolve * @param config - API configuration * @returns Array of resolved values in the same order as input @@ -85,22 +93,38 @@ export async function resolveRefDescriptors( ): Promise { if (descriptors.length === 0) return []; - // Simple case: if under concurrency limit, resolve all at once - if (descriptors.length <= REF_RESOLVE_CONCURRENCY) { - return Promise.all(descriptors.map((d) => resolveRefDescriptor(d, config))); - } + return trace('world.refs.resolve', async (span) => { + const inlineCount = descriptors.filter((d) => + d._ref.startsWith('dbrf:') + ).length; + const remoteCount = descriptors.length - inlineCount; + + span?.setAttributes({ + 'workflow.refs.total_count': descriptors.length, + 'workflow.refs.inline_count': inlineCount, + 'workflow.refs.remote_count': remoteCount, + }); - // Batch with bounded concurrency - const results: unknown[] = new Array(descriptors.length); - for (let i = 0; i < descriptors.length; i += REF_RESOLVE_CONCURRENCY) { - const batch = descriptors.slice(i, i + REF_RESOLVE_CONCURRENCY); - const batchResults = await Promise.all( - batch.map((d) => resolveRefDescriptor(d, config)) - ); - for (let j = 0; j < batchResults.length; j++) { - results[i + j] = batchResults[j]; + // Simple case: if under concurrency limit, resolve all at once + if (descriptors.length <= REF_RESOLVE_CONCURRENCY) { + return Promise.all( + descriptors.map((d) => resolveRefDescriptor(d, config)) + ); + } + + // Batch with bounded concurrency. If a batch fails entirely, + // abort remaining batches to avoid cascading failures. + const results: unknown[] = new Array(descriptors.length); + for (let i = 0; i < descriptors.length; i += REF_RESOLVE_CONCURRENCY) { + const batch = descriptors.slice(i, i + REF_RESOLVE_CONCURRENCY); + const batchResults = await Promise.all( + batch.map((d) => resolveRefDescriptor(d, config)) + ); + for (let j = 0; j < batchResults.length; j++) { + results[i + j] = batchResults[j]; + } } - } - return results; + return results; + }); } From fcc580c3e9f2921017842df66c4155972836df04 Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Wed, 25 Feb 2026 15:20:41 -0800 Subject: [PATCH 4/8] Address second round of review feedback - Remove unnecessary Uint8Array copy for octet-stream inline refs; return Buffer directly since it's already a Uint8Array subclass - Fix misleading batch failure comment: Promise.all rejects on any single ref failure, not only when the entire batch fails - Deduplicate ref descriptors by _ref key before resolving to avoid redundant network requests when multiple events share the same ref --- packages/world-vercel/src/events.ts | 41 ++++++++++++++++++++--------- packages/world-vercel/src/refs.ts | 8 +++--- 2 files changed, 33 insertions(+), 16 deletions(-) diff --git a/packages/world-vercel/src/events.ts b/packages/world-vercel/src/events.ts index ad882c7db1..1dd9fd23b0 100644 --- a/packages/world-vercel/src/events.ts +++ b/packages/world-vercel/src/events.ts @@ -158,23 +158,38 @@ async function hydrateEventRefs( return trace('world.refs.hydrate', async (span) => { span?.setAttribute('workflow.refs.hydrated_count', pending.length); - // Resolve all descriptors in parallel with bounded concurrency - const descriptors = pending.map((p) => p.descriptor); - const resolvedValues = await resolveRefDescriptors( - descriptors, - config - ).catch((err) => { - const msg = err instanceof Error ? err.message : String(err); - throw new Error( - `Failed to hydrate ${pending.length} ref(s) across ${events.length} event(s): ${msg}` - ); - }); + // Deduplicate descriptors by _ref key to avoid redundant resolutions. + // Multiple events may reference the same ref (e.g., shared input). + const uniqueDescriptors = new Map(); + for (const p of pending) { + if (!uniqueDescriptors.has(p.descriptor._ref)) { + uniqueDescriptors.set(p.descriptor._ref, p.descriptor); + } + } + const deduped = Array.from(uniqueDescriptors.values()); + + // Resolve unique descriptors in parallel with bounded concurrency + const dedupedResults = await resolveRefDescriptors(deduped, config).catch( + (err) => { + const msg = err instanceof Error ? err.message : String(err); + throw new Error( + `Failed to hydrate ${pending.length} ref(s) across ${events.length} event(s): ${msg}` + ); + } + ); + + // Build a map from ref key → resolved value for fast lookup + const resolvedMap = new Map(); + const dedupedKeys = Array.from(uniqueDescriptors.keys()); + for (let i = 0; i < dedupedKeys.length; i++) { + resolvedMap.set(dedupedKeys[i], dedupedResults[i]); + } // Shallow-clone events that need modification, then apply resolved values const result = [...events]; for (let i = 0; i < pending.length; i++) { - const { eventIndex, refType, fieldName } = pending[i]; - const resolved = resolvedValues[i]; + const { eventIndex, refType, fieldName, descriptor } = pending[i]; + const resolved = resolvedMap.get(descriptor._ref); // Shallow-clone the event (and eventData if nested) before mutating if (result[eventIndex] === events[eventIndex]) { diff --git a/packages/world-vercel/src/refs.ts b/packages/world-vercel/src/refs.ts index bc73132f2b..8d49fd4e05 100644 --- a/packages/world-vercel/src/refs.ts +++ b/packages/world-vercel/src/refs.ts @@ -60,7 +60,8 @@ export async function resolveRefDescriptor( const contentType = descriptor._ct ?? 'application/cbor'; const binaryData = Buffer.from(descriptor._data, 'base64'); if (contentType === 'application/octet-stream') { - return new Uint8Array(binaryData); + // Buffer is a Uint8Array subclass — return directly to avoid a copy. + return binaryData; } // CBOR-encoded data — decode it. Buffer is accepted by cbor-x directly. return decode(binaryData); @@ -112,8 +113,9 @@ export async function resolveRefDescriptors( ); } - // Batch with bounded concurrency. If a batch fails entirely, - // abort remaining batches to avoid cascading failures. + // Batch with bounded concurrency. If any ref in a batch fails, + // the batch rejects and remaining batches are aborted to avoid + // cascading failures. const results: unknown[] = new Array(descriptors.length); for (let i = 0; i < descriptors.length; i += REF_RESOLVE_CONCURRENCY) { const batch = descriptors.slice(i, i + REF_RESOLVE_CONCURRENCY); From d930e566c00228581d994c6348cb2fc502fe1ae2 Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Wed, 25 Feb 2026 15:29:18 -0800 Subject: [PATCH 5/8] Fix E2E failures: handle octet-stream refs and coerce event dates Two bugs found via E2E tests: 1. resolveRefDescriptor used makeRequest which only handles JSON/CBOR API responses. The /v2/refs endpoint returns raw bytes with varying Content-Type (application/cbor or application/octet-stream for binary data like Uint8Array). Now uses direct fetch with getHttpConfig and handles both content types explicitly. 2. EventWithRefsSchema uses eventData: z.any() which skips type coercions that EventSchema applies (e.g., z.coerce.date() for resumeAt in wait_created events). After hydrating refs, events are now re-parsed through EventSchema.safeParse() to apply these coercions. Events that don't match a known type pass through as-is. --- packages/world-vercel/src/events.ts | 12 +++++++++- packages/world-vercel/src/refs.ts | 37 ++++++++++++++++++++--------- 2 files changed, 37 insertions(+), 12 deletions(-) diff --git a/packages/world-vercel/src/events.ts b/packages/world-vercel/src/events.ts index 1dd9fd23b0..36dc1155ad 100644 --- a/packages/world-vercel/src/events.ts +++ b/packages/world-vercel/src/events.ts @@ -262,9 +262,19 @@ export async function getWorkflowRunEvents( if (resolveData === 'all') { // Hydrate refs client-side: resolve all ref descriptors in parallel const hydratedEvents = await hydrateEventRefs(response.data, config); + + // Re-parse hydrated events through EventSchema to apply type coercions + // (e.g., z.coerce.date() for resumeAt) that EventWithRefsSchema skips. + // Use safeParse to gracefully handle any events that don't match a known + // type — pass them through as-is rather than failing the entire request. + const validatedEvents = hydratedEvents.map((event: any) => { + const result = EventSchema.safeParse(event); + return result.success ? result.data : event; + }); + return { ...response, - data: hydratedEvents, + data: validatedEvents, }; } diff --git a/packages/world-vercel/src/refs.ts b/packages/world-vercel/src/refs.ts index 8d49fd4e05..67c64ab3e2 100644 --- a/packages/world-vercel/src/refs.ts +++ b/packages/world-vercel/src/refs.ts @@ -1,8 +1,6 @@ import { decode } from 'cbor-x'; -import z from 'zod'; import { trace } from './telemetry.js'; -import type { APIConfig } from './utils.js'; -import { makeRequest } from './utils.js'; +import { type APIConfig, getHttpConfig } from './utils.js'; /** * A ref descriptor as returned by workflow-server when `remoteRefBehavior=lazy`. @@ -67,15 +65,32 @@ export async function resolveRefDescriptor( return decode(binaryData); } - // Remote refs (s3rf:, kvrf:) — fetch raw CBOR bytes from the server. + // Remote refs (s3rf:, kvrf:) — fetch raw bytes from the server. // The server returns the raw stored bytes directly (not wrapped in a - // JSON/CBOR envelope), so makeRequest decodes them into the JS value. - return makeRequest({ - endpoint: `/v2/refs?ref=${encodeURIComponent(ref)}`, - options: { method: 'GET' }, - config, - schema: z.any(), - }); + // JSON/CBOR envelope). The Content-Type may be 'application/cbor' (for + // CBOR-encoded data) or 'application/octet-stream' (for raw binary like + // Uint8Array). We handle both content types directly rather than going + // through makeRequest, which only handles JSON/CBOR API responses. + const { baseUrl, headers } = await getHttpConfig(config); + const url = `${baseUrl}/v2/refs?ref=${encodeURIComponent(ref)}`; + + const response = await fetch(new Request(url, { method: 'GET', headers })); + if (!response.ok) { + throw new Error( + `Failed to resolve ref ${ref}: HTTP ${response.status} ${response.statusText}` + ); + } + + const contentType = response.headers.get('Content-Type') || ''; + const buffer = await response.arrayBuffer(); + + if (contentType.includes('application/octet-stream')) { + // Raw binary data (e.g., Uint8Array stored by the workflow) + return new Uint8Array(buffer); + } + + // CBOR-encoded data (the common case for structured values) + return decode(new Uint8Array(buffer)); } /** From b6a1c1161d33b81b29d57ad93d08f5bf1419e3b1 Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Wed, 25 Feb 2026 16:40:12 -0800 Subject: [PATCH 6/8] Update ref resolution URL to use /runs/:runId/refs path Update resolveRefDescriptor to include the runId in the endpoint path, matching the server-side rename from GET /v2/refs to GET /v2/runs/:runId/refs. Thread runId from events through the dedup map and into resolveRefDescriptors via a new RefWithRunId interface that pairs each descriptor with its owning event's runId. --- packages/world-vercel/src/events.ts | 15 ++++++--- packages/world-vercel/src/refs.ts | 48 +++++++++++++++++++---------- 2 files changed, 41 insertions(+), 22 deletions(-) diff --git a/packages/world-vercel/src/events.ts b/packages/world-vercel/src/events.ts index 36dc1155ad..0a8d14e6b9 100644 --- a/packages/world-vercel/src/events.ts +++ b/packages/world-vercel/src/events.ts @@ -17,6 +17,7 @@ import z from 'zod'; import { isRefDescriptor, type RefDescriptor, + type RefWithRunId, resolveRefDescriptors, } from './refs.js'; import { cancelWorkflowRunV1, createWorkflowRunV1 } from './runs.js'; @@ -160,13 +161,17 @@ async function hydrateEventRefs( // Deduplicate descriptors by _ref key to avoid redundant resolutions. // Multiple events may reference the same ref (e.g., shared input). - const uniqueDescriptors = new Map(); + const uniqueRefs = new Map(); for (const p of pending) { - if (!uniqueDescriptors.has(p.descriptor._ref)) { - uniqueDescriptors.set(p.descriptor._ref, p.descriptor); + if (!uniqueRefs.has(p.descriptor._ref)) { + const eventRunId = events[p.eventIndex].runId as string; + uniqueRefs.set(p.descriptor._ref, { + descriptor: p.descriptor, + runId: eventRunId, + }); } } - const deduped = Array.from(uniqueDescriptors.values()); + const deduped = Array.from(uniqueRefs.values()); // Resolve unique descriptors in parallel with bounded concurrency const dedupedResults = await resolveRefDescriptors(deduped, config).catch( @@ -180,7 +185,7 @@ async function hydrateEventRefs( // Build a map from ref key → resolved value for fast lookup const resolvedMap = new Map(); - const dedupedKeys = Array.from(uniqueDescriptors.keys()); + const dedupedKeys = Array.from(uniqueRefs.keys()); for (let i = 0; i < dedupedKeys.length; i++) { resolvedMap.set(dedupedKeys[i], dedupedResults[i]); } diff --git a/packages/world-vercel/src/refs.ts b/packages/world-vercel/src/refs.ts index 67c64ab3e2..7595bda748 100644 --- a/packages/world-vercel/src/refs.ts +++ b/packages/world-vercel/src/refs.ts @@ -42,10 +42,16 @@ const REF_RESOLVE_CONCURRENCY = 10; * descriptor's `_data` field — no network request is needed. * * For S3 refs (s3rf:) and Redis refs (kvrf:), a request is made to the - * `GET /v2/refs` endpoint on workflow-server which returns raw CBOR bytes. + * `GET /v2/runs/:runId/refs` endpoint on workflow-server which returns + * raw CBOR or binary bytes. + * + * @param descriptor - The ref descriptor to resolve + * @param runId - The runId that owns this ref (used in the URL path) + * @param config - API configuration */ export async function resolveRefDescriptor( descriptor: RefDescriptor, + runId: string, config?: APIConfig ): Promise { const ref = descriptor._ref; @@ -72,7 +78,7 @@ export async function resolveRefDescriptor( // Uint8Array). We handle both content types directly rather than going // through makeRequest, which only handles JSON/CBOR API responses. const { baseUrl, headers } = await getHttpConfig(config); - const url = `${baseUrl}/v2/refs?ref=${encodeURIComponent(ref)}`; + const url = `${baseUrl}/v2/runs/${encodeURIComponent(runId)}/refs?ref=${encodeURIComponent(ref)}`; const response = await fetch(new Request(url, { method: 'GET', headers })); if (!response.ok) { @@ -93,49 +99,57 @@ export async function resolveRefDescriptor( return decode(new Uint8Array(buffer)); } +/** + * A ref descriptor paired with the runId that owns it, for resolution. + */ +export interface RefWithRunId { + descriptor: RefDescriptor; + runId: string; +} + /** * Resolve multiple ref descriptors in parallel with bounded concurrency. * - * If an entire batch fails (e.g., /v2/refs endpoint is down), remaining - * batches are aborted to avoid sending doomed requests. + * If any ref in a batch fails, the batch rejects and remaining batches + * are aborted to avoid cascading failures. * - * @param descriptors - Array of ref descriptors to resolve + * @param refs - Array of ref descriptors with their owning runIds * @param config - API configuration * @returns Array of resolved values in the same order as input */ export async function resolveRefDescriptors( - descriptors: RefDescriptor[], + refs: RefWithRunId[], config?: APIConfig ): Promise { - if (descriptors.length === 0) return []; + if (refs.length === 0) return []; return trace('world.refs.resolve', async (span) => { - const inlineCount = descriptors.filter((d) => - d._ref.startsWith('dbrf:') + const inlineCount = refs.filter((r) => + r.descriptor._ref.startsWith('dbrf:') ).length; - const remoteCount = descriptors.length - inlineCount; + const remoteCount = refs.length - inlineCount; span?.setAttributes({ - 'workflow.refs.total_count': descriptors.length, + 'workflow.refs.total_count': refs.length, 'workflow.refs.inline_count': inlineCount, 'workflow.refs.remote_count': remoteCount, }); // Simple case: if under concurrency limit, resolve all at once - if (descriptors.length <= REF_RESOLVE_CONCURRENCY) { + if (refs.length <= REF_RESOLVE_CONCURRENCY) { return Promise.all( - descriptors.map((d) => resolveRefDescriptor(d, config)) + refs.map((r) => resolveRefDescriptor(r.descriptor, r.runId, config)) ); } // Batch with bounded concurrency. If any ref in a batch fails, // the batch rejects and remaining batches are aborted to avoid // cascading failures. - const results: unknown[] = new Array(descriptors.length); - for (let i = 0; i < descriptors.length; i += REF_RESOLVE_CONCURRENCY) { - const batch = descriptors.slice(i, i + REF_RESOLVE_CONCURRENCY); + const results: unknown[] = new Array(refs.length); + for (let i = 0; i < refs.length; i += REF_RESOLVE_CONCURRENCY) { + const batch = refs.slice(i, i + REF_RESOLVE_CONCURRENCY); const batchResults = await Promise.all( - batch.map((d) => resolveRefDescriptor(d, config)) + batch.map((r) => resolveRefDescriptor(r.descriptor, r.runId, config)) ); for (let j = 0; j < batchResults.length; j++) { results[i + j] = batchResults[j]; From 3432d43945013c3e71b39e8022fa9ff89a965102 Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Wed, 25 Feb 2026 16:57:56 -0800 Subject: [PATCH 7/8] Add OTEL tracing, structured errors, and coercion failure logging - Wrap direct fetch in resolveRefDescriptor with trace('http GET') span matching makeRequest conventions (http.request.method, url.full, http.response.status_code, peer.service, error recording) - Set Accept and X-Request-Time headers that makeRequest normally adds, preventing RSC request memoization and enabling content negotiation - Use WorkflowAPIError instead of plain Error for ref fetch failures, preserving structured context (url, status) for future retry logic - Log warning when EventSchema.safeParse fails during post-hydration coercion, making schema mismatches visible in production --- packages/world-vercel/src/events.ts | 7 +++ packages/world-vercel/src/refs.ts | 72 ++++++++++++++++++++++------- 2 files changed, 63 insertions(+), 16 deletions(-) diff --git a/packages/world-vercel/src/events.ts b/packages/world-vercel/src/events.ts index 0a8d14e6b9..7b95c86aa3 100644 --- a/packages/world-vercel/src/events.ts +++ b/packages/world-vercel/src/events.ts @@ -272,10 +272,17 @@ export async function getWorkflowRunEvents( // (e.g., z.coerce.date() for resumeAt) that EventWithRefsSchema skips. // Use safeParse to gracefully handle any events that don't match a known // type — pass them through as-is rather than failing the entire request. + let coercionFailures = 0; const validatedEvents = hydratedEvents.map((event: any) => { const result = EventSchema.safeParse(event); + if (!result.success) coercionFailures++; return result.success ? result.data : event; }); + if (coercionFailures > 0) { + console.warn( + `[world-vercel] EventSchema coercion failed for ${coercionFailures}/${hydratedEvents.length} events` + ); + } return { ...response, diff --git a/packages/world-vercel/src/refs.ts b/packages/world-vercel/src/refs.ts index 7595bda748..9500c7e9e7 100644 --- a/packages/world-vercel/src/refs.ts +++ b/packages/world-vercel/src/refs.ts @@ -1,5 +1,14 @@ +import { WorkflowAPIError } from '@workflow/errors'; import { decode } from 'cbor-x'; -import { trace } from './telemetry.js'; +import { + ErrorType, + getSpanKind, + HttpRequestMethod, + HttpResponseStatusCode, + PeerService, + trace, + UrlFull, +} from './telemetry.js'; import { type APIConfig, getHttpConfig } from './utils.js'; /** @@ -78,25 +87,56 @@ export async function resolveRefDescriptor( // Uint8Array). We handle both content types directly rather than going // through makeRequest, which only handles JSON/CBOR API responses. const { baseUrl, headers } = await getHttpConfig(config); - const url = `${baseUrl}/v2/runs/${encodeURIComponent(runId)}/refs?ref=${encodeURIComponent(ref)}`; + const endpoint = `/v2/runs/${encodeURIComponent(runId)}/refs?ref=${encodeURIComponent(ref)}`; + const url = `${baseUrl}${endpoint}`; + + // Set headers that makeRequest normally adds: Accept for content + // negotiation and X-Request-Time to bypass RSC request memoization. + headers.set('Accept', 'application/cbor, application/octet-stream'); + headers.set('X-Request-Time', Date.now().toString()); + + return trace( + 'http GET', + { kind: await getSpanKind('CLIENT') }, + async (span) => { + span?.setAttributes({ + ...HttpRequestMethod('GET'), + ...UrlFull(url), + ...PeerService('workflow-server'), + }); + + const response = await fetch( + new Request(url, { method: 'GET', headers }) + ); - const response = await fetch(new Request(url, { method: 'GET', headers })); - if (!response.ok) { - throw new Error( - `Failed to resolve ref ${ref}: HTTP ${response.status} ${response.statusText}` - ); - } + span?.setAttributes({ + ...HttpResponseStatusCode(response.status), + }); + + if (!response.ok) { + const error = new WorkflowAPIError( + `Failed to resolve ref: HTTP ${response.status} ${response.statusText}`, + { url, status: response.status } + ); + span?.setAttributes({ + ...ErrorType(`HTTP ${response.status}`), + }); + span?.recordException?.(error); + throw error; + } - const contentType = response.headers.get('Content-Type') || ''; - const buffer = await response.arrayBuffer(); + const contentType = response.headers.get('Content-Type') || ''; + const buffer = await response.arrayBuffer(); - if (contentType.includes('application/octet-stream')) { - // Raw binary data (e.g., Uint8Array stored by the workflow) - return new Uint8Array(buffer); - } + if (contentType.includes('application/octet-stream')) { + // Raw binary data (e.g., Uint8Array stored by the workflow) + return new Uint8Array(buffer); + } - // CBOR-encoded data (the common case for structured values) - return decode(new Uint8Array(buffer)); + // CBOR-encoded data (the common case for structured values) + return decode(new Uint8Array(buffer)); + } + ); } /** From 733584656586aef3b4266ba40bdbe850070ac191 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Thu, 26 Feb 2026 09:54:00 -0800 Subject: [PATCH 8/8] [world-vercel] Expose total blob and stream storage bytes on run entity (#1196) --- .changeset/cold-pillows-refuse.md | 5 +++++ packages/world-vercel/src/runs.ts | 5 +++-- 2 files changed, 8 insertions(+), 2 deletions(-) create mode 100644 .changeset/cold-pillows-refuse.md diff --git a/.changeset/cold-pillows-refuse.md b/.changeset/cold-pillows-refuse.md new file mode 100644 index 0000000000..c9da0e6674 --- /dev/null +++ b/.changeset/cold-pillows-refuse.md @@ -0,0 +1,5 @@ +--- +"@workflow/world-vercel": patch +--- + +Include total blob and stream storage size of a run in the run response diff --git a/packages/world-vercel/src/runs.ts b/packages/world-vercel/src/runs.ts index 1cde2bff45..86c369156f 100644 --- a/packages/world-vercel/src/runs.ts +++ b/packages/world-vercel/src/runs.ts @@ -33,6 +33,9 @@ const WorkflowRunWireBaseSchema = WorkflowRunBaseSchema.omit({ }).extend({ // Backend returns error as either a JSON string or structured object error: z.union([z.string(), StructuredErrorSchema]).optional(), + // Not part of the World interface, but passed through for direct consumers and debugging + blobStorageBytes: z.number().optional(), + streamStorageBytes: z.number().optional(), }); // Wire schema for resolved data (full input/output) @@ -50,8 +53,6 @@ const WorkflowRunWireWithRefsSchema = WorkflowRunWireBaseSchema.omit({ // Accept both Uint8Array (v2 format) and any (legacy v1 JSON format) input: z.union([z.instanceof(Uint8Array), z.any()]).optional(), output: z.union([z.instanceof(Uint8Array), z.any()]).optional(), - blobStorageBytes: z.number().optional(), - streamStorageBytes: z.number().optional(), }); // Overloaded function signatures for filterRunData