diff --git a/.changeset/client-side-ref-hydration.md b/.changeset/client-side-ref-hydration.md new file mode 100644 index 0000000000..67849ab554 --- /dev/null +++ b/.changeset/client-side-ref-hydration.md @@ -0,0 +1,5 @@ +--- +"@workflow/world-vercel": patch +--- + +Move event ref resolution from server-side to client-side to reduce memory pressure diff --git a/packages/world-vercel/src/events.ts b/packages/world-vercel/src/events.ts index f58b2b1f38..7b95c86aa3 100644 --- a/packages/world-vercel/src/events.ts +++ b/packages/world-vercel/src/events.ts @@ -14,15 +14,28 @@ import { WorkflowRunSchema, } from '@workflow/world'; import z from 'zod'; +import { + isRefDescriptor, + type RefDescriptor, + type RefWithRunId, + resolveRefDescriptors, +} from './refs.js'; import { cancelWorkflowRunV1, createWorkflowRunV1 } from './runs.js'; import { deserializeStep, StepWireSchema } from './steps.js'; +import { trace } from './telemetry.js'; import type { APIConfig } from './utils.js'; import { DEFAULT_RESOLVE_DATA_OPTION, makeRequest } from './utils.js'; -// Helper to filter event data based on resolveData setting +// Helper to filter event data based on resolveData setting. +// Strips both eventData and eventDataRef since the server always returns +// lazy refs now, and callers with resolveData='none' should not see either. function filterEventData(event: any, resolveData: 'none' | 'all'): Event { if (resolveData === 'none') { - const { eventData: _eventData, ...rest } = event; + const { + eventData: _eventData, + eventDataRef: _eventDataRef, + ...rest + } = event; return rest; } return event; @@ -37,19 +50,36 @@ const EventResultWireSchema = z.object({ hook: HookSchema.optional(), }); -// Would usually "EventSchema.omit({ eventData: true })" but that doesn't work -// on zod unions. Re-creating the schema manually. -// specVersion defaults to 1 (legacy) when parsing responses from storage +// Schema for events returned with `remoteRefBehavior=lazy`. 
+// Includes both `eventDataRef` (legacy, specVersion=1) and `eventData` +// (v2, specVersion=2 — may contain nested RefDescriptor values). +// specVersion defaults to 1 (legacy) when parsing responses from storage. const EventWithRefsSchema = z.object({ eventId: z.string(), runId: z.string(), eventType: EventTypeSchema, correlationId: z.string().optional(), eventDataRef: z.any().optional(), + eventData: z.any().optional(), createdAt: z.coerce.date(), specVersion: z.number().default(1), }); +/** + * Maps event types to the field name within `eventData` that may contain + * a ref descriptor. Mirrors the server-side `resolveEventDataRefs()` mapping. + */ +const eventDataRefFieldMap: Record<string, string> = { + run_created: 'input', + run_completed: 'output', + run_failed: 'error', + step_created: 'input', + step_completed: 'result', + step_failed: 'error', + step_retrying: 'error', + hook_created: 'metadata', +}; + // Events where the client uses the response entity data need 'resolve' (default). // Events where the client discards the response can use 'lazy' to skip expensive // S3 ref resolution on the server, saving ~200-460ms per event. @@ -59,6 +89,137 @@ const eventsNeedingResolve = new Set([ 'step_started', // client reads result.step (checks attempt, state) ]); +/** + * One entry in the flat array returned by collectPendingRefs(): a single + * ref descriptor found in a lazy-loaded event, tagged with that event's + * index so all collected refs can be resolved in bulk. 
+ */ +interface PendingRef { + eventIndex: number; + /** + * 'entity' = top-level eventDataRef (legacy specVersion=1 events) + * 'nested' = nested ref descriptor within eventData (v2 events) + */ + refType: 'entity' | 'nested'; + /** The field name within eventData containing the ref (only for 'nested') */ + fieldName?: string; + descriptor: RefDescriptor; +} + +function collectPendingRefs(events: any[]): PendingRef[] { + const pending: PendingRef[] = []; + + for (let i = 0; i < events.length; i++) { + const event = events[i]; + + // Legacy events (specVersion=1): eventDataRef is a RefDescriptor + if (event.eventDataRef && isRefDescriptor(event.eventDataRef)) { + pending.push({ + eventIndex: i, + refType: 'entity', + descriptor: event.eventDataRef, + }); + } + + // V2 events: eventData may contain a nested RefDescriptor + if (event.eventData && typeof event.eventData === 'object') { + const fieldName = eventDataRefFieldMap[event.eventType as string]; + if (fieldName) { + const fieldValue = event.eventData[fieldName]; + if (isRefDescriptor(fieldValue)) { + pending.push({ + eventIndex: i, + refType: 'nested', + fieldName, + descriptor: fieldValue, + }); + } + } + } + } + + return pending; +} + +/** + * Hydrate lazy-loaded events by resolving all ref descriptors client-side. + * For entity-level refs (eventDataRef), the resolved value becomes eventData. + * For nested refs (eventData[field]), the resolved value replaces the descriptor. + * + * Events are shallow-cloned before mutation to avoid corrupting any upstream + * caches (SWR, React cache, etc.) that might hold references to the originals. 
+ */ +async function hydrateEventRefs( + events: any[], + config?: APIConfig +): Promise<any[]> { + const pending = collectPendingRefs(events); + if (pending.length === 0) return events; + + return trace('world.refs.hydrate', async (span) => { + span?.setAttribute('workflow.refs.hydrated_count', pending.length); + + // Deduplicate descriptors by _ref key to avoid redundant resolutions. + // Multiple events may reference the same ref (e.g., shared input). + const uniqueRefs = new Map<string, RefWithRunId>(); + for (const p of pending) { + if (!uniqueRefs.has(p.descriptor._ref)) { + const eventRunId = events[p.eventIndex].runId as string; + uniqueRefs.set(p.descriptor._ref, { + descriptor: p.descriptor, + runId: eventRunId, + }); + } + } + const deduped = Array.from(uniqueRefs.values()); + + // Resolve unique descriptors in parallel with bounded concurrency + const dedupedResults = await resolveRefDescriptors(deduped, config).catch( + (err) => { + const msg = err instanceof Error ? err.message : String(err); + throw new Error( + `Failed to hydrate ${pending.length} ref(s) across ${events.length} event(s): ${msg}` + ); + } + ); + + // Build a map from ref key → resolved value for fast lookup + const resolvedMap = new Map<string, unknown>(); + const dedupedKeys = Array.from(uniqueRefs.keys()); + for (let i = 0; i < dedupedKeys.length; i++) { + resolvedMap.set(dedupedKeys[i], dedupedResults[i]); + } + + // Shallow-clone events that need modification, then apply resolved values + const result = [...events]; + for (let i = 0; i < pending.length; i++) { + const { eventIndex, refType, fieldName, descriptor } = pending[i]; + const resolved = resolvedMap.get(descriptor._ref); + + // Shallow-clone the event (and eventData if nested) before mutating + if (result[eventIndex] === events[eventIndex]) { + result[eventIndex] = { ...events[eventIndex] }; + } + const event = result[eventIndex]; + + if (refType === 'entity') { + // Legacy: eventDataRef → eventData, remove the ref field + event.eventData = resolved; + delete 
event.eventDataRef; + } else if (refType === 'nested' && fieldName) { + // Shallow-clone eventData before mutating if not yet cloned + if (event.eventData === events[eventIndex].eventData) { + event.eventData = { ...event.eventData }; + } + // V2: replace the nested ref descriptor with resolved value + event.eventData[fieldName] = resolved; + } + } + + return result; + }); +} + // Functions export async function getWorkflowRunEvents( params: ListEventsParams | ListEventsByCorrelationIdParams, @@ -84,8 +245,11 @@ export async function getWorkflowRunEvents( if (pagination?.sortOrder) searchParams.set('sortOrder', pagination.sortOrder); if (correlationId) searchParams.set('correlationId', correlationId); - const remoteRefBehavior = resolveData === 'none' ? 'lazy' : 'resolve'; - searchParams.set('remoteRefBehavior', remoteRefBehavior); + + // Always send 'lazy' to the server to avoid memory pressure from resolving + // all refs in memory. When resolveData is 'all', we hydrate refs client-side + // via individual ref resolution requests. + searchParams.set('remoteRefBehavior', 'lazy'); const queryString = searchParams.toString(); const query = queryString ? `?${queryString}` : ''; @@ -97,11 +261,36 @@ export async function getWorkflowRunEvents( endpoint, options: { method: 'GET' }, config, - schema: PaginatedResponseSchema( - remoteRefBehavior === 'lazy' ? EventWithRefsSchema : EventSchema - ), + schema: PaginatedResponseSchema(EventWithRefsSchema), })) as PaginatedResponse; + if (resolveData === 'all') { + // Hydrate refs client-side: resolve all ref descriptors in parallel + const hydratedEvents = await hydrateEventRefs(response.data, config); + + // Re-parse hydrated events through EventSchema to apply type coercions + // (e.g., z.coerce.date() for resumeAt) that EventWithRefsSchema skips. + // Use safeParse to gracefully handle any events that don't match a known + // type — pass them through as-is rather than failing the entire request. 
+ let coercionFailures = 0; + const validatedEvents = hydratedEvents.map((event: any) => { + const result = EventSchema.safeParse(event); + if (!result.success) coercionFailures++; + return result.success ? result.data : event; + }); + if (coercionFailures > 0) { + console.warn( + `[world-vercel] EventSchema coercion failed for ${coercionFailures}/${hydratedEvents.length} events` + ); + } + + return { + ...response, + data: validatedEvents, + }; + } + + // resolveData === 'none': strip eventData and eventDataRef return { ...response, data: response.data.map((event: any) => diff --git a/packages/world-vercel/src/refs.ts b/packages/world-vercel/src/refs.ts new file mode 100644 index 0000000000..9500c7e9e7 --- /dev/null +++ b/packages/world-vercel/src/refs.ts @@ -0,0 +1,201 @@ +import { WorkflowAPIError } from '@workflow/errors'; +import { decode } from 'cbor-x'; +import { + ErrorType, + getSpanKind, + HttpRequestMethod, + HttpResponseStatusCode, + PeerService, + trace, + UrlFull, +} from './telemetry.js'; +import { type APIConfig, getHttpConfig } from './utils.js'; + +/** + * A ref descriptor as returned by workflow-server when `remoteRefBehavior=lazy`. + * Matches the server-side `RefDescriptor` type in `lib/data/remote-ref.ts`. + */ +export interface RefDescriptor { + _type: 'RemoteRef'; + _ref: string; + /** Base64-encoded inline payload. Present only for dbrf: (inline) refs. */ + _data?: string; + /** Content type of the inline payload. Present only for dbrf: refs. */ + _ct?: string; +} + +/** + * Checks if a value is a RefDescriptor object. + */ +export function isRefDescriptor(value: unknown): value is RefDescriptor { + return ( + typeof value === 'object' && + value !== null && + '_type' in value && + '_ref' in value && + typeof (value as { _ref: unknown })._ref === 'string' && + (value as { _type: string })._type === 'RemoteRef' + ); +} + +/** + * Maximum number of concurrent ref resolution requests. 
+ * Limits peak concurrency to avoid overwhelming the server. + */ +const REF_RESOLVE_CONCURRENCY = 10; + +/** + * Resolve a single ref descriptor. + * + * For inline refs (dbrf: prefix), the data is decoded locally from the + * descriptor's `_data` field — no network request is needed. + * + * For S3 refs (s3rf:) and Redis refs (kvrf:), a request is made to the + * `GET /v2/runs/:runId/refs` endpoint on workflow-server which returns + * raw CBOR or binary bytes. + * + * @param descriptor - The ref descriptor to resolve + * @param runId - The runId that owns this ref (used in the URL path) + * @param config - API configuration + */ +export async function resolveRefDescriptor( + descriptor: RefDescriptor, + runId: string, + config?: APIConfig +): Promise<unknown> { + const ref = descriptor._ref; + + // Inline refs (dbrf:) carry their data in the descriptor — decode locally + if (ref.startsWith('dbrf:')) { + if (!descriptor._data) { + throw new Error(`Inline ref descriptor missing _data field: ${ref}`); + } + const contentType = descriptor._ct ?? 'application/cbor'; + const binaryData = Buffer.from(descriptor._data, 'base64'); + if (contentType === 'application/octet-stream') { + // Buffer is a Uint8Array subclass — return directly to avoid a copy. + return binaryData; + } + // CBOR-encoded data — decode it. Buffer is accepted by cbor-x directly. + return decode(binaryData); + } + + // Remote refs (s3rf:, kvrf:) — fetch raw bytes from the server. + // The server returns the raw stored bytes directly (not wrapped in a + // JSON/CBOR envelope). The Content-Type may be 'application/cbor' (for + // CBOR-encoded data) or 'application/octet-stream' (for raw binary like + // Uint8Array). We handle both content types directly rather than going + // through makeRequest, which only handles JSON/CBOR API responses. 
+ const { baseUrl, headers } = await getHttpConfig(config); + const endpoint = `/v2/runs/${encodeURIComponent(runId)}/refs?ref=${encodeURIComponent(ref)}`; + const url = `${baseUrl}${endpoint}`; + + // Set headers that makeRequest normally adds: Accept for content + // negotiation and X-Request-Time to bypass RSC request memoization. + headers.set('Accept', 'application/cbor, application/octet-stream'); + headers.set('X-Request-Time', Date.now().toString()); + + return trace( + 'http GET', + { kind: await getSpanKind('CLIENT') }, + async (span) => { + span?.setAttributes({ + ...HttpRequestMethod('GET'), + ...UrlFull(url), + ...PeerService('workflow-server'), + }); + + const response = await fetch( + new Request(url, { method: 'GET', headers }) + ); + + span?.setAttributes({ + ...HttpResponseStatusCode(response.status), + }); + + if (!response.ok) { + const error = new WorkflowAPIError( + `Failed to resolve ref: HTTP ${response.status} ${response.statusText}`, + { url, status: response.status } + ); + span?.setAttributes({ + ...ErrorType(`HTTP ${response.status}`), + }); + span?.recordException?.(error); + throw error; + } + + const contentType = response.headers.get('Content-Type') || ''; + const buffer = await response.arrayBuffer(); + + if (contentType.includes('application/octet-stream')) { + // Raw binary data (e.g., Uint8Array stored by the workflow) + return new Uint8Array(buffer); + } + + // CBOR-encoded data (the common case for structured values) + return decode(new Uint8Array(buffer)); + } + ); +} + +/** + * A ref descriptor paired with the runId that owns it, for resolution. + */ +export interface RefWithRunId { + descriptor: RefDescriptor; + runId: string; +} + +/** + * Resolve multiple ref descriptors in parallel with bounded concurrency. + * + * If any ref in a batch fails, the batch rejects and remaining batches + * are aborted to avoid cascading failures. 
+ * + * @param refs - Array of ref descriptors with their owning runIds + * @param config - API configuration + * @returns Array of resolved values in the same order as input + */ +export async function resolveRefDescriptors( + refs: RefWithRunId[], + config?: APIConfig +): Promise<unknown[]> { + if (refs.length === 0) return []; + + return trace('world.refs.resolve', async (span) => { + const inlineCount = refs.filter((r) => + r.descriptor._ref.startsWith('dbrf:') + ).length; + const remoteCount = refs.length - inlineCount; + + span?.setAttributes({ + 'workflow.refs.total_count': refs.length, + 'workflow.refs.inline_count': inlineCount, + 'workflow.refs.remote_count': remoteCount, + }); + + // Simple case: if at or under the concurrency limit, resolve all at once + if (refs.length <= REF_RESOLVE_CONCURRENCY) { + return Promise.all( + refs.map((r) => resolveRefDescriptor(r.descriptor, r.runId, config)) + ); + } + + // Batch with bounded concurrency. If any ref in a batch fails, + // the batch rejects and later batches are never started, to avoid + // cascading failures. + const results: unknown[] = new Array(refs.length); + for (let i = 0; i < refs.length; i += REF_RESOLVE_CONCURRENCY) { + const batch = refs.slice(i, i + REF_RESOLVE_CONCURRENCY); + const batchResults = await Promise.all( + batch.map((r) => resolveRefDescriptor(r.descriptor, r.runId, config)) + ); + for (let j = 0; j < batchResults.length; j++) { + results[i + j] = batchResults[j]; + } + } + + return results; + }); +}