Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions .changeset/plain-rivers-behave.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
---
"@workflow/world-postgres": patch
"@workflow/world-testing": patch
"@workflow/world-vercel": patch
"@workflow/world-local": patch
"@workflow/web-shared": patch
"@workflow/world": patch
"@workflow/core": patch
"@workflow/cli": patch
---

**BREAKING CHANGE**: Change user input/output to be binary data (Uint8Array) at the World interface

This is part of specVersion 2 changes where serialization of workflow and step data uses binary format instead of JSON arrays. This allows the workflow client to be fully responsible for the data serialization format and enables future enhancements such as encryption and compression without the World implementation needing to care about the underlying data representation.
6 changes: 4 additions & 2 deletions packages/cli/src/lib/inspect/output.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import type {
ListEventsParams,
PaginationOptions,
Step,
StepWithoutData,
WorkflowRun,
WorkflowRunWithoutData,
World,
} from '@workflow/world';
import chalk from 'chalk';
Expand Down Expand Up @@ -543,7 +545,7 @@ export const listRuns = async (world: World, opts: InspectCLIOptions = {}) => {
}
}

await setupListPagination<WorkflowRun>({
await setupListPagination<WorkflowRun | WorkflowRunWithoutData>({
initialCursor: opts.cursor,
interactive: opts.interactive,
fetchPage: async (cursor) => {
Expand Down Expand Up @@ -681,7 +683,7 @@ export const listSteps = async (
}
}

await setupListPagination<Step>({
await setupListPagination<Step | StepWithoutData>({
initialCursor: opts.cursor,
interactive: opts.interactive,
fetchPage: async (cursor) => {
Expand Down
7 changes: 5 additions & 2 deletions packages/core/e2e/e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,11 @@ async function triggerWorkflow(

const res = await fetch(url, {
method: 'POST',
headers: getProtectionBypassHeaders(),
body: JSON.stringify(dehydratedArgs),
headers: {
...getProtectionBypassHeaders(),
'Content-Type': 'application/octet-stream',
},
body: dehydratedArgs.buffer as BodyInit,
Copy link

Copilot AI Jan 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

body: dehydratedArgs.buffer ignores byteOffset/byteLength and will send the entire underlying ArrayBuffer if dehydratedArgs is ever a view into a larger buffer. Using the Uint8Array directly (or slicing the buffer to the view range) avoids accidentally sending extra bytes.

Suggested change
body: dehydratedArgs.buffer as BodyInit,
body: dehydratedArgs as BodyInit,

Copilot uses AI. Check for mistakes.
});
if (!res.ok) {
throw new Error(
Expand Down
168 changes: 123 additions & 45 deletions packages/core/src/observability.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@

import { inspect } from 'node:util';
import { parseClassName } from '@workflow/utils/parse-name';
import { unflatten } from 'devalue';
import {
getCommonRevivers,
hydrateStepArguments,
hydrateStepReturnValue,
hydrateWorkflowArguments,
Expand Down Expand Up @@ -219,26 +221,74 @@ const streamPrintRevivers: Record<string, (value: any) => any> = {
Class: serializedClassToString,
};

/**
* Combined revivers for observability hydration.
* Merges common revivers with stream print revivers.
*/
const getObservabilityRevivers = () => ({
...getCommonRevivers(globalThis),
...streamPrintRevivers,
});

/**
* Check if data is in legacy format (devalue parsed array).
* Legacy specVersion 1 runs stored data as JSON arrays from devalue.
*/
const isLegacyFormat = (data: unknown): data is any[] => {
return Array.isArray(data);
};

/**
* Check if data is in new binary format (Uint8Array).
* New specVersion 2 runs store data as binary Uint8Array.
*/
const isBinaryFormat = (data: unknown): data is Uint8Array => {
return data instanceof Uint8Array;
};

/**
* Hydrate legacy format data (array) using unflatten.
*/
const hydrateLegacyData = (data: any[]): unknown => {
return unflatten(data, getObservabilityRevivers());
};

const hydrateStepIO = <
T extends { stepId?: string; input?: any; output?: any; runId?: string },
>(
step: T
): T => {
let hydratedInput = step.input;
let hydratedOutput = step.output;

// Hydrate input - handle both binary (specVersion 2) and legacy (specVersion 1) formats
if (isBinaryFormat(step.input) && step.input.byteLength > 0) {
hydratedInput = hydrateStepArguments(
step.input,
[],
step.runId as string,
globalThis,
streamPrintRevivers
);
} else if (isLegacyFormat(step.input) && step.input.length > 0) {
hydratedInput = hydrateLegacyData(step.input);
}

// Hydrate output - handle both binary (specVersion 2) and legacy (specVersion 1) formats
if (isBinaryFormat(step.output)) {
hydratedOutput = hydrateStepReturnValue(
step.output,
globalThis,
streamPrintRevivers
);
} else if (isLegacyFormat(step.output) && step.output.length > 0) {
hydratedOutput = hydrateLegacyData(step.output);
}

return {
...step,
input:
step.input && Array.isArray(step.input) && step.input.length
? hydrateStepArguments(
step.input,
[],
step.runId as string,
globalThis,
streamPrintRevivers
)
: step.input,
output: step.output
? hydrateStepReturnValue(step.output, globalThis, streamPrintRevivers)
: step.output,
input: hydratedInput,
output: hydratedOutput,
};
};

Expand All @@ -247,25 +297,37 @@ const hydrateWorkflowIO = <
>(
workflow: T
): T => {
let hydratedInput = workflow.input;
let hydratedOutput = workflow.output;

// Hydrate input - handle both binary (specVersion 2) and legacy (specVersion 1) formats
if (isBinaryFormat(workflow.input) && workflow.input.byteLength > 0) {
hydratedInput = hydrateWorkflowArguments(
workflow.input,
globalThis,
streamPrintRevivers
);
} else if (isLegacyFormat(workflow.input) && workflow.input.length > 0) {
hydratedInput = hydrateLegacyData(workflow.input);
}

// Hydrate output - handle both binary (specVersion 2) and legacy (specVersion 1) formats
if (isBinaryFormat(workflow.output)) {
hydratedOutput = hydrateWorkflowReturnValue(
workflow.output,
[],
workflow.runId as string,
globalThis,
streamPrintRevivers
);
} else if (isLegacyFormat(workflow.output) && workflow.output.length > 0) {
hydratedOutput = hydrateLegacyData(workflow.output);
}

return {
...workflow,
input:
workflow.input && Array.isArray(workflow.input) && workflow.input.length
? hydrateWorkflowArguments(
workflow.input,
globalThis,
streamPrintRevivers
)
: workflow.input,
output: workflow.output
? hydrateWorkflowReturnValue(
workflow.output,
[],
workflow.runId as string,
globalThis,
streamPrintRevivers
)
: workflow.output,
input: hydratedInput,
output: hydratedOutput,
};
};

Expand All @@ -283,11 +345,19 @@ const hydrateEventData = <
// so we need to hydrate it specifically.
try {
if ('result' in eventData && typeof eventData.result === 'object') {
eventData.result = hydrateStepReturnValue(
eventData.result,
globalThis,
streamPrintRevivers
);
// Handle both binary (specVersion 2) and legacy (specVersion 1) formats
if (isBinaryFormat(eventData.result)) {
eventData.result = hydrateStepReturnValue(
eventData.result,
globalThis,
streamPrintRevivers
);
} else if (
isLegacyFormat(eventData.result) &&
eventData.result.length > 0
) {
eventData.result = hydrateLegacyData(eventData.result);
}
}
} catch (error) {
console.error('Error hydrating event data', error);
Expand All @@ -301,18 +371,26 @@ const hydrateEventData = <
const hydrateHookMetadata = <T extends { hookId?: string; metadata?: any }>(
hook: T
): T => {
let hydratedMetadata = hook.metadata;

if (hook.metadata && 'runId' in hook) {
// Handle both binary (specVersion 2) and legacy (specVersion 1) formats
if (isBinaryFormat(hook.metadata)) {
hydratedMetadata = hydrateStepArguments(
hook.metadata,
[],
hook.runId as string,
globalThis,
streamPrintRevivers
);
} else if (isLegacyFormat(hook.metadata) && hook.metadata.length > 0) {
hydratedMetadata = hydrateLegacyData(hook.metadata);
}
}

return {
...hook,
metadata:
hook.metadata && 'runId' in hook
? hydrateStepArguments(
hook.metadata,
[],
hook.runId as string,
globalThis,
streamPrintRevivers
)
: hook.metadata,
metadata: hydratedMetadata,
};
};

Expand Down
3 changes: 1 addition & 2 deletions packages/core/src/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import {
} from './runtime/helpers.js';
import { handleSuspension } from './runtime/suspension-handler.js';
import { getWorld, getWorldHandlers } from './runtime/world.js';
import type { Serializable } from './schemas.js';
import {
getExternalRevivers,
hydrateWorkflowReturnValue,
Expand Down Expand Up @@ -382,7 +381,7 @@ export function workflowEntrypoint(
eventType: 'run_completed',
specVersion: SPEC_VERSION_CURRENT,
eventData: {
output: result as Serializable,
output: result,
},
});

Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/runtime/step-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import { getPort } from '@workflow/utils/get-port';
import { SPEC_VERSION_CURRENT, StepInvokePayloadSchema } from '@workflow/world';
import { runtimeLogger } from '../logger.js';
import { getStepFunction } from '../private.js';
import type { Serializable } from '../schemas.js';
import {
dehydrateStepReturnValue,
hydrateStepArguments,
Expand Down Expand Up @@ -285,12 +284,13 @@ const stepHandler = getWorldHandlers().createQueueHandler(

// Complete the step via event (event-sourced architecture)
// The event creation atomically updates the step entity
// result was dehydrated above by dehydrateStepReturnValue, which returns Uint8Array
await world.events.create(workflowRunId, {
eventType: 'step_completed',
specVersion: SPEC_VERSION_CURRENT,
correlationId: stepId,
eventData: {
result: result as Serializable,
result: result as Uint8Array,
},
});

Expand Down
11 changes: 7 additions & 4 deletions packages/core/src/runtime/suspension-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { waitUntil } from '@vercel/functions';
import { WorkflowAPIError } from '@workflow/errors';
import {
type CreateEventRequest,
type SerializedData,
SPEC_VERSION_CURRENT,
type World,
} from '@workflow/world';
Expand All @@ -12,7 +13,6 @@ import type {
WaitInvocationQueueItem,
WorkflowSuspension,
} from '../global.js';
import type { Serializable } from '../schemas.js';
import { dehydrateStepArguments } from '../serialization.js';
import * as Attribute from '../telemetry/semantic-conventions.js';
import { serializeTraceCarrier } from '../telemetry.js';
Expand Down Expand Up @@ -61,10 +61,13 @@ export async function handleSuspension({

// Build hook_created events (World will atomically create hook entities)
const hookEvents: CreateEventRequest[] = hookItems.map((queueItem) => {
const hookMetadata =
const hookMetadata: SerializedData | undefined =
typeof queueItem.metadata === 'undefined'
? undefined
: dehydrateStepArguments(queueItem.metadata, suspension.globalThis);
: (dehydrateStepArguments(
queueItem.metadata,
suspension.globalThis
) as SerializedData);
return {
eventType: 'hook_created' as const,
specVersion: SPEC_VERSION_CURRENT,
Expand Down Expand Up @@ -142,7 +145,7 @@ export async function handleSuspension({
correlationId: queueItem.correlationId,
eventData: {
stepName: queueItem.stepName,
input: dehydratedInput as Serializable,
input: dehydratedInput as SerializedData,
},
};
try {
Expand Down
Loading
Loading