Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/encryptor-interface.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@workflow/core": patch
"@workflow/world": patch
"@workflow/cli": patch
"@workflow/world-testing": patch
Comment thread
TooTallNate marked this conversation as resolved.
---

Add `World.getEncryptionKeyForRun()` and thread encryption key through serialization layer
13 changes: 12 additions & 1 deletion packages/cli/src/lib/inspect/hydration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,20 @@ function getRevivers(): Revivers {
// Public API
// ---------------------------------------------------------------------------

/** Resolver function that retrieves the encryption key for a given run ID. */
export type EncryptionKeyResolver =
| ((runId: string) => Promise<Uint8Array | undefined>)
| null;

/**
* Hydrate the serialized data fields of a resource for CLI display.
*
* The optional `_encryptionKeyResolver` parameter is accepted for forward
* compatibility with encryption support but is not yet used.
*/
export function hydrateResourceIO<T>(resource: T): T {
export function hydrateResourceIO<T>(
resource: T,
_encryptionKeyResolver?: EncryptionKeyResolver
): T {
return hydrateResourceIOGeneric(resource as any, getRevivers()) as T;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: The type of _encryptorResolver is unknown here, but EncryptionKeyResolver is defined in the sibling output.ts. For consistency and forward-compat, consider importing and using that type so when encryption is wired into the CLI hydration, the type is already correct.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a proper EncryptionKeyResolver type definition in hydration.ts matching the one in output.ts, and typed the parameter accordingly.

}
43 changes: 34 additions & 9 deletions packages/cli/src/lib/inspect/output.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,18 @@ import type {
World,
} from '@workflow/world';
import chalk from 'chalk';

/** A function that resolves an encryption key for a given runId. */
export type EncryptionKeyResolver =
| ((runId: string) => Promise<Uint8Array | undefined>)
| null;

/** Create an EncryptionKeyResolver from a World instance */
function createResolver(world: World): EncryptionKeyResolver {
if (!world.getEncryptionKeyForRun) return null;
return (runId: string) => world.getEncryptionKeyForRun!(runId);
}

import { formatDistance } from 'date-fns';
import Table from 'easy-table';
import { logger } from '../config/log.js';
Expand Down Expand Up @@ -507,6 +519,7 @@ const inlineFormatIO = <T>(io: T, topLevel: boolean = true): string => {
};

export const listRuns = async (world: World, opts: InspectCLIOptions = {}) => {
const resolveKey = createResolver(world);
if (opts.stepId || opts.runId) {
logger.warn(
'Filtering by step-id or run-id is not supported in list calls, ignoring filter.'
Expand Down Expand Up @@ -535,7 +548,7 @@ export const listRuns = async (world: World, opts: InspectCLIOptions = {}) => {
resolveData,
});
const runsWithHydratedIO = await Promise.all(
runs.data.map(hydrateResourceIO)
runs.data.map((r) => hydrateResourceIO(r, resolveKey))
);
showJson({ ...runs, data: runsWithHydratedIO });
return;
Expand Down Expand Up @@ -574,7 +587,9 @@ export const listRuns = async (world: World, opts: InspectCLIOptions = {}) => {
}
},
displayPage: async (runs) => {
const runsWithHydratedIO = await Promise.all(runs.map(hydrateResourceIO));
const runsWithHydratedIO = await Promise.all(
runs.map((r) => hydrateResourceIO(r, resolveKey))
);
logger.log(showTable(runsWithHydratedIO, props, opts));
},
});
Expand All @@ -584,13 +599,16 @@ export const getRecentRun = async (
world: World,
opts: InspectCLIOptions = {}
) => {
const resolveKey = createResolver(world);
logger.warn(`No runId provided, fetching data for latest run instead.`);
try {
const runs = await world.runs.list({
pagination: { limit: 1, sortOrder: opts.sort || 'desc' },
resolveData: 'none', // Don't need data for just getting the ID
});
runs.data = await Promise.all(runs.data.map(hydrateResourceIO));
runs.data = await Promise.all(
runs.data.map((r) => hydrateResourceIO(r, resolveKey))
);
return runs.data[0];
} catch (error) {
if (handleApiError(error, opts.backend)) {
Expand All @@ -605,12 +623,13 @@ export const showRun = async (
runId: string,
opts: InspectCLIOptions = {}
) => {
const resolveKey = createResolver(world);
if (opts.withData) {
logger.warn('`withData` flag is ignored when showing individual resources');
}
try {
const run = await world.runs.get(runId, { resolveData: 'all' });
const runWithHydratedIO = await hydrateResourceIO(run);
const runWithHydratedIO = await hydrateResourceIO(run, resolveKey);
if (opts.json) {
showJson(runWithHydratedIO);
return;
Expand All @@ -636,6 +655,7 @@ export const listSteps = async (
runId: undefined,
}
) => {
const resolveKey = createResolver(world);
if (opts.stepId) {
logger.warn(
'Filtering by step-id is not supported in list calls, ignoring filter.'
Expand Down Expand Up @@ -714,7 +734,7 @@ export const listSteps = async (
},
displayPage: async (steps) => {
const stepsWithHydratedIO = await Promise.all(
steps.map(hydrateResourceIO)
steps.map((s) => hydrateResourceIO(s, resolveKey))
);
logger.log(showTable(stepsWithHydratedIO, props, opts));
showInspectInfoBox('step');
Expand All @@ -727,6 +747,7 @@ export const showStep = async (
stepId: string,
opts: InspectCLIOptions = {}
) => {
const resolveKey = createResolver(world);
if (opts.withData) {
logger.warn('`withData` flag is ignored when showing individual resources');
}
Expand All @@ -739,7 +760,7 @@ export const showStep = async (
const step = await world.steps.get(opts.runId, stepId, {
resolveData: 'all',
});
const stepWithHydratedIO = await hydrateResourceIO(step);
const stepWithHydratedIO = await hydrateResourceIO(step, resolveKey);
if (opts.json) {
showJson(stepWithHydratedIO);
return;
Expand Down Expand Up @@ -923,6 +944,7 @@ export const listEvents = async (
};

export const listHooks = async (world: World, opts: InspectCLIOptions = {}) => {
const resolveKey = createResolver(world);
if (opts.workflowName) {
logger.warn(
'Filtering by workflow-name is not supported for hooks, ignoring filter.'
Expand Down Expand Up @@ -955,7 +977,7 @@ export const listHooks = async (world: World, opts: InspectCLIOptions = {}) => {
resolveData,
});
const hydratedHooks = await Promise.all(
hooks.data.map(hydrateResourceIO)
hooks.data.map((h) => hydrateResourceIO(h, resolveKey))
);
showJson({ ...hooks, data: hydratedHooks });
return;
Expand Down Expand Up @@ -1000,7 +1022,9 @@ export const listHooks = async (world: World, opts: InspectCLIOptions = {}) => {
}
},
displayPage: async (hooks) => {
const hydratedHooks = await Promise.all(hooks.map(hydrateResourceIO));
const hydratedHooks = await Promise.all(
hooks.map((h) => hydrateResourceIO(h, resolveKey))
);
logger.log(showTable(hydratedHooks, HOOK_LISTED_PROPS, opts));
showInspectInfoBox('hook');
},
Expand All @@ -1012,14 +1036,15 @@ export const showHook = async (
hookId: string,
opts: InspectCLIOptions = {}
) => {
const resolveKey = createResolver(world);
if (opts.withData) {
logger.warn('`withData` flag is ignored when showing individual resources');
}
try {
const hook = await world.hooks.get(hookId, {
resolveData: 'all',
});
const hydratedHook = await hydrateResourceIO(hook);
const hydratedHook = await hydrateResourceIO(hook, resolveKey);
if (opts.json) {
showJson(hydratedHook);
return;
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/private.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ export function getStepFunction(stepId: string): StepFunction | undefined {
export { __private_getClosureVars } from './step/get-closure-vars.js';

export interface WorkflowOrchestratorContext {
runId: string;
encryptionKey: Uint8Array | undefined;
globalThis: typeof globalThis;
eventsConsumer: EventsConsumer;
/**
Expand Down
6 changes: 5 additions & 1 deletion packages/core/src/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -249,10 +249,14 @@ export function workflowEntrypoint(
replaySpan?.setAttributes({
...Attribute.WorkflowEventsCount(events.length),
});
// Resolve the encryption key for this run's deployment
const encryptionKey =
await world.getEncryptionKeyForRun?.(runId);
return await runWorkflow(
workflowCode,
workflowRun,
events
events,
encryptionKey
);
}
);
Expand Down
53 changes: 37 additions & 16 deletions packages/core/src/runtime/resume-hook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,33 @@ import { getWorkflowQueueName } from './helpers.js';
import { getWorld } from './world.js';

/**
* Get the hook by token to find the associated workflow run,
* and hydrate the `metadata` property if it was set from within
* the workflow run.
*
* @param token - The unique token identifying the hook
* Internal helper that returns both the hook and the resolved encryption key.
*/
export async function getHookByToken(token: string): Promise<Hook> {
async function getHookByTokenWithKey(
token: string
): Promise<{ hook: Hook; encryptionKey: Uint8Array | undefined }> {
const world = getWorld();
const hook = await world.hooks.getByToken(token);
const encryptionKey = await world.getEncryptionKeyForRun?.(hook.runId);
if (typeof hook.metadata !== 'undefined') {
hook.metadata = await hydrateStepArguments(
hook.metadata as any,
[],
hook.runId
hook.runId,
encryptionKey
);
}
return { hook, encryptionKey };
}

/**
* Get the hook by token to find the associated workflow run,
* and hydrate the `metadata` property if it was set from within
* the workflow run.
*
* @param token - The unique token identifying the hook
*/
export async function getHookByToken(token: string): Promise<Hook> {
const { hook } = await getHookByTokenWithKey(token);
return hook;
}

Expand Down Expand Up @@ -68,17 +79,26 @@ export async function getHookByToken(token: string): Promise<Hook> {
*/
export async function resumeHook<T = any>(
tokenOrHook: string | Hook,
payload: T
payload: T,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: _encryptionKey uses the underscore-prefix convention that typically signals an unused parameter, but it is used on lines 106 and 110. Consider renaming to encryptionKeyOverride or just encryptionKey to better communicate intent.

(I see @VaguelySerious flagged this too — just confirming it's still present in the latest revision.)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed — renamed to encryptionKeyOverride to properly communicate intent.

encryptionKeyOverride?: Uint8Array | undefined
): Promise<Hook> {
Comment thread
TooTallNate marked this conversation as resolved.
return await waitedUntil(() => {
return trace('hook.resume', async (span) => {
const world = getWorld();

try {
const hook =
typeof tokenOrHook === 'string'
? await getHookByToken(tokenOrHook)
: tokenOrHook;
let hook: Hook;
let encryptionKey: Uint8Array | undefined;
if (typeof tokenOrHook === 'string') {
const result = await getHookByTokenWithKey(tokenOrHook);
hook = result.hook;
encryptionKey = encryptionKeyOverride ?? result.encryptionKey;
} else {
hook = tokenOrHook;
encryptionKey =
encryptionKeyOverride ??
(await world.getEncryptionKeyForRun?.(hook.runId));
}

span?.setAttributes({
...Attribute.HookToken(hook.token),
Expand All @@ -91,8 +111,9 @@ export async function resumeHook<T = any>(
const v1Compat = isLegacySpecVersion(hook.specVersion);
const dehydratedPayload = await dehydrateStepReturnValue(
payload,
ops,
hook.runId,
encryptionKey,
ops,
globalThis,
v1Compat
);
Expand Down Expand Up @@ -200,7 +221,7 @@ export async function resumeWebhook(
token: string,
request: Request
): Promise<Response> {
const hook = await getHookByToken(token);
const { hook, encryptionKey } = await getHookByTokenWithKey(token);

let response: Response | undefined;
let responseReadable: ReadableStream<Response> | undefined;
Expand Down Expand Up @@ -229,7 +250,7 @@ export async function resumeWebhook(
response = new Response(null, { status: 202 });
}

await resumeHook(hook, request);
await resumeHook(hook, request, encryptionKey);

if (responseReadable) {
// Wait for the readable stream to emit one chunk,
Expand Down
9 changes: 8 additions & 1 deletion packages/core/src/runtime/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,14 @@ export class Run<TResult> {
const run = await this.world.runs.get(this.runId);

if (run.status === 'completed') {
return await hydrateWorkflowReturnValue(run.output, [], this.runId);
const encryptionKey = await this.world.getEncryptionKeyForRun?.(
this.runId
);
return await hydrateWorkflowReturnValue(
run.output,
this.runId,
encryptionKey
);
}

if (run.status === 'cancelled') {
Expand Down
8 changes: 7 additions & 1 deletion packages/core/src/runtime/runs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,14 @@ export async function recreateRunFromExisting(
): Promise<string> {
try {
const run = await world.runs.get(runId, { resolveData: 'all' });
const encryptionKey = await world.getEncryptionKeyForRun?.(runId);
const workflowArgs = normalizeWorkflowArgs(
await hydrateWorkflowArguments(run.input, globalThis)
await hydrateWorkflowArguments(
run.input,
runId,
encryptionKey,
globalThis
)
);
const specVersion =
options.specVersion ?? run.specVersion ?? SPEC_VERSION_LEGACY;
Expand Down
9 changes: 8 additions & 1 deletion packages/core/src/runtime/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,19 @@ export async function start<TArgs extends unknown[], TResult>(
const specVersion = opts.specVersion ?? SPEC_VERSION_CURRENT;
const v1Compat = isLegacySpecVersion(specVersion);

// Resolve encryption key for the new run. The runId has already been
// generated above (client-generated ULID) and will be used for both
// key derivation and the run_created event. The World implementation
// uses the runId for per-run HKDF key derivation.
const encryptionKey = await world.getEncryptionKeyForRun?.(runId);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like this isn't adequately documented in the world interface . If a placeholder "works", then getEncryptionKeyForRun on the world should allow null or something similar and describe how the fallback works.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the comment. The runId is a client-generated ULID that has already been created at that point — it is the actual runId that will be used for the run_created event and key derivation. Clarified the comment to explain this.


// Create run via run_created event (event-sourced architecture)
// Pass client-generated runId - server will accept and use it
const workflowArguments = await dehydrateWorkflowArguments(
args,
ops,
runId,
encryptionKey,
ops,
globalThis,
v1Compat
);
Expand Down
Loading