diff --git a/packages/cli/package.json b/packages/cli/package.json index f663fc92e6..216bfe2bd8 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -54,7 +54,7 @@ "@workflow/world-vercel": "workspace:*", "boxen": "8.0.1", "builtin-modules": "5.0.0", - "chalk": "5.6.2", + "chalk": "catalog:", "chokidar": "4.0.3", "date-fns": "4.1.0", "easy-table": "1.2.0", diff --git a/packages/cli/src/lib/inspect/vercel-api.ts b/packages/cli/src/lib/inspect/vercel-api.ts index 29c8a95724..b060fd0cf1 100644 --- a/packages/cli/src/lib/inspect/vercel-api.ts +++ b/packages/cli/src/lib/inspect/vercel-api.ts @@ -1,4 +1,4 @@ -import chalk from 'chalk'; +import { Ansi } from '@workflow/errors'; import { logger } from '../config/log.js'; interface VercelTeam { @@ -31,19 +31,17 @@ export async function fetchTeamInfo( if (response.status === 401 || response.status === 403) { logger.error( - chalk.red( - `Authentication failed (${response.status}): Unable to access team information` + Ansi.frame( + `Authentication failed (${response.status}): unable to access team information`, + [ + Ansi.hint([ + 'ensure you are logged in and have access to the team:', + `1. Sign into your Vercel account with ${Ansi.code('npx vercel login')}`, + `2. Sync environment variables with ${Ansi.code('npx vercel env pull')}`, + ]), + ] ) ); - logger.warn( - chalk.yellow( - '\nPlease ensure you are logged in and have access to the team:' - ) - ); - logger.warn(chalk.yellow(' 1. Run `vercel login` to authenticate')); - logger.warn( - chalk.yellow(' 2. Run `vercel env pull` to sync environment variables') - ); return null; } diff --git a/packages/core/__mocks__/chalk.ts b/packages/core/__mocks__/chalk.ts new file mode 100644 index 0000000000..3c7ecba8d7 --- /dev/null +++ b/packages/core/__mocks__/chalk.ts @@ -0,0 +1,26 @@ +// Mock implementation of the 'chalk' library for testing purposes +// This mock wraps text in HTML-like tags to indicate styles +// because terminal styling is unreadable in test snapshots. + +import type { ChalkInstance } from 'chalk'; + +const short = new Map([ + ['italic', 'i'], + ['bold', 'b'], +]); + +function createChalkMock(currentModifiers: string[] = []): ChalkInstance { + return new Proxy(() => {}, { + get(_, prop: string) { + return createChalkMock([...currentModifiers, short.get(prop) || prop]); + }, + apply(_target, _thisArg, [text]) { + return currentModifiers.reduceRight((acc, mod) => { + const tag = String(mod); + return `<${tag}>${acc}`; + }, text as string); + }, + }) as ChalkInstance; +} + +export default createChalkMock(); diff --git a/packages/core/package.json b/packages/core/package.json index d80e78187c..42613fad87 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -56,6 +56,7 @@ "@workflow/world": "workspace:*", "@workflow/world-local": "workspace:*", "@workflow/world-vercel": "workspace:*", + "chalk": "catalog:", "debug": "4.4.3", "devalue": "5.6.0", "ms": "2.1.3", diff --git a/packages/core/src/create-hook.ts b/packages/core/src/create-hook.ts index eb8f2f778f..30d78bdd2a 100644 --- a/packages/core/src/create-hook.ts +++ b/packages/core/src/create-hook.ts @@ -1,3 +1,4 @@ +import { NotInWorkflowContextError } from './not-in-workflow-context-error.js'; import type { Serializable } from './schemas.js'; /** @@ -117,8 +118,9 @@ export interface WebhookOptions extends HookOptions { */ // @ts-expect-error `options` is here for types/docs export function createHook(options?: HookOptions): Hook { - throw new Error( - '`createHook()` can only be called inside a workflow function' + throw new NotInWorkflowContextError( + 'createHook()', + 'creating hooks: https://useworkflow.dev/docs/foundations/hooks#creating-your-first-hook' ); } @@ -137,7 +139,8 @@ export function createWebhook( // @ts-expect-error `options` is here for types/docs options?: WebhookOptions ): Webhook | Webhook { - throw new Error( - '`createWebhook()` can only be called inside a workflow function' + throw new NotInWorkflowContextError( + 'createWebhook()', + 'creating webhooks: https://useworkflow.dev/docs/foundations/hooks#creating-your-first-webhook' ); } diff --git a/packages/core/src/define-hook.ts b/packages/core/src/define-hook.ts index cd84917c9e..9f824b950e 100644 --- a/packages/core/src/define-hook.ts +++ b/packages/core/src/define-hook.ts @@ -1,6 +1,7 @@ import type { StandardSchemaV1 } from '@standard-schema/spec'; import type { Hook as HookEntity } from '@workflow/world'; import type { Hook, HookOptions } from './create-hook.js'; +import { NotInWorkflowContextError } from './not-in-workflow-context-error.js'; import { resumeHook } from './runtime/resume-hook.js'; /** @@ -78,8 +79,9 @@ export function defineHook({ } = {}): TypedHook { return { create(_options?: HookOptions): Hook { - throw new Error( - '`defineHook().create()` can only be called inside a workflow function.' + throw new NotInWorkflowContextError( + 'defineHook().create()', + `resumeHook(): https://useworkflow.dev/docs/api-reference/workflow-api/resume-hook` ); }, async resume(token: string, payload: TInput): Promise { diff --git a/packages/core/src/global.test.ts b/packages/core/src/global.test.ts index fda309f4f0..67654eea14 100644 --- a/packages/core/src/global.test.ts +++ b/packages/core/src/global.test.ts @@ -1,6 +1,7 @@ import { FatalError } from '@workflow/errors'; -import { describe, expect, it } from 'vitest'; +import { describe, expect, it, vi } from 'vitest'; import { + ENOTSUP, type HookInvocationQueueItem, type QueueItem, type StepInvocationQueueItem, @@ -375,3 +376,15 @@ describe('WorkflowSuspension', () => { expect(error.message).toBe('1 hook has not been created yet'); }); }); + +vi.mock('chalk'); +it('renders ENOTSUP nicely', () => { + expect(() => + ENOTSUP('new ReadableStream') + ).toThrowErrorMatchingInlineSnapshot(` + [Error: \`new ReadableStream\` is unsupported in a workflow context. + ├▶ calling this in a workflow context can cause determinism issues. + ╰▶ hint: exit the workflow function by calling a step function. + Read more about workflows and step functions: https://useworkflow.dev/docs/foundations/workflows-and-steps#step-functions] + `); +}); diff --git a/packages/core/src/global.ts b/packages/core/src/global.ts index ed7ce6c6e1..acf2156db7 100644 --- a/packages/core/src/global.ts +++ b/packages/core/src/global.ts @@ -1,4 +1,5 @@ import { pluralize } from '@workflow/utils'; +import { Ansi } from '@workflow/errors'; import type { Serializable } from './schemas.js'; export interface StepInvocationQueueItem { @@ -99,6 +100,17 @@ export class WorkflowSuspension extends Error { } } -export function ENOTSUP(): never { - throw new Error('Not supported in workflow functions'); +export function ENOTSUP(functionName: string): never { + throw new Error( + Ansi.frame( + `${Ansi.code(functionName)} is unsupported in a workflow context.`, + [ + 'calling this in a workflow context can cause determinism issues.', + Ansi.hint([ + `exit the workflow function by calling a step function.`, + `Read more about workflows and step functions: https://useworkflow.dev/docs/foundations/workflows-and-steps#step-functions`, + ]), + ] + ) + ); } diff --git a/packages/core/src/inspectable-error.ts b/packages/core/src/inspectable-error.ts new file mode 100644 index 0000000000..0308ce6df6 --- /dev/null +++ b/packages/core/src/inspectable-error.ts @@ -0,0 +1,22 @@ +import chalk from 'chalk'; + +export class InspectableError extends Error { + cause: unknown; + name = 'InspectableError'; + constructor(entity: 'run' | 'step', id: string, cause: unknown) { + const message = chalk.cyan( + `${chalk.bold('help:')} to inspect or retry manually run ${code(`wf inspect ${entity} ${id}`)}` + ); + super(message, { cause: cause }); + this.cause = cause; + } + + toString() { + return `${this.cause}\n${this.message}`; + } +} + +function code(text: string) { + const tick = chalk.dim('`'); + return chalk.italic(`${tick}${text}${tick}`); +} diff --git a/packages/core/src/not-in-workflow-context-error.test.ts b/packages/core/src/not-in-workflow-context-error.test.ts new file mode 100644 index 0000000000..b1fcb475da --- /dev/null +++ b/packages/core/src/not-in-workflow-context-error.test.ts @@ -0,0 +1,59 @@ +import { expect, onTestFinished, test, vi } from 'vitest'; +import { + NotInStepContextError, + NotInWorkflowContextError, + UnavailableInWorkflowContextError, +} from './not-in-workflow-context-error.js'; +import { + WORKFLOW_CONTEXT_SYMBOL, + type WorkflowMetadata, +} from './workflow/get-workflow-metadata.js'; + +// use html tags instead of actual ansi colors +vi.mock('chalk'); + +test('NotInStepContextError output', () => { + expect(() => { + throw new NotInStepContextError( + 'sleep()', + 'sleep(): https://example.vercel.sh' + ); + }).toThrowErrorMatchingInlineSnapshot(` + [NotInStepContextError: \`sleep()\` can only be called inside a step function + ╰▶ note: Read more about sleep(): https://example.vercel.sh] + `); +}); + +test('NotInWorkflowContextError output', () => { + expect(() => { + throw new NotInWorkflowContextError( + 'createHook()', + 'creating hooks: https://useworkflow.dev/docs/foundations/hooks#creating-your-first-hook' + ); + }).toThrowErrorMatchingInlineSnapshot(` + [NotInWorkflowContextError: \`createHook()\` can only be called inside a workflow function + ╰▶ note: Read more about creating hooks: https://useworkflow.dev/docs/foundations/hooks#creating-your-first-hook] + `); +}); + +test('UnavailableInWorkflowContextError output', () => { + Object.assign(globalThis, { + [WORKFLOW_CONTEXT_SYMBOL]: { + workflowName: 'workflow//workflows/example.ts//myWorkflow', + } as WorkflowMetadata, + }); + onTestFinished(() => { + delete (globalThis as any)[WORKFLOW_CONTEXT_SYMBOL]; + }); + expect(() => { + throw new UnavailableInWorkflowContextError( + `resumeHook()`, + 'resuming hooks: https://useworkflow.dev/docs/foundations/hooks#resuming-a-hook' + ); + }).toThrowErrorMatchingInlineSnapshot(` + [UnavailableInWorkflowContextError: \`resumeHook()\` cannot be called from a workflow context. + ├▶ calling this in a workflow context can cause determinism issues. + ╰▶ note: this call was made from the workflow//workflows/example.ts//myWorkflow workflow context. + Read more about resuming hooks: https://useworkflow.dev/docs/foundations/hooks#resuming-a-hook] + `); +}); diff --git a/packages/core/src/not-in-workflow-context-error.ts b/packages/core/src/not-in-workflow-context-error.ts new file mode 100644 index 0000000000..acfc2a3274 --- /dev/null +++ b/packages/core/src/not-in-workflow-context-error.ts @@ -0,0 +1,54 @@ +import { ansifyStep } from './parse-name.js'; +import { Ansi } from '@workflow/errors'; +import { getWorkflowMetadata } from './workflow/get-workflow-metadata.js'; + +export class NotInWorkflowContextError extends Error { + name = 'NotInWorkflowContextError'; + constructor( + readonly functionName: string, + docLink: `${string}: https://${string}` + ) { + super( + Ansi.frame( + `${Ansi.code(functionName)} can only be called inside a workflow function`, + [Ansi.note(`Read more about ${docLink}`)] + ) + ); + } +} + +export class NotInStepContextError extends Error { + name = 'NotInStepContextError'; + constructor( + readonly functionName: string, + docLink: `${string}: https://${string}` + ) { + super( + Ansi.frame( + `${Ansi.code(functionName)} can only be called inside a step function`, + [Ansi.note(`Read more about ${docLink}`)] + ) + ); + } +} + +export class UnavailableInWorkflowContextError extends Error { + name = 'UnavailableInWorkflowContextError'; + constructor( + readonly functionName: string, + docLink: `${string}: https://${string}` + ) { + const { workflowName } = getWorkflowMetadata(); + const message = Ansi.frame( + `${Ansi.code(functionName)} cannot be called from a workflow context.`, + [ + 'calling this in a workflow context can cause determinism issues.', + Ansi.note([ + `this call was made from the ${ansifyStep(workflowName)} workflow context.`, + `Read more about ${docLink}`, + ]), + ] + ); + super(message); + } +} diff --git a/packages/core/src/parse-name.ts b/packages/core/src/parse-name.ts index 91b376de5a..f2854b7456 100644 --- a/packages/core/src/parse-name.ts +++ b/packages/core/src/parse-name.ts @@ -1,3 +1,5 @@ +import chalk from 'chalk'; + /** * Parse a machine readable name. * @@ -62,3 +64,12 @@ export function parseWorkflowName(name: string) { export function parseStepName(name: string) { return parseName('step', name); } + +export function ansifyStep(name: string) { + if (chalk.level <= 0) { + return name; + } + return name + .replace(/^(workflow|step)\/\//, chalk.dim('$1//')) + .replace(/\/\//g, chalk.dim('//')); +} diff --git a/packages/core/src/private.ts b/packages/core/src/private.ts index d893b459f4..4c4f38b7d6 100644 --- a/packages/core/src/private.ts +++ b/packages/core/src/private.ts @@ -4,6 +4,7 @@ import type { EventsConsumer } from './events-consumer.js'; import type { QueueItem } from './global.js'; +import { ansifyStep } from './parse-name.js'; import type { Serializable } from './schemas.js'; export type StepFunction< @@ -29,6 +30,10 @@ export function getStepFunction(stepId: string): StepFunction | undefined { return registeredSteps.get(stepId); } +export function listRegisteredStepFunctions(): string[] { + return Array.from(registeredSteps.keys()); +} + /** * Get closure variables for the current step function * @internal @@ -36,6 +41,7 @@ export function getStepFunction(stepId: string): StepFunction | undefined { export { __private_getClosureVars } from './step/get-closure-vars.js'; export interface WorkflowOrchestratorContext { + workflowRunId: string; globalThis: typeof globalThis; eventsConsumer: EventsConsumer; /** @@ -47,3 +53,24 @@ export interface WorkflowOrchestratorContext { generateUlid: () => string; generateNanoid: () => string; } + +export class StepNotFoundError extends Error { + name = 'StepNotFoundError'; + constructor(stepName: string, opts?: { registeredSteps: string[] }) { + let allSteps = opts?.registeredSteps ?? listRegisteredStepFunctions(); + if (!stepName.startsWith('__builtin_')) { + allSteps = allSteps.filter((s) => !s.startsWith('__builtin_')); + } + + let steps = allSteps.map((x) => `- ${ansifyStep(x)}`).join('\n'); + steps = steps + ? `.\nAvailable steps:\n${steps}` + : ` (no steps were registered)`; + super( + `Can't find requested step function "${ansifyStep(stepName)}".\nMake sure the step function is registered${steps}` + ); + } + toString() { + return this.message; + } +} diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 60e5609f8c..cdf09d7150 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -11,6 +11,7 @@ import { type World, } from '@workflow/world'; import { WorkflowSuspension } from './global.js'; +import { InspectableError } from './inspectable-error.js'; import { runtimeLogger } from './logger.js'; import { parseWorkflowName } from './parse-name.js'; import { @@ -407,7 +408,9 @@ export function workflowEntrypoint( } } } - ); // End withTraceContext + ).catch((err) => { + throw new InspectableError('run', runId, err); + }); }); } ); diff --git a/packages/core/src/runtime/resume-hook.ts b/packages/core/src/runtime/resume-hook.ts index cf0db83cf1..73b90cc660 100644 --- a/packages/core/src/runtime/resume-hook.ts +++ b/packages/core/src/runtime/resume-hook.ts @@ -121,9 +121,7 @@ export async function resumeHook( traceCarrier: workflowRun.executionContext?.traceCarrier ?? undefined, } satisfies WorkflowInvokePayload, - { - deploymentId: workflowRun.deploymentId, - } + { deploymentId: workflowRun.deploymentId } ); return hook; diff --git a/packages/core/src/runtime/step-handler.ts b/packages/core/src/runtime/step-handler.ts index 880709299e..4587601bb2 100644 --- a/packages/core/src/runtime/step-handler.ts +++ b/packages/core/src/runtime/step-handler.ts @@ -8,8 +8,9 @@ import { import { pluralize } from '@workflow/utils'; import { getPort } from '@workflow/utils/get-port'; import { StepInvokePayloadSchema } from '@workflow/world'; +import { InspectableError } from '../inspectable-error.js'; import { runtimeLogger } from '../logger.js'; -import { getStepFunction } from '../private.js'; +import { getStepFunction, StepNotFoundError } from '../private.js'; import type { Serializable } from '../schemas.js'; import { dehydrateStepReturnValue, @@ -65,7 +66,7 @@ const stepHandler = getWorldHandlers().createQueueHandler( const stepFn = getStepFunction(stepName); if (!stepFn) { - throw new Error(`Step "${stepName}" not found`); + throw new StepNotFoundError(stepName); } if (typeof stepFn !== 'function') { throw new Error( @@ -235,6 +236,7 @@ const stepHandler = getWorldHandlers().createQueueHandler( attempt, }, workflowMetadata: { + workflowName, workflowRunId, workflowStartedAt: new Date(+workflowStartedAt), // TODO: there should be a getUrl method on the world interface itself. This @@ -423,6 +425,8 @@ const stepHandler = getWorldHandlers().createQueueHandler( }); } ); + }).catch((cause) => { + throw new InspectableError('step', stepId, cause); }); } ); diff --git a/packages/core/src/serialization.test.ts b/packages/core/src/serialization.test.ts index 2f55fbff3a..b32f4de8c3 100644 --- a/packages/core/src/serialization.test.ts +++ b/packages/core/src/serialization.test.ts @@ -96,7 +96,7 @@ describe('workflow arguments', () => { `); const hydrated = hydrateWorkflowArguments(serialized, vmGlobalThis); - expect(hydrated).toBe(BigInt(9007199254740992)); + expect(hydrated).toBe(9007199254740992n); expect(typeof hydrated).toBe('bigint'); }); @@ -718,9 +718,10 @@ describe('workflow arguments', () => { ); // respondWith should throw an error when called from workflow context expect(hydrated.respondWith).toBeInstanceOf(Function); - expect(() => hydrated.respondWith()).toThrow( - '`respondWith()` must be called from within a step function' - ); + expect(() => hydrated.respondWith()).toThrowErrorMatchingInlineSnapshot(` + [NotInStepContextError: \`respondWith()\` can only be called inside a step function + ╰▶ note: Read more about dynamic webhook responses: https://useworkflow.dev/docs/foundations/hooks#dynamic-responses-manual-mode] + `); } finally { (globalThis as any)[STABLE_ULID] = originalStableUlid; } @@ -820,7 +821,7 @@ describe('step function serialization', () => { }); it('should lookup registered step function by name', () => { - const stepName = 'myRegisteredStep'; + const stepName = buildName('step', 'src/steps.ts', 'myRegisteredStep'); const stepFn = async (x: number) => x * 2; // Register the step function @@ -853,16 +854,17 @@ describe('step function serialization', () => { it('should throw error when reviver cannot find registered step function', () => { const revivers = getCommonRevivers(vmGlobalThis); - let err: Error | undefined; - try { - revivers.StepFunction({ stepId: 'nonExistentStep' }); - } catch (err_) { - err = err_ as Error; - } - - expect(err).toBeDefined(); - expect(err?.message).toContain('Step function "nonExistentStep" not found'); - expect(err?.message).toContain('Make sure the step function is registered'); + expect(() => { + revivers.StepFunction({ + stepId: buildName('step', 'my-file.ts', 'nonExistentStep'), + }); + }).toThrowErrorMatchingInlineSnapshot(` + [StepNotFoundError: Can't find requested step function "step//my-file.ts//nonExistentStep". + Make sure the step function is registered. + Available steps: + - step//src/steps.ts//myRegisteredStep + - testStep] + `); }); it('should dehydrate step function passed as argument to a step', () => { @@ -997,3 +999,11 @@ describe('step function serialization', () => { expect(result).toEqual({ stepId: stepName }); }); }); + +export function buildName( + kind: 'workflow' | 'step', + filePath: string, + name: string +) { + return `${kind}//${filePath}//${name}`; +} diff --git a/packages/core/src/serialization.ts b/packages/core/src/serialization.ts index 976f398b0b..60af754823 100644 --- a/packages/core/src/serialization.ts +++ b/packages/core/src/serialization.ts @@ -1,7 +1,8 @@ import { WorkflowRuntimeError } from '@workflow/errors'; import { DevalueError, parse, stringify, unflatten } from 'devalue'; import { monotonicFactory } from 'ulid'; -import { getStepFunction } from './private.js'; +import { NotInStepContextError } from './not-in-workflow-context-error.js'; +import { getStepFunction, StepNotFoundError } from './private.js'; import { getWorld } from './runtime/world.js'; import { contextStorage } from './step/context-storage.js'; import { @@ -619,9 +620,7 @@ export function getCommonRevivers(global: Record = globalThis) { const stepFn = getStepFunction(stepId); if (!stepFn) { - throw new Error( - `Step function "${stepId}" not found. Make sure the step function is registered.` - ); + throw new StepNotFoundError(stepId); } // If closure variables were serialized, return a wrapper function @@ -779,8 +778,9 @@ export function getWorkflowRevivers( (value as any)[WEBHOOK_RESPONSE_WRITABLE] = responseWritable; delete value.responseWritable; (value as any).respondWith = () => { - throw new Error( - '`respondWith()` must be called from within a step function' + throw new NotInStepContextError( + 'respondWith()', + 'dynamic webhook responses: https://useworkflow.dev/docs/foundations/hooks#dynamic-responses-manual-mode' ); }; } diff --git a/packages/core/src/sleep.ts b/packages/core/src/sleep.ts index 7d3e0d7ab8..054d0b3cfd 100644 --- a/packages/core/src/sleep.ts +++ b/packages/core/src/sleep.ts @@ -1,4 +1,5 @@ import type { StringValue } from 'ms'; +import { NotInWorkflowContextError } from './not-in-workflow-context-error.js'; import { WORKFLOW_SLEEP } from './symbols.js'; /** @@ -39,7 +40,10 @@ export async function sleep(param: StringValue | Date | number): Promise { // Inside the workflow VM, the sleep function is stored in the globalThis object behind a symbol const sleepFn = (globalThis as any)[WORKFLOW_SLEEP]; if (!sleepFn) { - throw new Error('`sleep()` can only be called inside a workflow function'); + throw new NotInWorkflowContextError( + 'sleep()', + `sleep(): https://useworkflow.dev/docs/api-reference/workflow/sleep` + ); } return sleepFn(param); } diff --git a/packages/core/src/step.ts b/packages/core/src/step.ts index a0707ed0f0..f8530acbb2 100644 --- a/packages/core/src/step.ts +++ b/packages/core/src/step.ts @@ -3,6 +3,8 @@ import { withResolvers } from '@workflow/utils'; import { EventConsumerResult } from './events-consumer.js'; import { type StepInvocationQueueItem, WorkflowSuspension } from './global.js'; import { stepLogger } from './logger.js'; +import { ansifyStep } from './parse-name.js'; +import { Ansi } from '@workflow/errors'; import type { WorkflowOrchestratorContext } from './private.js'; import type { Serializable } from './schemas.js'; import { hydrateStepReturnValue } from './serialization.js'; @@ -80,7 +82,14 @@ export function createUseStep(ctx: WorkflowOrchestratorContext) { setTimeout(() => { reject( new WorkflowRuntimeError( - `Corrupted event log: step ${correlationId} (${stepName}) started but not found in invocation queue` + Ansi.frame( + `Corrupted event log: step ${correlationId} (${ansifyStep(stepName)}) started but not found in invocation queue`, + [ + Ansi.help( + `Inspect the events by running ${Ansi.code(`npx workflow inspect events --run=${ctx.workflowRunId}`)}` + ), + ] + ) ) ); }, 0); @@ -126,7 +135,13 @@ export function createUseStep(ctx: WorkflowOrchestratorContext) { setTimeout(() => { reject( new WorkflowRuntimeError( - `Unexpected event type: "${event.eventType}"` + Ansi.frame(`Unexpected event type: "${event.eventType}"`, [ + [ + 'This seems like a bug in the workflow runtime.', + 'Please report this issue at https://github.com/vercel/workflow', + `Include the output from ${Ansi.code(`npx workflow inspect events --run=${ctx.workflowRunId}`)}`, + ].join('\n'), + ]) ) ); }, 0); diff --git a/packages/core/src/step/get-closure-vars.ts b/packages/core/src/step/get-closure-vars.ts index a5d1d5193d..fcf7b99cec 100644 --- a/packages/core/src/step/get-closure-vars.ts +++ b/packages/core/src/step/get-closure-vars.ts @@ -1,3 +1,4 @@ +import { NotInStepContextError } from '../not-in-workflow-context-error.js'; import { contextStorage } from './context-storage.js'; /** @@ -10,8 +11,9 @@ import { contextStorage } from './context-storage.js'; export function __private_getClosureVars(): Record { const ctx = contextStorage.getStore(); if (!ctx) { - throw new Error( - 'Closure variables can only be accessed inside a step function' + throw new NotInStepContextError( + '[Closure variables]', + 'Step functions: https://useworkflow.dev/docs/foundations/workflows-and-steps#step-functions' ); } return ctx.closureVars || {}; diff --git a/packages/core/src/step/get-step-metadata.ts b/packages/core/src/step/get-step-metadata.ts index 9ecd4714ea..2170c0e430 100644 --- a/packages/core/src/step/get-step-metadata.ts +++ b/packages/core/src/step/get-step-metadata.ts @@ -1,3 +1,4 @@ +import { NotInStepContextError } from '../not-in-workflow-context-error.js'; import { contextStorage } from './context-storage.js'; export interface StepMetadata { @@ -42,8 +43,9 @@ export interface StepMetadata { export function getStepMetadata(): StepMetadata { const ctx = contextStorage.getStore(); if (!ctx) { - throw new Error( - '`getStepMetadata()` can only be called inside a step function' + throw new NotInStepContextError( + 'getStepMetadata()', + 'step metadata: https://useworkflow.dev/docs/api-reference/workflow/get-step-metadata' ); } return ctx.stepMetadata; diff --git a/packages/core/src/step/get-workflow-metadata.ts b/packages/core/src/step/get-workflow-metadata.ts index 59368a0570..4aa299ce24 100644 --- a/packages/core/src/step/get-workflow-metadata.ts +++ b/packages/core/src/step/get-workflow-metadata.ts @@ -1,3 +1,4 @@ +import { Ansi } from '@workflow/errors'; import type { WorkflowMetadata } from '../workflow/get-workflow-metadata.js'; import { contextStorage } from './context-storage.js'; @@ -10,7 +11,16 @@ export function getWorkflowMetadata(): WorkflowMetadata { const ctx = contextStorage.getStore(); if (!ctx) { throw new Error( - '`getWorkflowMetadata()` can only be called inside a workflow or step function' + Ansi.frame( + '`getWorkflowMetadata()` can only be called inside a workflow or step function', + [ + Ansi.help([ + 'This function comes from Workflow DevKit, and requires to be used as a part of a workflow or a step,', + 'As it has no meaning outside of the workflow context.', + 'Read more: https://useworkflow.dev/docs/api-reference/workflow/get-workflow-metadata', + ]), + ] + ) ); } return ctx.workflowMetadata; diff --git a/packages/core/src/step/writable-stream.ts b/packages/core/src/step/writable-stream.ts index 3dc80b68ca..388d48d659 100644 --- a/packages/core/src/step/writable-stream.ts +++ b/packages/core/src/step/writable-stream.ts @@ -5,6 +5,7 @@ import { } from '../serialization.js'; import { getWorkflowRunStreamId } from '../util.js'; import type { WorkflowWritableStreamOptions } from '../writable-stream.js'; +import { getWritable as throwError } from '../writable-stream.js'; import { contextStorage } from './context-storage.js'; export type { WorkflowWritableStreamOptions }; @@ -24,9 +25,7 @@ export function getWritable( ): WritableStream { const ctx = contextStorage.getStore(); if (!ctx) { - throw new Error( - '`getWritable()` can only be called inside a workflow or step function' - ); + return throwError(); } const { namespace } = options; diff --git a/packages/core/src/workflow.test.ts b/packages/core/src/workflow.test.ts index 6e7b537875..c998333f95 100644 --- a/packages/core/src/workflow.test.ts +++ b/packages/core/src/workflow.test.ts @@ -972,9 +972,15 @@ describe('runWorkflow', () => { workflowRun, events ) - ).rejects.toThrow( - 'Timeout functions like "setTimeout" and "setInterval" are not supported in workflow functions' - ); + ).rejects.toThrowErrorMatchingInlineSnapshot(` + [WorkflowRuntimeError: \`setTimeout\` is not available in a workflow context. + ├▶ Timer-based functions are not supported in workflow functions as they introduce non-deterministic behavior. + │ Read more: https://useworkflow.dev/err/timeout-in-workflow + ╰▶ help: use the \`sleep\` function from the \`workflow\` package for time-based delays. + The sleep function is a step function that can be awaited on and properly recorded in the workflow's event log. + + Learn more: https://useworkflow.dev/err/timeout-in-workflow] + `); }); it('should throw an error when calling setInterval', async () => { @@ -1001,9 +1007,15 @@ describe('runWorkflow', () => { workflowRun, events ) - ).rejects.toThrow( - 'Timeout functions like "setTimeout" and "setInterval" are not supported in workflow functions' - ); + ).rejects.toThrowErrorMatchingInlineSnapshot(` + [WorkflowRuntimeError: \`setInterval\` is not available in a workflow context. + ├▶ Timer-based functions are not supported in workflow functions as they introduce non-deterministic behavior. + │ Read more: https://useworkflow.dev/err/timeout-in-workflow + ╰▶ help: use the \`sleep\` function from the \`workflow\` package for time-based delays. + The sleep function is a step function that can be awaited on and properly recorded in the workflow's event log. + + Learn more: https://useworkflow.dev/err/timeout-in-workflow] + `); }); it('should throw an error when calling clearTimeout', async () => { @@ -1030,9 +1042,15 @@ describe('runWorkflow', () => { workflowRun, events ) - ).rejects.toThrow( - 'Timeout functions like "setTimeout" and "setInterval" are not supported in workflow functions' - ); + ).rejects.toThrowErrorMatchingInlineSnapshot(` + [WorkflowRuntimeError: \`clearTimeout\` is not available in a workflow context. + ├▶ Timer-based functions are not supported in workflow functions as they introduce non-deterministic behavior. + │ Read more: https://useworkflow.dev/err/timeout-in-workflow + ╰▶ help: use the \`sleep\` function from the \`workflow\` package for time-based delays. + The sleep function is a step function that can be awaited on and properly recorded in the workflow's event log. + + Learn more: https://useworkflow.dev/err/timeout-in-workflow] + `); }); it('should throw an error when calling clearInterval', async () => { @@ -1059,9 +1077,15 @@ describe('runWorkflow', () => { workflowRun, events ) - ).rejects.toThrow( - 'Timeout functions like "setTimeout" and "setInterval" are not supported in workflow functions' - ); + ).rejects.toThrowErrorMatchingInlineSnapshot(` + [WorkflowRuntimeError: \`clearInterval\` is not available in a workflow context. + ├▶ Timer-based functions are not supported in workflow functions as they introduce non-deterministic behavior. + │ Read more: https://useworkflow.dev/err/timeout-in-workflow + ╰▶ help: use the \`sleep\` function from the \`workflow\` package for time-based delays. + The sleep function is a step function that can be awaited on and properly recorded in the workflow's event log. + + Learn more: https://useworkflow.dev/err/timeout-in-workflow] + `); }); it('should throw an error when calling setImmediate', async () => { @@ -1088,9 +1112,15 @@ describe('runWorkflow', () => { workflowRun, events ) - ).rejects.toThrow( - 'Timeout functions like "setTimeout" and "setInterval" are not supported in workflow functions' - ); + ).rejects.toThrowErrorMatchingInlineSnapshot(` + [WorkflowRuntimeError: \`setImmediate\` is not available in a workflow context. + ├▶ Timer-based functions are not supported in workflow functions as they introduce non-deterministic behavior. + │ Read more: https://useworkflow.dev/err/timeout-in-workflow + ╰▶ help: use the \`sleep\` function from the \`workflow\` package for time-based delays. + The sleep function is a step function that can be awaited on and properly recorded in the workflow's event log. + + Learn more: https://useworkflow.dev/err/timeout-in-workflow] + `); }); it('should throw an error when calling clearImmediate', async () => { @@ -1117,9 +1147,15 @@ describe('runWorkflow', () => { workflowRun, events ) - ).rejects.toThrow( - 'Timeout functions like "setTimeout" and "setInterval" are not supported in workflow functions' - ); + ).rejects.toThrowErrorMatchingInlineSnapshot(` + [WorkflowRuntimeError: \`clearImmediate\` is not available in a workflow context. + ├▶ Timer-based functions are not supported in workflow functions as they introduce non-deterministic behavior. + │ Read more: https://useworkflow.dev/err/timeout-in-workflow + ╰▶ help: use the \`sleep\` function from the \`workflow\` package for time-based delays. + The sleep function is a step function that can be awaited on and properly recorded in the workflow's event log. + + Learn more: https://useworkflow.dev/err/timeout-in-workflow] + `); }); it('should include documentation link in error message', async () => { @@ -1151,12 +1187,15 @@ describe('runWorkflow', () => { error = err as Error; } assert(error); - expect(error.message).toContain( - 'https://useworkflow.dev/err/timeout-in-workflow' - ); - expect(error.message).toContain( - 'Use the "sleep" function from "workflow"' - ); + expect(error).toMatchInlineSnapshot(` + [WorkflowRuntimeError: \`setTimeout\` is not available in a workflow context. + ├▶ Timer-based functions are not supported in workflow functions as they introduce non-deterministic behavior. + │ Read more: https://useworkflow.dev/err/timeout-in-workflow + ╰▶ help: use the \`sleep\` function from the \`workflow\` package for time-based delays. + The sleep function is a step function that can be awaited on and properly recorded in the workflow's event log. + + Learn more: https://useworkflow.dev/err/timeout-in-workflow] + `); }); }); diff --git a/packages/core/src/workflow.ts b/packages/core/src/workflow.ts index 6ecfba7fd1..08dc695541 100644 --- a/packages/core/src/workflow.ts +++ b/packages/core/src/workflow.ts @@ -1,5 +1,5 @@ import { runInContext } from 'node:vm'; -import { ERROR_SLUGS, WorkflowRuntimeError } from '@workflow/errors'; +import { Ansi, ERROR_SLUGS, WorkflowRuntimeError } from '@workflow/errors'; import { withResolvers } from '@workflow/utils'; import { getPort } from '@workflow/utils/get-port'; import type { Event, WorkflowRun } from '@workflow/world'; @@ -72,6 +72,7 @@ export async function runWorkflow( ); const workflowContext: WorkflowOrchestratorContext = { + workflowRunId: workflowRun.runId, globalThis: vmGlobalThis, onWorkflowError: workflowDiscontinuation.reject, eventsConsumer: new EventsConsumer(events), @@ -113,6 +114,7 @@ export async function runWorkflow( // For the workflow VM, we store the context in a symbol on the `globalThis` object const ctx: WorkflowMetadata = { workflowRunId: workflowRun.runId, + workflowName: workflowRun.workflowName, workflowStartedAt: new vmGlobalThis.Date(+startedAt), url, }; @@ -130,42 +132,13 @@ export async function runWorkflow( ); }; - // Override timeout/interval functions to throw helpful errors - // These are not supported in workflow functions because they rely on - // asynchronous scheduling which breaks deterministic replay - const timeoutErrorMessage = - 'Timeout functions like "setTimeout" and "setInterval" are not supported in workflow functions. Use the "sleep" function from "workflow" for time-based delays.'; - - (vmGlobalThis as any).setTimeout = () => { - throw new WorkflowRuntimeError(timeoutErrorMessage, { - slug: ERROR_SLUGS.TIMEOUT_FUNCTIONS_IN_WORKFLOW, - }); - }; - (vmGlobalThis as any).setInterval = () => { - throw new WorkflowRuntimeError(timeoutErrorMessage, { - slug: ERROR_SLUGS.TIMEOUT_FUNCTIONS_IN_WORKFLOW, - }); - }; - (vmGlobalThis as any).clearTimeout = () => { - throw new WorkflowRuntimeError(timeoutErrorMessage, { - slug: ERROR_SLUGS.TIMEOUT_FUNCTIONS_IN_WORKFLOW, - }); - }; - (vmGlobalThis as any).clearInterval = () => { - throw new WorkflowRuntimeError(timeoutErrorMessage, { - slug: ERROR_SLUGS.TIMEOUT_FUNCTIONS_IN_WORKFLOW, - }); - }; - (vmGlobalThis as any).setImmediate = () => { - throw new WorkflowRuntimeError(timeoutErrorMessage, { - slug: ERROR_SLUGS.TIMEOUT_FUNCTIONS_IN_WORKFLOW, - }); - }; - (vmGlobalThis as any).clearImmediate = () => { - throw new WorkflowRuntimeError(timeoutErrorMessage, { - slug: ERROR_SLUGS.TIMEOUT_FUNCTIONS_IN_WORKFLOW, - }); - }; + (vmGlobalThis as any).setTimeout = timeoutFunctionError('setTimeout'); + (vmGlobalThis as any).setInterval = timeoutFunctionError('setInterval'); + (vmGlobalThis as any).clearTimeout = timeoutFunctionError('clearTimeout'); + (vmGlobalThis as any).clearInterval = timeoutFunctionError('clearInterval'); + (vmGlobalThis as any).setImmediate = timeoutFunctionError('setImmediate'); + (vmGlobalThis as any).clearImmediate = + timeoutFunctionError('clearImmediate'); // `Request` and `Response` are special built-in classes that invoke steps // for the `json()`, `text()` and `arrayBuffer()` instance methods @@ -350,7 +323,7 @@ export async function runWorkflow( } clone(): Request { - ENOTSUP(); + ENOTSUP('Request.clone'); } get bodyUsed() { @@ -467,7 +440,7 @@ export async function runWorkflow( } static error(): Response { - ENOTSUP(); + ENOTSUP(`Response.error`); } static redirect(url: string | URL, status: number = 302): Response { @@ -498,7 +471,7 @@ export async function runWorkflow( class ReadableStream implements globalThis.ReadableStream { constructor() { - ENOTSUP(); + ENOTSUP(`new ReadableStream`); } get locked() { @@ -506,42 +479,42 @@ export async function runWorkflow( } cancel(): any { - ENOTSUP(); + ENOTSUP(`ReadableStream.cancel`); } getReader(): any { - ENOTSUP(); + ENOTSUP(`ReadableStream.getReader`); } pipeThrough(): any { - ENOTSUP(); + ENOTSUP(`ReadableStream.pipeThrough`); } pipeTo(): any { - ENOTSUP(); + ENOTSUP(`ReadableStream.pipeTo`); } tee(): any { - ENOTSUP(); + ENOTSUP(`ReadableStream.tee`); } values(): any { - ENOTSUP(); + ENOTSUP(`ReadableStream.values`); } static from(): any { - ENOTSUP(); + ENOTSUP(`ReadableStream.from`); } [Symbol.asyncIterator](): any { - ENOTSUP(); + ENOTSUP(`ReadableStream[Symbol.asyncIterator]`); } } vmGlobalThis.ReadableStream = ReadableStream; class WritableStream implements globalThis.WritableStream { constructor() { - ENOTSUP(); + ENOTSUP(`new WritableStream`); } get locked() { @@ -549,15 +522,15 @@ export async function runWorkflow( } abort(): any { - ENOTSUP(); + ENOTSUP(`WritableStream.abort`); } close(): any { - ENOTSUP(); + ENOTSUP(`WritableStream.close`); } getWriter(): any { - ENOTSUP(); + ENOTSUP(`WritableStream.getWriter`); } } vmGlobalThis.WritableStream = WritableStream; @@ -567,7 +540,7 @@ export async function runWorkflow( writable: globalThis.WritableStream; constructor() { - ENOTSUP(); + ENOTSUP(`new TransformStream`); } } vmGlobalThis.TransformStream = TransformStream; @@ -624,3 +597,22 @@ export async function runWorkflow( return dehydrated; }); } + +/** + * Override timeout/interval functions to throw helpful errors + * These are not supported in workflow functions because they rely on + * asynchronous scheduling which breaks deterministic replay + */ +export const timeoutFunctionError = (fnName: string) => () => { + throw new WorkflowRuntimeError( + Ansi.frame(`${Ansi.code(fnName)} is not available in a workflow context.`, [ + `Timer-based functions are not supported in workflow functions as they introduce non-deterministic behavior.\n` + + `Read more: https://useworkflow.dev/err/${ERROR_SLUGS.TIMEOUT_FUNCTIONS_IN_WORKFLOW}`, + Ansi.help([ + `use the ${Ansi.code('sleep')} function from the ${Ansi.code('workflow')} package for time-based delays.`, + "The sleep function is a step function that can be awaited on and properly recorded in the workflow's event log.", + ]), + ]), + { slug: ERROR_SLUGS.TIMEOUT_FUNCTIONS_IN_WORKFLOW } + ); +}; diff --git a/packages/core/src/workflow/create-hook.ts b/packages/core/src/workflow/create-hook.ts index 27e5ae698a..04ce354d58 100644 --- a/packages/core/src/workflow/create-hook.ts +++ b/packages/core/src/workflow/create-hook.ts @@ -5,6 +5,7 @@ import type { Webhook, WebhookOptions, } from '../create-hook.js'; +import { createHook as throwHookError } from '../create-hook.js'; import { WORKFLOW_CREATE_HOOK } from '../symbols.js'; import { getWorkflowMetadata } from './get-workflow-metadata.js'; @@ -14,9 +15,7 @@ export function createHook(options?: HookOptions): Hook { WORKFLOW_CREATE_HOOK ] as typeof createHook; if (!createHookFn) { - throw new Error( - '`createHook()` can only be called inside a workflow function' - ); + return throwHookError(options); } return createHookFn(options); } diff --git a/packages/core/src/workflow/get-workflow-metadata.test.ts b/packages/core/src/workflow/get-workflow-metadata.test.ts new file mode 100644 index 0000000000..55dfc7f667 --- /dev/null +++ b/packages/core/src/workflow/get-workflow-metadata.test.ts @@ -0,0 +1,13 @@ +import { expect, test, vi } from 'vitest'; +import { getWorkflowMetadata } from './get-workflow-metadata.js'; + +vi.mock('chalk'); + +test('throws an error if not in a workflow context', () => { + expect(() => getWorkflowMetadata()).toThrowErrorMatchingInlineSnapshot(` + [Error: \`getWorkflowMetadata()\` can only be called inside a workflow or step function + ╰▶ help: This function comes from Workflow DevKit, and requires to be used as a part of a workflow or a step, + As it has no meaning outside of the workflow context. + Read more: https://useworkflow.dev/docs/api-reference/workflow/get-workflow-metadata] + `); +}); diff --git a/packages/core/src/workflow/get-workflow-metadata.ts b/packages/core/src/workflow/get-workflow-metadata.ts index 0aa70f331e..2481dc971b 100644 --- a/packages/core/src/workflow/get-workflow-metadata.ts +++ b/packages/core/src/workflow/get-workflow-metadata.ts @@ -1,3 +1,5 @@ +import { Ansi } from '@workflow/errors'; + export interface WorkflowMetadata { /** * Unique identifier for the workflow run. @@ -13,6 +15,11 @@ export interface WorkflowMetadata { * The URL where the workflow can be triggered. */ url: string; + + /* + * The name of the workflow. + */ + workflowName: string; } export const WORKFLOW_CONTEXT_SYMBOL = @@ -23,7 +30,16 @@ export function getWorkflowMetadata(): WorkflowMetadata { const ctx = (globalThis as any)[WORKFLOW_CONTEXT_SYMBOL] as WorkflowMetadata; if (!ctx) { throw new Error( - '`getWorkflowMetadata()` can only be called inside a workflow or step function' + Ansi.frame( + '`getWorkflowMetadata()` can only be called inside a workflow or step function', + [ + Ansi.help([ + 'This function comes from Workflow DevKit, and requires to be used as a part of a workflow or a step,', + 'As it has no meaning outside of the workflow context.', + 'Read more: https://useworkflow.dev/docs/api-reference/workflow/get-workflow-metadata', + ]), + ] + ) ); } return ctx; diff --git a/packages/core/src/workflow/index.ts b/packages/core/src/workflow/index.ts index 61cc317491..35a227c5f2 100644 --- a/packages/core/src/workflow/index.ts +++ b/packages/core/src/workflow/index.ts @@ -1,3 +1,7 @@ +import { + NotInStepContextError, + UnavailableInWorkflowContextError, +} from '../not-in-workflow-context-error.js'; import type { StepMetadata } from '../step/get-step-metadata.js'; export { @@ -15,12 +19,14 @@ export { getWritable } from './writable-stream.js'; // workflows can't use these functions, but we still need to provide // the export so bundling doesn't fail when step and workflow are in same file export function getStepMetadata(): StepMetadata { - throw new Error( - '`getStepMetadata()` can only be called inside a step function' + throw new NotInStepContextError( + 'getStepMetadata()', + `getStepMetadata(): https://useworkflow.dev/docs/api-reference/workflow/get-step-metadata` ); } export function resumeHook() { - throw new Error( - '`resumeHook()` can only be called from outside a workflow function' + throw new UnavailableInWorkflowContextError( + `resumeHook()`, + 'resuming hooks: https://useworkflow.dev/docs/foundations/hooks#resuming-a-hook' ); } diff --git a/packages/core/src/writable-stream.ts b/packages/core/src/writable-stream.ts index 5d32e1a404..63758431a7 100644 --- a/packages/core/src/writable-stream.ts +++ b/packages/core/src/writable-stream.ts @@ -1,3 +1,5 @@ +import { Ansi } from '@workflow/errors'; + /** * The options for {@link getWritable}. */ @@ -25,6 +27,15 @@ export function getWritable( options: WorkflowWritableStreamOptions = {} ): WritableStream { throw new Error( - '`getWritable()` can only be called inside a workflow or step function' + Ansi.frame( + '`getWritable()` can only be called inside a workflow or step function', + [ + Ansi.help([ + 'This function comes from Workflow DevKit, and requires to be used as a part of a workflow or a step,', + 'As it has no meaning outside of the workflow context.', + 'Read more about the writable workflow stream: https://useworkflow.dev/docs/api-reference/workflow/get-writable', + ]), + ] + ) ); } diff --git a/packages/errors/package.json b/packages/errors/package.json index 7b311ae4ea..13b5384b6e 100644 --- a/packages/errors/package.json +++ b/packages/errors/package.json @@ -24,6 +24,7 @@ }, "scripts": { "build": "tsc", + "test": "vitest", "dev": "tsc --watch", "clean": "tsc --build --clean && rm -rf dist", "typecheck": "tsc --noEmit" @@ -32,10 +33,12 @@ "@types/ms": "2.1.0", "@types/node": "catalog:", "@workflow/tsconfig": "workspace:*", - "@workflow/world": "workspace:*" + "@workflow/world": "workspace:*", + "vitest": "catalog:" }, "dependencies": { "@workflow/utils": "workspace:*", + "chalk": "catalog:", "ms": "2.1.3" } } diff --git a/packages/errors/src/ansi.test.ts b/packages/errors/src/ansi.test.ts new file mode 100644 index 0000000000..814b605338 --- /dev/null +++ b/packages/errors/src/ansi.test.ts @@ -0,0 +1,126 @@ +import { Chalk } from 'chalk'; +import { describe, expect, test } from 'vitest'; +import { frame, inline } from './ansi.js'; + +test('frames', () => { + const output = frame('text text text text\ntext text text text', [ + 'contents0 contents0 contents0\ncontents0 contents0 contents0', + 'contents1 contents1 contents1\ncontents1 contents1 contents1', + ]); + + expect(`\n${output}\n`).toMatchInlineSnapshot(` + " + text text text text + text text text text + ├▶ contents0 contents0 contents0 + │ contents0 contents0 contents0 + ╰▶ contents1 contents1 contents1 + contents1 contents1 contents1 + " + `); +}); + +test('composable', () => { + const output = frame('text text text text\ntext text text text', [ + frame('whatever\nwhenever', ['inner0\ninner0']), + frame('whatever2\nwhenever2', ['inner1\ninner1']), + ]); + expect(`\n${output}\n`).toMatchInlineSnapshot(` + " + text text text text + text text text text + ├▶ whatever + │ whenever + │ ╰▶ inner0 + │ inner0 + ╰▶ whatever2 + whenever2 + ╰▶ inner1 + inner1 + " + `); +}); + +describe('inline', () => { + test('single odd-length explanation', () => { + const value = inline`function ${{ text: 'hello', explain: 'name not allowed bro' }}() {\n return 666\n}`; + expect(value).toEqual( + ` +function hello() { + ──┬── + ╰▶ name not allowed bro + return 666 +} +`.trim() + ); + }); + + test('single even-length explanation', () => { + const value = inline`function ${{ text: 'name', explain: 'name not allowed bro' }}() {\n return 666\n}`; + expect(value).toEqual( + ` +function name() { + ──┬─ + ╰▶ name not allowed bro + return 666 +} +`.trim() + ); + }); + + test('two explanations', () => { + const value = inline`function ${{ text: 'name', explain: 'name not allowed bro' }}(${{ text: 'arg', explain: 'unused' }}) {\n return 666\n}`; + expect(value).toEqual( + ` +function name(arg) { + ──┬─ ─┬─ + ╰───┼─▶ name not allowed bro + ╰─▶ unused + return 666 +} +`.trim() + ); + }); + + test('three explanations', () => { + const value = inline` +${['fun', 'nothing fun about it']}ction ${{ text: 'name', explain: 'name not allowed bro' }}(${{ text: 'arg', explain: 'unused' }}) { + return 666 +}`; + expect(value).toEqual( + ` +function name(arg) { +─┬─ ──┬─ ─┬─ + ╰─────────┼───┼─▶ nothing fun about it + ╰───┼─▶ name not allowed bro + ╰─▶ unused + return 666 +}` + ); + }); + + test('colored explanations', () => { + const red = (s: string) => `${s}`; + const green = (s: string) => `${s}`; + const value = inline` +function ${['name', 'name not allowed bro', { color: green }]}(${['arg', 'unused', { color: red }]}) { + return 666 +}`; + + const chalk = new Chalk({ level: 3 }); + console.log(inline` +function ${['name', 'name not allowed bro', { color: chalk.green }]}(${['arg', 'unused', { color: chalk.red }]}) { + return 666 +}`); + + expect(value).toEqual( + ` +function name(arg) { + ──┬─ ─┬─ + ╰───┼─▶ name not allowed bro + ╰─▶ unused + return 666 +}` + ); + }); +}); diff --git a/packages/errors/src/ansi.ts b/packages/errors/src/ansi.ts new file mode 100644 index 0000000000..cf25c61a9b1 --- /dev/null +++ b/packages/errors/src/ansi.ts @@ -0,0 +1,201 @@ +import chalk from 'chalk'; + +const styles = { + info: chalk.blue, + help: chalk.cyan, + warn: chalk.yellow, + error: chalk.red, +}; + +export function help(messages: string | string[]) { + const message = Array.isArray(messages) ? messages.join('\n') : messages; + return styles.help(`${chalk.bold('help:')} ${message}`); +} + +export function hint(messages: string | string[]) { + const message = Array.isArray(messages) ? messages.join('\n') : messages; + return styles.info(`${chalk.bold('hint:')} ${message}`); +} + +export function note(messages: string | string[]) { + const message = Array.isArray(messages) ? messages.join('\n') : messages; + return styles.info(`${chalk.bold('note:')} ${message}`); +} + +export function code(str: string) { + return chalk.italic(`${chalk.dim('`')}${str}${chalk.dim('`')}`); +} + +export function frame(text: string, contents: string[]): string { + const result = [text]; + + contents.forEach((content, index) => { + const lines = content.split('\n'); + const isLastContent = index === contents.length - 1; + + const firstLinePrefix = isLastContent ? '╰▶ ' : '├▶ '; + const continuationPrefix = isLastContent ? ' ' : '│ '; + + const framedLines = lines.map((line, lineIndex) => { + const prefix = lineIndex === 0 ? firstLinePrefix : continuationPrefix; + return `${prefix}${line}`; + }); + + result.push(...framedLines); + }); + + return result.join('\n'); +} + +interface Explain { + text: string; + explain: string; + /** adds ansi coloring */ + color?: (s: string) => string; +} + +type Explainish = + | Explain + | [text: string, explain: string, opts?: { color: Explain['color'] }]; + +type Marker = { + startCol: number; + endCol: number; + explain: string; + color?: (s: string) => string; +}; + +const identity = (s: string) => s; + +function getMarkerMidpoint(marker: Marker): number { + const textLen = marker.endCol - marker.startCol; + return marker.startCol + Math.floor(textLen / 2); +} + +function buildUnderline(markers: Marker[]): string { + const parts: string[] = []; + let pos = 0; + for (const marker of markers) { + const textLen = marker.endCol - marker.startCol; + const midPoint = Math.floor(textLen / 2); + + if (marker.startCol > pos) { + parts.push(' '.repeat(marker.startCol - pos)); + pos = marker.startCol; + } + const segment = `${'─'.repeat(midPoint)}┬${'─'.repeat(textLen - midPoint - 1)}`; + const colorFn = marker.color ?? identity; + parts.push(colorFn(segment)); + pos += textLen; + } + return parts.join(''); +} + +function buildExplanationLine( + marker: Marker, + midCol: number, + remainingMids: number[], + isOnlyMarker: boolean +): string { + let line = '╰'; + let pos = midCol + 1; + + for (const nextMid of remainingMids) { + while (pos < nextMid) { + line += '─'; + pos++; + } + line += '┼'; + pos++; + } + + const arrow = isOnlyMarker ? '▶ ' : '─▶ '; + line += arrow + marker.explain; + + const colorFn = marker.color ?? identity; + return ' '.repeat(midCol) + colorFn(line); +} + +/** + * @example + * inline`function ${{text: "hello", explain: "name not allowed bro"}}() {\n return 666\n}`; + * => + * function hello() { + * ──┬── + * ╰▶ name not allowed bro + * return 666 + * } + */ +export function inline( + text: TemplateStringsArray, + ...values: Explainish[] +): string { + const resultLines: string[] = []; + let currentLine = ''; + let currentLineVisualLen = 0; + let pendingMarkers: Marker[] = []; + + const flushLine = () => { + resultLines.push(currentLine); + if (pendingMarkers.length === 0) { + currentLine = ''; + currentLineVisualLen = 0; + return; + } + + const markerMids = pendingMarkers.map(getMarkerMidpoint); + + resultLines.push(buildUnderline(pendingMarkers)); + + for (let i = 0; i < pendingMarkers.length; i++) { + const line = buildExplanationLine( + pendingMarkers[i], + markerMids[i], + markerMids.slice(i + 1), + pendingMarkers.length === 1 + ); + resultLines.push(line); + } + + pendingMarkers = []; + currentLine = ''; + currentLineVisualLen = 0; + }; + + for (let i = 0; i < text.length; i++) { + const segment = text[i]; + const lines = segment.split('\n'); + + for (let j = 0; j < lines.length; j++) { + if (j > 0) { + flushLine(); + } + currentLine += lines[j]; + currentLineVisualLen += lines[j].length; + } + + if (i < values.length) { + const val = values[i]; + const value: Explain = !Array.isArray(val) + ? val + : { text: val[0], explain: val[1], ...val[2] }; + const startCol = currentLineVisualLen; + const colorFn = value.color ?? ((s: string) => s); + currentLine += colorFn(value.text); + currentLineVisualLen += value.text.length; + const endCol = currentLineVisualLen; + pendingMarkers.push({ + startCol, + endCol, + explain: value.explain, + color: value.color, + }); + } + } + + if (currentLine || pendingMarkers.length > 0) { + flushLine(); + } + + return resultLines.join('\n'); +} diff --git a/packages/errors/src/index.ts b/packages/errors/src/index.ts index 071fc359d1..c2839a3395 100644 --- a/packages/errors/src/index.ts +++ b/packages/errors/src/index.ts @@ -2,6 +2,8 @@ import { parseDurationToDate } from '@workflow/utils'; import type { StructuredError } from '@workflow/world'; import type { StringValue } from 'ms'; +export * as Ansi from './ansi.js'; + const BASE_URL = 'https://useworkflow.dev/err'; /** diff --git a/packages/world-local/package.json b/packages/world-local/package.json index ff83cf4080..388afeb24c 100644 --- a/packages/world-local/package.json +++ b/packages/world-local/package.json @@ -35,6 +35,7 @@ "@workflow/utils": "workspace:*", "@workflow/world": "workspace:*", "async-sema": "3.1.1", + "chalk": "catalog:", "ulid": "3.0.1", "undici": "6.22.0", "zod": "catalog:" diff --git a/packages/world-local/src/config.test.ts b/packages/world-local/src/config.test.ts index 3bc9a5b779..0569affe3b 100644 --- a/packages/world-local/src/config.test.ts +++ b/packages/world-local/src/config.test.ts @@ -74,7 +74,7 @@ describe('resolveBaseUrl', () => { delete process.env.PORT; await expect(resolveBaseUrl({})).rejects.toThrow( - 'Unable to resolve base URL for workflow queue.' + 'Unable to resolve base URL for workflow queue' ); expect(getWorkflowPort).toHaveBeenCalled(); }); @@ -187,9 +187,18 @@ describe('resolveBaseUrl', () => { vi.mocked(getWorkflowPort).mockResolvedValue(undefined); delete process.env.PORT; - await expect(resolveBaseUrl({})).rejects.toThrow( - 'Unable to resolve base URL for workflow queue.' - ); + await expect( + resolveBaseUrl({}) + ).rejects.toThrowErrorMatchingInlineSnapshot(` + [Error: Unable to resolve base URL for workflow queue + ├▶ The local world works by making HTTP calls to the .well-known/workflow endpoints[1]. + │ Therefore, it needs to have a base URL to connect to the local server. + ├▶ note: we tried inferring the running port but failed. + ├▶ help: fix by setting one of the following environment variables: + │ • \`PORT\` to use \`http://localhost:PORT\` + │ • \`WORKFLOW_LOCAL_BASE_URL\` as a full URL + ╰▶ [1]: Read more about .well-known endpoints: https://useworkflow.dev/docs/how-it-works/framework-integrations#understanding-the-endpoints] + `); }); }); @@ -235,9 +244,18 @@ describe('resolveBaseUrl', () => { vi.mocked(getWorkflowPort).mockResolvedValue(undefined); delete process.env.PORT; - await expect(resolveBaseUrl({})).rejects.toThrow( - 'Unable to resolve base URL for workflow queue.' - ); + await expect( + resolveBaseUrl({}) + ).rejects.toThrowErrorMatchingInlineSnapshot(` + [Error: Unable to resolve base URL for workflow queue + ├▶ The local world works by making HTTP calls to the .well-known/workflow endpoints[1]. + │ Therefore, it needs to have a base URL to connect to the local server. + ├▶ note: we tried inferring the running port but failed. + ├▶ help: fix by setting one of the following environment variables: + │ • \`PORT\` to use \`http://localhost:PORT\` + │ • \`WORKFLOW_LOCAL_BASE_URL\` as a full URL + ╰▶ [1]: Read more about .well-known endpoints: https://useworkflow.dev/docs/how-it-works/framework-integrations#understanding-the-endpoints] + `); }); it('should throw error when all resolution methods fail', async () => { @@ -245,9 +263,18 @@ describe('resolveBaseUrl', () => { vi.mocked(getWorkflowPort).mockResolvedValue(undefined); delete process.env.PORT; - await expect(resolveBaseUrl({})).rejects.toThrow( - 'Unable to resolve base URL for workflow queue.' - ); + await expect( + resolveBaseUrl({}) + ).rejects.toThrowErrorMatchingInlineSnapshot(` + [Error: Unable to resolve base URL for workflow queue + ├▶ The local world works by making HTTP calls to the .well-known/workflow endpoints[1]. + │ Therefore, it needs to have a base URL to connect to the local server. + ├▶ note: we tried inferring the running port but failed. + ├▶ help: fix by setting one of the following environment variables: + │ • \`PORT\` to use \`http://localhost:PORT\` + │ • \`WORKFLOW_LOCAL_BASE_URL\` as a full URL + ╰▶ [1]: Read more about .well-known endpoints: https://useworkflow.dev/docs/how-it-works/framework-integrations#understanding-the-endpoints] + `); }); it('should handle config with only dataDir and use PORT env var', async () => { @@ -281,8 +308,19 @@ describe('resolveBaseUrl', () => { vi.mocked(getWorkflowPort).mockResolvedValue(undefined); delete process.env.PORT; - await expect(resolveBaseUrl({})).rejects.toThrow( - 'Unable to resolve base URL for workflow queue.' + await expect( + resolveBaseUrl({}) + ).rejects.toThrowErrorMatchingInlineSnapshot( + ` + [Error: Unable to resolve base URL for workflow queue + ├▶ The local world works by making HTTP calls to the .well-known/workflow endpoints[1]. + │ Therefore, it needs to have a base URL to connect to the local server. + ├▶ note: we tried inferring the running port but failed. + ├▶ help: fix by setting one of the following environment variables: + │ • \`PORT\` to use \`http://localhost:PORT\` + │ • \`WORKFLOW_LOCAL_BASE_URL\` as a full URL + ╰▶ [1]: Read more about .well-known endpoints: https://useworkflow.dev/docs/how-it-works/framework-integrations#understanding-the-endpoints] + ` ); }); }); diff --git a/packages/world-local/src/config.ts b/packages/world-local/src/config.ts index dfd4e15574..f99727a683 100644 --- a/packages/world-local/src/config.ts +++ b/packages/world-local/src/config.ts @@ -1,3 +1,4 @@ +import { Ansi } from '@workflow/errors'; import { getWorkflowPort } from '@workflow/utils/get-port'; import { once } from './util.js'; @@ -49,5 +50,17 @@ export async function resolveBaseUrl(config: Partial): Promise { return `http://localhost:${detectedPort}`; } - throw new Error('Unable to resolve base URL for workflow queue.'); + throw new Error( + Ansi.frame(`Unable to resolve base URL for workflow queue`, [ + `The local world works by making HTTP calls to the .well-known/workflow endpoints[1].\n` + + 'Therefore, it needs to have a base URL to connect to the local server.', + Ansi.note('we tried inferring the running port but failed.'), + Ansi.help([ + `fix by setting one of the following environment variables:`, + `• ${Ansi.code('PORT')} to use ${Ansi.code('http://localhost:PORT')}`, + `• ${Ansi.code('WORKFLOW_LOCAL_BASE_URL')} as a full URL`, + ]), + '[1]: Read more about .well-known endpoints: https://useworkflow.dev/docs/how-it-works/framework-integrations#understanding-the-endpoints', + ]) + ); } diff --git a/packages/world-local/src/init.ts b/packages/world-local/src/init.ts index 96a32c0c0f..f3249a502e 100644 --- a/packages/world-local/src/init.ts +++ b/packages/world-local/src/init.ts @@ -7,6 +7,9 @@ import { writeFileSync, } from 'node:fs'; import path from 'node:path'; +import { inspect } from 'node:util'; +import { Ansi } from '@workflow/errors'; +import * as Logger from './logger.js'; /** Package name for version tracking */ export const PACKAGE_NAME = '@workflow/world-local'; @@ -302,13 +305,24 @@ export function initDataDir(dataDir: string): void { suggestedVersion ); - console.error( - `[world-local] Failed to upgrade data directory from version ${formatVersion(oldVersion)} to ${formatVersion(currentVersion)}:`, - error instanceof Error ? error.message : error - ); - console.error( - `[world-local] Data is not compatible with the current version. ` + - `Please downgrade to ${PACKAGE_NAME}@${downgradeTarget}` + Logger.write( + 'error', + Ansi.frame( + `Failed to upgrade data directory from version ${formatVersion(oldVersion)} to ${formatVersion(currentVersion)}.`, + [ + `Data is incompatible with the current version: ${error instanceof Error ? error.message : inspect(error)}` + + '\n' + + Ansi.help( + process.env.NODE_ENV === 'production' + ? `Deleting the persistence directory at ${Ansi.code(dataDir)} will allow you to upgrade this version cleanly.` + : `Restarting a development server will resolve this issue by truncating the data.` + ) + + '\n' + + Ansi.hint( + `you can downgrade to fix this issue: ${Ansi.code(`npm i ${PACKAGE_NAME}@${downgradeTarget}`)}` + ), + ] + ) ); throw error; diff --git a/packages/world-local/src/logger.ts b/packages/world-local/src/logger.ts new file mode 100644 index 0000000000..da700c1eb1 --- /dev/null +++ b/packages/world-local/src/logger.ts @@ -0,0 +1,15 @@ +import chalk from 'chalk'; + +const styles = { + info: chalk.blue, + help: chalk.cyan, + warn: chalk.yellow, + error: chalk.red, +}; + +const prefix = chalk.dim(`[world-local] `); + +export function write(level: keyof typeof styles, message: string | string[]) { + const text = Array.isArray(message) ? message.join('\n') : message; + console.error(styles[level](prefix + text)); +} diff --git a/packages/world-local/src/queue.ts b/packages/world-local/src/queue.ts index d52ffc4e65..d821e46bdc 100644 --- a/packages/world-local/src/queue.ts +++ b/packages/world-local/src/queue.ts @@ -1,13 +1,21 @@ import { setTimeout } from 'node:timers/promises'; import { JsonTransport } from '@vercel/queue'; -import { MessageId, type Queue, ValidQueueName } from '@workflow/world'; +import { Ansi } from '@workflow/errors'; +import { + MessageId, + type Queue, + QueuePrefix, + ValidQueueName, +} from '@workflow/world'; import { Sema } from 'async-sema'; +import chalk from 'chalk'; import { monotonicFactory } from 'ulid'; import { Agent } from 'undici'; import z from 'zod'; import type { Config } from './config.js'; import { resolveBaseUrl } from './config.js'; import { PACKAGE_VERSION } from './init.js'; +import * as Logger from './logger.js'; // For local queue, there is no technical limit on the message visibility lifespan, // but the environment variable can be used for testing purposes to set a max visibility limit. @@ -61,7 +69,9 @@ export function createQueue(config: Partial): Queue { } else if (queueName.startsWith('__wkf_workflow_')) { pathname = `flow`; } else { - throw new Error('Unknown queue name prefix'); + throw new Error( + `Unknown queue name prefix. Valid prefixes are ${QueuePrefix.options.map((x) => x.value).join(', ')}` + ); } const messageId = MessageId.parse(`msg_${generateId()}`); @@ -118,17 +128,19 @@ export function createQueue(config: Partial): Queue { } catch {} } - console.error(`[local world] Failed to queue message`, { + writeFailedExecutionMessage({ + willRetry: defaultRetriesLeft > 0, + response, queueName, - text, - status: response.status, - headers: Object.fromEntries(response.headers.entries()), - body: body.toString(), + body, + responseText: text, }); } console.error( - `[local world] Reached max retries of local world queue implementation` + chalk.red( + `[local world] ${chalk.bold('fatal:')} Reached max retries of local world queue implementation` + ) ); } finally { semaphore.release(); @@ -152,36 +164,75 @@ export function createQueue(config: Partial): Queue { return { messageId }; }; - const HeaderParser = z.object({ - 'x-vqs-queue-name': ValidQueueName, - 'x-vqs-message-id': MessageId, - 'x-vqs-message-attempt': z.coerce.number(), - }); + const HeaderParser = z + .object({ + 'x-vqs-queue-name': ValidQueueName, + 'x-vqs-message-id': MessageId, + 'x-vqs-message-attempt': z.coerce.number(), + }) + .transform((data) => ({ + queueName: data['x-vqs-queue-name'], + messageId: data['x-vqs-message-id'], + attempt: data['x-vqs-message-attempt'], + })); + + async function parseRequest( + req: Request + ): Promise< + | [ + headers: z.infer, + bodyStream: ReadableStream, + null, + ] + | [null, null, Response] + > { + const headers = HeaderParser.safeParse(Object.fromEntries(req.headers)); + + if (!headers.success) { + return [ + null, + null, + Response.json( + { + error: 'Invalid or missing headers', + details: z.treeifyError(headers.error), + }, + { status: 400 } + ), + ] as const; + } + + if (!req.body) { + return [ + null, + null, + Response.json({ error: 'Missing request body' }, { status: 400 }), + ] as const; + } + + return [headers.data, req.body, null] as const; + } const createQueueHandler: Queue['createQueueHandler'] = (prefix, handler) => { return async (req) => { - const headers = HeaderParser.safeParse(Object.fromEntries(req.headers)); + const [headers, bodyStream, parseError] = await parseRequest(req); + if (parseError) return parseError; + const { queueName, messageId, attempt } = headers; - if (!headers.success || !req.body) { + if (!queueName.startsWith(prefix)) { return Response.json( { - error: !req.body - ? 'Missing request body' - : 'Missing required headers', + error: 'Mismatched queue prefix', + details: { + requestedQueue: queueName, + configuredPrefix: prefix, + }, }, { status: 400 } ); } - const queueName = headers.data['x-vqs-queue-name']; - const messageId = headers.data['x-vqs-message-id']; - const attempt = headers.data['x-vqs-message-attempt']; - - if (!queueName.startsWith(prefix)) { - return Response.json({ error: 'Unhandled queue' }, { status: 400 }); - } - - const body = await new JsonTransport().deserialize(req.body); + const body = await new JsonTransport().deserialize(bodyStream); try { const result = await handler(body, { attempt, queueName, messageId }); @@ -199,7 +250,7 @@ export function createQueue(config: Partial): Queue { return Response.json({ ok: true }); } catch (error) { - return Response.json(String(error), { status: 500 }); + return new Response(String(error), { status: 500 }); } }; }; @@ -210,3 +261,49 @@ export function createQueue(config: Partial): Queue { return { queue, createQueueHandler, getDeploymentId }; } + +export function writeFailedExecutionMessage({ + queueName, + response, + body, + responseText, + willRetry, +}: { + queueName: string; + willRetry: boolean; + response: Response; + body: Buffer; + responseText: string; +}) { + const level = willRetry ? 'warn' : 'error'; + Logger.write( + level, + Ansi.frame( + `${chalk.bold(`${level}:`)} failed to execute ${!willRetry ? '' : chalk.italic(' and will retry')}.`, + [ + responseText || 'No reason provided.', + ...(willRetry + ? [] + : [chalk.italic('This message failed and will not be retried.')]), + ...(process.env.WORKFLOW_LOCAL_WORLD_DEBUG_REQUEST_ERRORS !== '1' + ? [] + : [ + chalk.reset( + [ + `queue name: ${queueName}`, + `response status: ${response.status}`, + Ansi.frame( + 'headers:', + Array.from( + response.headers, + ([key, value]) => `${key}=${value}` + ) + ), + Ansi.frame('request body', [body.toString()]), + ].join('\n') + ), + ]), + ] + ) + ); +} diff --git a/packages/world-testing/package.json b/packages/world-testing/package.json index 0ea63f9f14..499f455c5c 100644 --- a/packages/world-testing/package.json +++ b/packages/world-testing/package.json @@ -16,7 +16,7 @@ "directory": "packages/world-testing" }, "scripts": { - "build": "wf build && tsc", + "build": "rimraf dist/workflows && wf build && tsc", "clean": "tsc --build --clean && rm -rf dist .well-known* .workflow-data", "start": "node --watch src/server.mts", "test": "vitest" @@ -24,17 +24,18 @@ "dependencies": { "@hono/node-server": "1.19.5", "@workflow/cli": "workspace:*", - "workflow": "workspace:*", "@workflow/world": "workspace:*", "chalk": "5.6.2", "hono": "4.9.10", "jsonlines": "0.1.1", + "workflow": "workspace:*", "zod": "catalog:" }, "devDependencies": { - "@types/node": "catalog:", "@types/jsonlines": "0.1.5", + "@types/node": "catalog:", "@workflow/tsconfig": "workspace:*", + "rimraf": "6.1.2", "vitest": "catalog:" }, "peerDependencies": { diff --git a/packages/world-vercel/src/queue.ts b/packages/world-vercel/src/queue.ts index 427f696bd5..85f46f8955 100644 --- a/packages/world-vercel/src/queue.ts +++ b/packages/world-vercel/src/queue.ts @@ -3,7 +3,9 @@ import { MessageId, type Queue, QueuePayloadSchema, + type StepInvokePayload, ValidQueueName, + type WorkflowInvokePayload, } from '@workflow/world'; import * as z from 'zod'; import { type APIConfig, getHeaders, getHttpUrl } from './utils.js'; @@ -64,7 +66,17 @@ export function createQueue(config?: APIConfig): Queue { headers: Object.fromEntries(headers.entries()), }); - const queue: Queue['queue'] = async (queueName, payload, opts) => { + const sendMessage = async ({ + queueName, + payload, + deploymentId, + idempotencyKey, + }: { + queueName: ValidQueueName; + payload: StepInvokePayload | WorkflowInvokePayload; + deploymentId?: string; + idempotencyKey?: string; + }) => { // zod v3 doesn't have the `encode` method. We only support zod v4 officially, // but codebases that pin zod v3 are still common. const hasEncoder = typeof MessageWrapper.encode === 'function'; @@ -80,17 +92,23 @@ export function createQueue(config?: APIConfig): Queue { payload, queueName, // Store deploymentId in the message so it can be preserved when re-enqueueing - deploymentId: opts?.deploymentId, + deploymentId, }); const sanitizedQueueName = queueName.replace(/[^A-Za-z0-9-_]/g, '-'); - const { messageId } = await queueClient.send( - sanitizedQueueName, - encoded, - opts - ); + const { messageId } = await queueClient.send(sanitizedQueueName, encoded, { + deploymentId, + idempotencyKey, + }); return { messageId: MessageId.parse(messageId) }; }; + const queue: Queue['queue'] = (queueName, payload, opts) => + sendMessage({ + queueName, + payload, + ...opts, + }); + const createQueueHandler: Queue['createQueueHandler'] = (prefix, handler) => { return queueClient.handleCallback({ [`${prefix}*`]: { @@ -117,7 +135,7 @@ export function createQueue(config?: APIConfig): Queue { if (maxAllowedTimeout <= 0) { // Message is at its lifetime limit - re-enqueue to get a fresh 24-hour clock // Preserve the original deploymentId to ensure routing to the same deployment - await queue(queueName, payload, { deploymentId }); + await sendMessage({ deploymentId, queueName, payload }); return undefined; } else if (result.timeoutSeconds > maxAllowedTimeout) { // Clamp timeout to fit within remaining message lifetime diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index b86c1df3f2..31ec63c818 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -30,6 +30,9 @@ catalogs: ai: specifier: 5.0.104 version: 5.0.104 + chalk: + specifier: 5.6.2 + version: 5.6.2 esbuild: specifier: ^0.25.11 version: 0.25.12 @@ -456,7 +459,7 @@ importers: specifier: 5.0.0 version: 5.0.0 chalk: - specifier: 5.6.2 + specifier: 'catalog:' version: 5.6.2 chokidar: specifier: 4.0.3 @@ -537,6 +540,9 @@ importers: '@workflow/world-vercel': specifier: workspace:* version: link:../world-vercel + chalk: + specifier: 'catalog:' + version: 5.6.2 debug: specifier: 4.4.3 version: 4.4.3(supports-color@8.1.1) @@ -629,6 +635,9 @@ importers: '@workflow/utils': specifier: workspace:* version: link:../utils + chalk: + specifier: 'catalog:' + version: 5.6.2 ms: specifier: 2.1.3 version: 2.1.3 @@ -645,6 +654,9 @@ importers: '@workflow/world': specifier: workspace:* version: link:../world + vitest: + specifier: 'catalog:' + version: 3.2.4(@types/debug@4.1.12)(@types/node@22.19.0)(jiti@2.6.1)(lightningcss@1.30.2)(terser@5.44.0)(tsx@4.20.6)(yaml@2.8.1) packages/next: dependencies: @@ -1127,6 +1139,9 @@ importers: async-sema: specifier: 3.1.1 version: 3.1.1 + chalk: + specifier: 'catalog:' + version: 5.6.2 ulid: specifier: 3.0.1 version: 3.0.1 @@ -1247,6 +1262,9 @@ importers: '@workflow/tsconfig': specifier: workspace:* version: link:../tsconfig + rimraf: + specifier: 6.1.2 + version: 6.1.2 vitest: specifier: 'catalog:' version: 3.2.4(@types/debug@4.1.12)(@types/node@22.19.0)(jiti@2.6.1)(lightningcss@1.30.2)(terser@5.44.0)(tsx@4.20.6)(yaml@2.8.1) @@ -9028,6 +9046,10 @@ packages: engines: {node: 20 || >=22} hasBin: true + glob@13.0.0: + resolution: {integrity: sha512-tvZgpqk6fz4BaNZ66ZsRaZnbHvP/jG3uKJvAZOwEVUL4RTA5nJeeLYfyN9/VA8NX/V3IBG+hkeuGpKjvELkVhA==} + engines: {node: 20 || >=22} + global-directory@4.0.1: resolution: {integrity: sha512-wHTUcDUoZ1H5/0iVqEudYW4/kAlN5cZ3j/bXn0Dpbizl9iaUVeWSHqiOjsgk6OW2bkLclbBjzewBz6weQ1zA2Q==} engines: {node: '>=18'} @@ -11782,6 +11804,11 @@ packages: rfdc@1.4.1: resolution: {integrity: sha512-q1b3N5QkRUWUl7iyylaaj3kOpIT0N2i9MqIEQXP73GVsN9cw3fdx8X63cEmWhJGi2PPCF23Ijp7ktmd39rawIA==} + rimraf@6.1.2: + resolution: {integrity: sha512-cFCkPslJv7BAXJsYlK1dZsbP8/ZNLkCAQ0bi1hf5EKX2QHegmDFEFA6QhuYJlk7UDdc+02JjO80YSOrWPpw06g==} + engines: {node: 20 || >=22} + hasBin: true + robust-predicates@3.0.2: resolution: {integrity: sha512-IXgzBWvWQwE6PrDI05OvmXUIruQTcoMDzRsOd5CDvHCVLcLHMTSYvOK5Cm46kWqlV3yAbuSpBZdJ5oP5OUoStg==} @@ -18971,7 +18998,7 @@ snapshots: '@ts-morph/common@0.28.1': dependencies: - minimatch: 10.0.3 + minimatch: 10.1.1 path-browserify: 1.0.1 tinyglobby: 0.2.14 @@ -19555,6 +19582,14 @@ snapshots: chai: 5.2.1 tinyrainbow: 2.0.0 + '@vitest/mocker@3.2.4(vite@7.1.12(@types/node@22.19.0)(jiti@2.6.1)(lightningcss@1.30.2)(terser@5.44.0)(tsx@4.20.6)(yaml@2.8.1))': + dependencies: + '@vitest/spy': 3.2.4 + estree-walker: 3.0.3 + magic-string: 0.30.21 + optionalDependencies: + vite: 7.1.12(@types/node@22.19.0)(jiti@2.6.1)(lightningcss@1.30.2)(terser@5.44.0)(tsx@4.20.6)(yaml@2.8.1) + '@vitest/mocker@3.2.4(vite@7.1.12(@types/node@24.6.2)(jiti@2.6.1)(lightningcss@1.30.2)(terser@5.44.0)(tsx@4.20.6)(yaml@2.8.1))': dependencies: '@vitest/spy': 3.2.4 @@ -22210,6 +22245,12 @@ snapshots: package-json-from-dist: 1.0.1 path-scurry: 2.0.1 + glob@13.0.0: + dependencies: + minimatch: 10.1.1 + minipass: 7.1.2 + path-scurry: 2.0.1 + global-directory@4.0.1: dependencies: ini: 4.1.1 @@ -26114,6 +26155,11 @@ snapshots: rfdc@1.4.1: {} + rimraf@6.1.2: + dependencies: + glob: 13.0.0 + package-json-from-dist: 1.0.1 + robust-predicates@3.0.2: {} rollup-plugin-dts@6.2.3(rollup@4.53.2)(typescript@5.9.3): @@ -27675,7 +27721,7 @@ snapshots: dependencies: '@types/chai': 5.2.2 '@vitest/expect': 3.2.4 - '@vitest/mocker': 3.2.4(vite@7.1.12(@types/node@24.6.2)(jiti@2.6.1)(lightningcss@1.30.2)(terser@5.44.0)(tsx@4.20.6)(yaml@2.8.1)) + '@vitest/mocker': 3.2.4(vite@7.1.12(@types/node@22.19.0)(jiti@2.6.1)(lightningcss@1.30.2)(terser@5.44.0)(tsx@4.20.6)(yaml@2.8.1)) '@vitest/pretty-format': 3.2.4 '@vitest/runner': 3.2.4 '@vitest/snapshot': 3.2.4 diff --git a/pnpm-workspace.yaml b/pnpm-workspace.yaml index 7a49576590..3d8809adce 100644 --- a/pnpm-workspace.yaml +++ b/pnpm-workspace.yaml @@ -13,6 +13,7 @@ catalog: "@vercel/queue": 0.0.0-alpha.33 "@vitest/coverage-v8": ^3.2.4 ai: 5.0.104 + chalk: 5.6.2 esbuild: ^0.25.11 nitro: 3.0.1-alpha.1 typescript: ^5.9.3 diff --git a/workbench/example/workflows/99_e2e.ts b/workbench/example/workflows/99_e2e.ts index 30638c1b0b..ac9ee1e1c6 100644 --- a/workbench/example/workflows/99_e2e.ts +++ b/workbench/example/workflows/99_e2e.ts @@ -233,6 +233,7 @@ export async function workflowAndStepMetadataWorkflow() { workflowRunId: workflowMetadata.workflowRunId, workflowStartedAt: workflowMetadata.workflowStartedAt, url: workflowMetadata.url, + workflowName: workflowMetadata.workflowName, }, stepMetadata, innerWorkflowMetadata,