Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/hip-crabs-vanish.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@workflow/world-vercel": patch
---

Vary ref resolution concurrency based on header
37 changes: 27 additions & 10 deletions packages/world-vercel/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ function collectPendingRefs(events: any[]): PendingRef[] {
*/
async function hydrateEventRefs(
events: any[],
config?: APIConfig
config?: APIConfig,
refResolveConcurrency?: number
): Promise<any[]> {
const pending = collectPendingRefs(events);
if (pending.length === 0) return events;
Expand All @@ -174,14 +175,16 @@ async function hydrateEventRefs(
const deduped = Array.from(uniqueRefs.values());

// Resolve unique descriptors in parallel with bounded concurrency
const dedupedResults = await resolveRefDescriptors(deduped, config).catch(
(err) => {
const msg = err instanceof Error ? err.message : String(err);
throw new Error(
`Failed to hydrate ${pending.length} ref(s) across ${events.length} event(s): ${msg}`
);
}
);
const dedupedResults = await resolveRefDescriptors(
deduped,
config,
refResolveConcurrency
).catch((err) => {
const msg = err instanceof Error ? err.message : String(err);
throw new Error(
`Failed to hydrate ${pending.length} ref(s) across ${events.length} event(s): ${msg}`
);
});

// Build a map from ref key → resolved value for fast lookup
const resolvedMap = new Map<string, unknown>();
Expand Down Expand Up @@ -257,16 +260,30 @@ export async function getWorkflowRunEvents(
? `/v2/events${query}`
: `/v2/runs/${runId}/events${query}`;

let refResolveConcurrency: number | undefined;
const response = (await makeRequest({
endpoint,
options: { method: 'GET' },
config,
schema: PaginatedResponseSchema(EventWithRefsSchema),
onResponse: (res) => {
const header = res.headers.get('x-ref-resolve-concurrency');
if (header) {
const parsed = parseInt(header, 10);
if (!Number.isNaN(parsed) && parsed > 0) {
refResolveConcurrency = parsed;
}
}
},
})) as PaginatedResponse<Event>;

if (resolveData === 'all') {
// Hydrate refs client-side: resolve all ref descriptors in parallel
const hydratedEvents = await hydrateEventRefs(response.data, config);
const hydratedEvents = await hydrateEventRefs(
response.data,
config,
refResolveConcurrency
);

// Re-parse hydrated events through EventSchema to apply type coercions
// (e.g., z.coerce.date() for resumeAt) that EventWithRefsSchema skips.
Expand Down
13 changes: 9 additions & 4 deletions packages/world-vercel/src/refs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,18 @@ export interface RefWithRunId {
*
* @param refs - Array of ref descriptors with their owning runIds
* @param config - API configuration
* @param concurrency - Max concurrent ref resolution requests. Falls back to REF_RESOLVE_CONCURRENCY.
* @returns Array of resolved values in the same order as input
*/
export async function resolveRefDescriptors(
refs: RefWithRunId[],
config?: APIConfig
config?: APIConfig,
concurrency?: number
): Promise<unknown[]> {
if (refs.length === 0) return [];

const limit = concurrency ?? REF_RESOLVE_CONCURRENCY;

return trace('world.refs.resolve', async (span) => {
const inlineCount = refs.filter((r) =>
r.descriptor._ref.startsWith('dbrf:')
Expand All @@ -173,10 +177,11 @@ export async function resolveRefDescriptors(
'workflow.refs.total_count': refs.length,
'workflow.refs.inline_count': inlineCount,
'workflow.refs.remote_count': remoteCount,
'workflow.refs.concurrency_limit': limit,
});

// Simple case: if under concurrency limit, resolve all at once
if (refs.length <= REF_RESOLVE_CONCURRENCY) {
if (refs.length <= limit) {
return Promise.all(
refs.map((r) => resolveRefDescriptor(r.descriptor, r.runId, config))
);
Expand All @@ -186,8 +191,8 @@ export async function resolveRefDescriptors(
// the batch rejects and remaining batches are aborted to avoid
// cascading failures.
const results: unknown[] = new Array(refs.length);
for (let i = 0; i < refs.length; i += REF_RESOLVE_CONCURRENCY) {
const batch = refs.slice(i, i + REF_RESOLVE_CONCURRENCY);
for (let i = 0; i < refs.length; i += limit) {
const batch = refs.slice(i, i + limit);
const batchResults = await Promise.all(
batch.map((r) => resolveRefDescriptor(r.descriptor, r.runId, config))
);
Expand Down
6 changes: 6 additions & 0 deletions packages/world-vercel/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -214,13 +214,16 @@ export async function makeRequest<T>({
config = {},
schema,
data,
onResponse,
}: {
endpoint: string;
options?: Omit<RequestInit, 'body'>;
config?: APIConfig;
schema: z.ZodSchema<T>;
/** Request body data - will be CBOR encoded */
data?: unknown;
/** Optional callback invoked with the raw Response before body consumption. Use to read response headers. */
onResponse?: (response: Response) => void;
}): Promise<T> {
const method = options.method || 'GET';
const { baseUrl, headers } = await getHttpConfig(config);
Expand Down Expand Up @@ -321,6 +324,9 @@ export async function makeRequest<T>({
throw error;
}

// Expose response headers to caller before consuming the body
onResponse?.(response);

// Parse the response body (CBOR or JSON) with tracing
let parseResult: ParseResult;
try {
Expand Down
Loading