Skip to content

Commit 7618ac3

Browse files
authored
Wire AES-GCM encryption into serialization layer (#1251)
* fix(core): chain unconsumed event check onto promiseQueue to prevent false positives The EventsConsumer's unconsumed event check (setTimeout(0)) was racing against the promiseQueue's async deserialization. When parallel steps completed and their hydrateStepReturnValue did real async work (e.g., decryption), the setTimeout(0) fired before the promise chain resolved the step results and triggered the next subscribe() call. This caused step_created events for sequential steps to be falsely flagged as unconsumed/orphaned. Fix: chain the unconsumed check onto the promiseQueue via getPromiseQueue() so it only fires after all pending async work completes. Use process.nextTick (not setTimeout) after the queue drains to give synchronous subscribe() calls from resolved user code a chance to cancel. Version-based cancellation replaces clearTimeout since the check is now promise-based. Adds getPromiseQueue option to EventsConsumerOptions. The workflow.ts context uses a getter/setter to keep the promiseQueue holder in sync. Reproduction test: parallel steps A+B with 10ms mock deserialization delay, followed by sequential step C. Previously failed with 'Unconsumed event: step_created(C)'. Now passes. * fix: chain hydrateWorkflowArguments onto promiseQueue to prevent false unconsumed events The unconsumed event check was firing during the async gap between run_started consumption and the workflow function subscribing its first step callbacks. This happened because hydrateWorkflowArguments is async, and during its await, the EventsConsumer advanced to step_created events that had no subscriber yet. Fix: chain hydrateWorkflowArguments onto the promiseQueue so the unconsumed check (which waits for the queue to drain) doesn't fire until after the workflow arguments are hydrated and the workflow function has been invoked. * fix: use setTimeout(0) macrotask for unconsumed check to ensure VM promise propagation completes The process.nextTick-based unconsumed check was still racing against VM promise propagation. After promiseQueue resolves and the user code's resolve() fires, there are multiple microtask hops through the VM boundary before the workflow code actually calls subscribe() for the next steps. process.nextTick fires before those VM microtasks complete. setTimeout(0) is a macrotask that is guaranteed to fire only after ALL microtasks (including VM promise chain propagation) have drained. The pendingUnconsumedTimeout handle is stored and cleared in subscribe() to prevent keeping the event loop alive unnecessarily. * fix: increase unconsumed event check delay to 100ms for cross-VM promise propagation setTimeout(0) is insufficient because Node.js does not guarantee that macrotasks fire after all cross-context (VM boundary) microtasks settle. After promiseQueue resolves and resolve() fires in the host context, there are multiple microtask hops through the VM boundary before the workflow code actually calls subscribe(). A 100ms delay provides sufficient time for this propagation while still detecting truly orphaned events promptly. Also update sleep.test.ts to wait 200ms for the unconsumed check. * Add browser-compatible AES-GCM to core and HKDF key derivation to world-vercel * update changeset * Move HKDF key derivation server-side: API returns per-run derived key * Refactor encrypt/decrypt to accept CryptoKey, export importKey for callers to import once per run * Overload getEncryptionKeyForRun: accept context for start(), fetch WorkflowRun in resume-hook * Split changeset into per-package descriptions for world, world-vercel, and core * Remove unnecessary Uint8Array.from() wrapper around Buffer.from() * Use zod to parse Vercel API response * Wire encryption into serialization layer * Wire AES-GCM encryption into serialization layer * update changeset * Add encryption unit tests: primitives, maybeEncrypt/maybeDecrypt, isEncrypted, complex type round-trips * Accept CryptoKey in encrypt/decrypt, export importKey for callers to import once per run * Fix review comments: cache stream encryption key, remove redundant casts, fix stale comments * Trying to clean up some type non-sense * fix: restore world-vercel files to main versions The rebase incorrectly picked up older versions of these files from early encryption branch commits. The main versions are correct and up-to-date. * fix: add type cast for hydrateStepReturnValue return in hook.ts * fix: address review feedback on encryption PR - Remove Vercel-specific error message from maybeDecrypt (core should not reference VERCEL_DEPLOYMENT_KEY) - Move stream encryption/decryption from transport layer (WorkflowServerReadableStream/WritableStream) to framing layer (getSerializeStream/getDeserializeStream). Frame length headers stay in the clear so frame boundaries are always parseable regardless of transport chunking; encryption wraps the frame payload. - Remove explicit Promise<unknown> return types from all 4 hydrate functions. On main these had inferred types (any from devalue), so callers didn't need casts. The encryption branch added explicit annotations that broke this. - Revert unnecessary type casts in run.ts, step-handler.ts, hook.ts that were only needed due to the explicit Promise<unknown> annotations - Revert closureVars type from unknown back to Record<string, any> in context-storage.ts to match the contract with getClosureVars - Fix hydrateWorkflowArguments JSDoc for unused _runId parameter * Revert more unnecessary changes * cleanup: remove unused runId param, deduplicate processFrames, add legacy comments - Remove unused _runId parameter from WorkflowServerReadableStream constructor and all 4 call sites - Deduplicate processFrames decryption: decrypt first and reassign format/payload, then fall through to single deserialization path - Add comments on all legacy non-Uint8Array branches explaining when this happens (specVersion 1 runs stored data as plain JSON arrays) - Fix duplicate code block in hydrateStepReturnValue * feat: wire cryptoKey through stream serialize/deserialize pipeline Thread the encryption key through the entire stream serialization chain so that ReadableStream and WritableStream values are encrypted/decrypted at the framing level. - Add optional cryptoKey param to getExternalReducers, getStepReducers, getExternalRevivers, getStepRevivers - Pass cryptoKey to getSerializeStream/getDeserializeStream at all 8 internal call sites within reducers/revivers - Thread key from dehydrate/hydrate functions into their reducers/revivers - Cache encryption key in Run class (resolved once via getEncryptionKey(), reused for returnValue, getReadable(), etc.) - Make Run#getReadable() async to resolve the cached key before creating the deserialize stream - Add encryptionKey to step context storage so getWritable() can access it during step execution * fix: make cryptoKey required-but-nullable to prevent silent omission, add stream encryption tests Change cryptoKey parameter from optional (cryptoKey?) to required-but- nullable (cryptoKey: CryptoKey | undefined) on all 6 functions: - getSerializeStream, getDeserializeStream - getExternalReducers, getStepReducers - getExternalRevivers, getStepRevivers This ensures every call site must explicitly pass the key or undefined, making it impossible to accidentally omit it and silently skip encryption. Add 7 stream encryption round-trip tests: - Encrypted frames have 'encr' prefix inside length header - Full round-trip: encrypt serialize -> decrypt deserialize - Concatenated encrypted frames (transport coalescing) - Split encrypted frames (transport splitting) - Error when encrypted data encountered without key - No encryption when key is undefined - Large payload round-trip Full audit confirms all encryption key threading is complete: - All 8 dehydrate/hydrate functions pass key to reducers/revivers - All stream serialize/deserialize call sites pass key - Run class caches key for reuse across returnValue and getReadable() - Step context storage carries key for getWritable() * fix: keep Run#getReadable() sync, resolve encryption key lazily in streams - Revert Run#getReadable() to synchronous (non-breaking API). The encryption key is passed as a Promise through the chain and resolved lazily inside the first async transform() call. - Add EncryptionKeyParam type alias that accepts CryptoKey, undefined, or Promise<CryptoKey | undefined>. Used by getSerializeStream, getDeserializeStream, and all reducer/reviver functions. - Key promises are resolved once on first use via a keyState cache object inside each stream's transform closure. - Fix CLI showStream to resolve encryption key from world when runId is provided via --run flag, instead of passing undefined. - Remove incorrect CLI warning that --run is not supported for streams (it is now needed for encrypted stream decryption). * . * fix: address review feedback from PR #1251 - Fix 4 broken dehydrateWorkflowArguments calls in workflow.test.ts that were passing ops as runId (missing runId and key params) - Use WorkflowRuntimeError instead of plain Error in decodeFormatPrefix for unknown serialization formats, for consistency and programmatic error handling - Document maybeDecrypt throw behavior: callers should be aware this surfaces as a rejected promise during key rotation/misconfiguration - Document key-fetch rejection timing in streams: promise rejection won't surface until the first chunk is processed
1 parent 60bc9d5 commit 7618ac3

9 files changed

Lines changed: 1171 additions & 136 deletions

File tree

.changeset/e2e-encryption.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@workflow/core": patch
3+
---
4+
5+
Wire AES-GCM encryption into serialization layer with stream support

packages/cli/src/lib/inspect/output.ts

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
import { importKey } from '@workflow/core/encryption';
12
import {
3+
type EncryptionKeyParam,
24
getDeserializeStream,
35
getExternalRevivers,
46
} from '@workflow/core/serialization';
@@ -780,16 +782,32 @@ export const showStream = async (
780782
streamId: string,
781783
opts: InspectCLIOptions = {}
782784
) => {
783-
if (opts.runId || opts.stepId) {
785+
if (opts.stepId) {
784786
logger.warn(
785-
'Filtering by run-id or step-id is not supported when showing a stream, ignoring filter.'
787+
'Filtering by step-id is not supported when showing a stream, ignoring filter.'
786788
);
787789
}
788790
const rawStream = await world.readFromStream(streamId);
789791

792+
// Resolve the encryption key if a runId is provided (needed for encrypted streams).
793+
// The key is passed as a promise so stream construction is synchronous —
794+
// it will be resolved lazily on the first encrypted frame.
795+
let encryptionKey: EncryptionKeyParam;
796+
if (opts.runId) {
797+
encryptionKey = (async () => {
798+
const rawKey = await world.getEncryptionKeyForRun?.(opts.runId!);
799+
return rawKey ? await importKey(rawKey) : undefined;
800+
})();
801+
}
802+
790803
// Deserialize the stream to get JavaScript objects
791-
const revivers = getExternalRevivers(globalThis, [], '');
792-
const transform = getDeserializeStream(revivers);
804+
const revivers = getExternalRevivers(
805+
globalThis,
806+
[],
807+
opts.runId ?? '',
808+
encryptionKey
809+
);
810+
const transform = getDeserializeStream(revivers, encryptionKey);
793811
const stream = rawStream.pipeThrough(transform);
794812

795813
logger.info('Streaming to stdout, press CTRL+C to abort.');

packages/core/src/runtime/run.ts

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import {
88
type WorkflowRunStatus,
99
type World,
1010
} from '@workflow/world';
11-
import { importKey } from '../encryption.js';
11+
import { type CryptoKey, importKey } from '../encryption.js';
1212
import {
1313
getExternalRevivers,
1414
hydrateWorkflowReturnValue,
@@ -63,11 +63,35 @@ export class Run<TResult> {
6363
*/
6464
private world: World;
6565

66+
/**
67+
* Cached encryption key resolution. Resolved once on first use and
68+
* reused for returnValue, getReadable(), etc.
69+
* @internal
70+
*/
71+
private encryptionKeyPromise: Promise<CryptoKey | undefined> | null = null;
72+
6673
constructor(runId: string) {
6774
this.runId = runId;
6875
this.world = getWorld();
6976
}
7077

78+
/**
79+
* Resolves and caches the encryption key for this run.
80+
* The key is the same for the lifetime of a run, so it only needs
81+
* to be resolved once.
82+
* @internal
83+
*/
84+
private getEncryptionKey(): Promise<CryptoKey | undefined> {
85+
if (!this.encryptionKeyPromise) {
86+
this.encryptionKeyPromise = (async () => {
87+
const run = await this.world.runs.get(this.runId);
88+
const rawKey = await this.world.getEncryptionKeyForRun?.(run);
89+
return rawKey ? await importKey(rawKey) : undefined;
90+
})();
91+
}
92+
return this.encryptionKeyPromise;
93+
}
94+
7195
/**
7296
* Interrupts pending `sleep()` calls, resuming the workflow early.
7397
*
@@ -153,7 +177,15 @@ export class Run<TResult> {
153177
): ReadableStream<R> {
154178
const { ops = [], global = globalThis, startIndex, namespace } = options;
155179
const name = getWorkflowRunStreamId(this.runId, namespace);
156-
return getExternalRevivers(global, ops, this.runId).ReadableStream({
180+
// Pass the key as a promise — it will be resolved lazily inside
181+
// the first async transform() call of the deserialize stream.
182+
const encryptionKey = this.getEncryptionKey();
183+
return getExternalRevivers(
184+
global,
185+
ops,
186+
this.runId,
187+
encryptionKey
188+
).ReadableStream({
157189
name,
158190
startIndex,
159191
}) as ReadableStream<R>;
@@ -170,8 +202,7 @@ export class Run<TResult> {
170202
const run = await this.world.runs.get(this.runId);
171203

172204
if (run.status === 'completed') {
173-
const rawKey = await this.world.getEncryptionKeyForRun?.(run);
174-
const encryptionKey = rawKey ? await importKey(rawKey) : undefined;
205+
const encryptionKey = await this.getEncryptionKey();
175206
return await hydrateWorkflowReturnValue(
176207
run.output,
177208
this.runId,

packages/core/src/runtime/step-handler.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,7 @@ const stepHandler = getWorldHandlers().createQueueHandler(
356356
},
357357
ops,
358358
closureVars: hydratedInput.closureVars,
359+
encryptionKey,
359360
},
360361
() => stepFn.apply(thisVal, args)
361362
);

0 commit comments

Comments
 (0)