Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/corrupted-event-log-code.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@workflow/core": patch
"@workflow/errors": patch
"@workflow/world": patch
---

Report corrupted event logs with a distinct `CorruptedEventLogError` type and `CORRUPTED_EVENT_LOG` run error code.
6 changes: 3 additions & 3 deletions packages/core/src/abort-controller.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* (for real-time step propagation).
*/

import { WorkflowRuntimeError } from '@workflow/errors';
import { CorruptedEventLogError } from '@workflow/errors';
import { withResolvers } from '@workflow/utils';
import type { Event } from '@workflow/world';
import * as nanoid from 'nanoid';
Expand Down Expand Up @@ -118,7 +118,7 @@ describe('AbortController in workflow VM', () => {
expect(controller.signal.aborted).toBe(true);
});

it('reports a WorkflowRuntimeError when abort hook_received token mismatches the controller', async () => {
it('reports a CorruptedEventLogError when abort hook_received token mismatches the controller', async () => {
ctx = setupWorkflowContext([]);
const ProbeAbortController = createCreateAbortController(ctx);
new ProbeAbortController();
Expand Down Expand Up @@ -157,7 +157,7 @@ describe('AbortController in workflow VM', () => {
new AbortController();

const workflowError = await errorReceived.promise;
expect(workflowError).toBeInstanceOf(WorkflowRuntimeError);
expect(workflowError).toBeInstanceOf(CorruptedEventLogError);
expect(workflowError?.message).toContain('hook_received');
expect(workflowError?.message).toContain('wrong-token');
expect(workflowError?.message).toContain(probeHookItem.token);
Expand Down
7 changes: 7 additions & 0 deletions packages/core/src/classify-error.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
CorruptedEventLogError,
HookConflictError,
RUN_ERROR_CODES,
WorkflowNotRegisteredError,
Expand All @@ -9,6 +10,12 @@ import { describe, expect, it } from 'vitest';
import { classifyRunError } from './classify-error.js';

describe('classifyRunError', () => {
it('classifies CorruptedEventLogError as CORRUPTED_EVENT_LOG', () => {
expect(
classifyRunError(new CorruptedEventLogError('corrupted event log'))
).toBe(RUN_ERROR_CODES.CORRUPTED_EVENT_LOG);
});

it('classifies WorkflowRuntimeError as RUNTIME_ERROR', () => {
expect(
classifyRunError(new WorkflowRuntimeError('corrupted event log'))
Expand Down
11 changes: 8 additions & 3 deletions packages/core/src/classify-error.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
CorruptedEventLogError,
RUN_ERROR_CODES,
type RunErrorCode,
StepNotRegisteredError,
Expand All @@ -7,7 +8,7 @@ import {
} from '@workflow/errors';

/**
* Set of error names that should classify as `RUNTIME_ERROR`. Each
* List of `*.is()` checks for errors that should classify as generic `RUNTIME_ERROR`. Each
* `*.is()` static does a name-based duck check, so subclassing alone is
* not enough — we have to enumerate every concrete subclass we want to
* recognize. Keep in sync with the `WorkflowRuntimeError` class hierarchy
Expand All @@ -25,8 +26,8 @@ const RUNTIME_ERROR_CHECKS = [
* After the structural separation of infrastructure vs user code error
* handling, the only errors that reach the `run_failed` try/catch are:
* - User code errors (throws from workflow functions, propagated step failures)
* - WorkflowRuntimeError and subclasses (corrupted event log, missing
* timestamps, workflow/step not registered, etc.)
* - WorkflowRuntimeError and subclasses (missing timestamps, workflow/step
* not registered, corrupted event log, etc.)
*
* Uses each subclass's `.is()` static (a name-based duck check) instead of
* a single `instanceof` check because workflows execute in a separate
Expand All @@ -36,6 +37,10 @@ const RUNTIME_ERROR_CHECKS = [
* errors as user errors.
*/
export function classifyRunError(err: unknown): RunErrorCode {
if (CorruptedEventLogError.is(err)) {
return RUN_ERROR_CODES.CORRUPTED_EVENT_LOG;
}

for (const isMatch of RUNTIME_ERROR_CHECKS) {
if (isMatch(err)) {
return RUN_ERROR_CODES.RUNTIME_ERROR;
Expand Down
39 changes: 39 additions & 0 deletions packages/core/src/describe-error.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
CorruptedEventLogError,
RUN_ERROR_CODES,
SerializationError,
StepNotRegisteredError,
Expand Down Expand Up @@ -63,6 +64,15 @@ describe('describeError', () => {
expect(result.hint).toContain('internal workflow SDK error');
});

test('CorruptedEventLogError is attributed to the SDK with a distinct code', () => {
const result = describeError(
new CorruptedEventLogError('corrupted event log')
);
expect(result.attribution).toBe('sdk');
expect(result.errorCode).toBe(RUN_ERROR_CODES.CORRUPTED_EVENT_LOG);
expect(result.hint).toContain('event log contains');
});

test('StepNotRegisteredError (subclass of WorkflowRuntimeError) is attributed to the SDK', () => {
const result = describeError(new StepNotRegisteredError('missingStep'));
expect(result.attribution).toBe('sdk');
Expand Down Expand Up @@ -144,6 +154,23 @@ describe('describeRunError', () => {
expect(result.hint).toContain('replay took too long');
});

test('CORRUPTED_EVENT_LOG errorCode is attributed to the SDK', () => {
const result = describeRunError({
errorCode: RUN_ERROR_CODES.CORRUPTED_EVENT_LOG,
});
expect(result.attribution).toBe('sdk');
expect(result.hint).toContain('event log contains');
});

test('CorruptedEventLogError name restores the distinct code', () => {
const result = describeRunError({
errorName: 'CorruptedEventLogError',
});
expect(result.attribution).toBe('sdk');
expect(result.errorCode).toBe(RUN_ERROR_CODES.CORRUPTED_EVENT_LOG);
expect(result.hint).toContain('event log contains');
});

test('MAX_DELIVERIES_EXCEEDED errorCode is attributed to the SDK', () => {
const result = describeRunError({
errorCode: RUN_ERROR_CODES.MAX_DELIVERIES_EXCEEDED,
Expand Down Expand Up @@ -226,6 +253,18 @@ describe('describeError — payload shape snapshots', () => {
`);
});

test('CorruptedEventLogError payload', () => {
expect(
describeError(new CorruptedEventLogError('event mismatch'))
).toMatchInlineSnapshot(`
{
"attribution": "sdk",
"errorCode": "CORRUPTED_EVENT_LOG",
"hint": "The workflow event log contains orphaned or mismatched events and cannot be replayed. This is an internal workflow SDK error; please report it with the runId.",
}
`);
});

test('REPLAY_TIMEOUT via precomputed errorCode payload', () => {
expect(
describeError(undefined, RUN_ERROR_CODES.REPLAY_TIMEOUT)
Expand Down
42 changes: 39 additions & 3 deletions packages/core/src/describe-error.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
CorruptedEventLogError,
RUN_ERROR_CODES,
type RunErrorCode,
SerializationError,
Expand Down Expand Up @@ -76,6 +77,8 @@ const CONTEXT_ERROR_HINT =
'A workflow-only or step-only API was called from the wrong context. The error message includes the exact API and how to move the call.';
const RUNTIME_ERROR_HINT =
'This is an internal workflow SDK error, not a bug in your code. If it keeps happening, please report it with the stack trace and the runId.';
const CORRUPTED_EVENT_LOG_HINT =
'The workflow event log contains orphaned or mismatched events and cannot be replayed. This is an internal workflow SDK error; please report it with the runId.';
const REPLAY_TIMEOUT_HINT =
'The workflow replay took too long. This usually means the event log is unusually large or the workflow function is doing heavy synchronous work between step boundaries.';
const MAX_DELIVERIES_HINT =
Expand All @@ -100,24 +103,41 @@ function normalizeErrorCode(code: string | undefined): RunErrorCode {
export function describeRunError(
signal: PersistedErrorSignal
): ErrorDescription {
const errorCode = normalizeErrorCode(signal.errorCode);
const name = signal.errorName;
const errorCode =
name === 'CorruptedEventLogError'
? RUN_ERROR_CODES.CORRUPTED_EVENT_LOG
: normalizeErrorCode(signal.errorCode);

if (name === 'SerializationError') {
return { attribution: 'user', errorCode, hint: SERIALIZATION_ERROR_HINT };
}
if (name && CONTEXT_ERROR_NAMES.has(name)) {
return { attribution: 'user', errorCode, hint: CONTEXT_ERROR_HINT };
}
if (name === 'WorkflowRuntimeError' || name === 'StepNotRegisteredError') {
return { attribution: 'sdk', errorCode, hint: RUNTIME_ERROR_HINT };
if (name === 'CorruptedEventLogError') {
return {
attribution: 'sdk',
errorCode,
hint: CORRUPTED_EVENT_LOG_HINT,
};
}
if (errorCode === RUN_ERROR_CODES.REPLAY_TIMEOUT) {
return { attribution: 'sdk', errorCode, hint: REPLAY_TIMEOUT_HINT };
}
if (errorCode === RUN_ERROR_CODES.MAX_DELIVERIES_EXCEEDED) {
return { attribution: 'sdk', errorCode, hint: MAX_DELIVERIES_HINT };
}
if (errorCode === RUN_ERROR_CODES.CORRUPTED_EVENT_LOG) {
return {
attribution: 'sdk',
errorCode,
hint: CORRUPTED_EVENT_LOG_HINT,
};
}
if (name === 'WorkflowRuntimeError' || name === 'StepNotRegisteredError') {
return { attribution: 'sdk', errorCode, hint: RUNTIME_ERROR_HINT };
}
if (errorCode === RUN_ERROR_CODES.RUNTIME_ERROR) {
return { attribution: 'sdk', errorCode, hint: RUNTIME_ERROR_HINT };
}
Expand Down Expand Up @@ -167,6 +187,14 @@ export function describeError(
};
}

if (CorruptedEventLogError.is(err)) {
return {
attribution: 'sdk',
errorCode: effectiveCode,
hint: CORRUPTED_EVENT_LOG_HINT,
};
}

if (err instanceof WorkflowRuntimeError) {
return {
attribution: 'sdk',
Expand All @@ -191,5 +219,13 @@ export function describeError(
};
}

if (effectiveCode === RUN_ERROR_CODES.CORRUPTED_EVENT_LOG) {
return {
attribution: 'sdk',
errorCode: effectiveCode,
hint: CORRUPTED_EVENT_LOG_HINT,
};
}

return { attribution: 'user', errorCode: effectiveCode };
}
12 changes: 6 additions & 6 deletions packages/core/src/log-format.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,21 +74,21 @@ describe('composeLogLine', () => {
test('renders sdk-attributed errors with the sdk badge', () => {
const out = composeLogLine(
PREFIX,
'Workflow myFlow failed due to an SDK runtime error\nWorkflowRuntimeError: corrupted event log',
'Workflow myFlow failed due to an SDK runtime error\nCorruptedEventLogError: corrupted event log',
{
errorCode: 'RUNTIME_ERROR',
errorCode: 'CORRUPTED_EVENT_LOG',
errorAttribution: 'sdk',
errorName: 'WorkflowRuntimeError',
errorName: 'CorruptedEventLogError',
errorMessage: 'corrupted event log',
hint: 'This is an internal workflow SDK error.',
}
);
expect(out).toMatchInlineSnapshot(`
"[workflow-sdk] Workflow myFlow failed due to an SDK runtime error
sdk error · WorkflowRuntimeError
code RUNTIME_ERROR
sdk error · CorruptedEventLogError
code CORRUPTED_EVENT_LOG
hint: This is an internal workflow SDK error.
WorkflowRuntimeError: corrupted event log"
CorruptedEventLogError: corrupted event log"
`);
});

Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/runtime.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ describe('workflowEntrypoint replay guards', () => {
expect.objectContaining({
eventType: 'run_failed',
eventData: expect.objectContaining({
errorCode: RUN_ERROR_CODES.RUNTIME_ERROR,
errorCode: RUN_ERROR_CODES.CORRUPTED_EVENT_LOG,
}),
})
);
Expand Down Expand Up @@ -210,7 +210,7 @@ describe('workflowEntrypoint replay guards', () => {
expect.objectContaining({
eventType: 'run_failed',
eventData: expect.objectContaining({
errorCode: RUN_ERROR_CODES.RUNTIME_ERROR,
errorCode: RUN_ERROR_CODES.CORRUPTED_EVENT_LOG,
}),
})
);
Expand Down
6 changes: 3 additions & 3 deletions packages/core/src/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1067,9 +1067,9 @@ export function workflowEntrypoint(
);
}

// Classify the error: WorkflowRuntimeError indicates an
// internal issue (corrupted event log, missing data);
// everything else is a user code error.
// Classify the error: WorkflowRuntimeError indicates
// an SDK/runtime issue, and selected subclasses (e.g.
// CorruptedEventLogError -> CORRUPTED_EVENT_LOG) use
// more specific codes for backend tracking.
const errorCode = classifyRunError(err);

runtimeLogger.error('Error while running workflow', {
Expand Down
16 changes: 10 additions & 6 deletions packages/core/src/step.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import { FatalError, WorkflowRuntimeError } from '@workflow/errors';
import {
CorruptedEventLogError,
FatalError,
WorkflowRuntimeError,
} from '@workflow/errors';
import { withResolvers } from '@workflow/utils';
import type { Event } from '@workflow/world';
import * as nanoid from 'nanoid';
Expand Down Expand Up @@ -459,7 +463,7 @@ describe('createUseStep', () => {
void add(1, 2);

const workflowError = await errorReceived.promise;
expect(workflowError).toBeInstanceOf(WorkflowRuntimeError);
expect(workflowError).toBeInstanceOf(CorruptedEventLogError);
expect(workflowError.message).toContain('Corrupted event log');
expect(workflowError.message).toContain('step_created');
expect(workflowError.message).toContain('subtract');
Expand Down Expand Up @@ -568,7 +572,7 @@ describe('createUseStep', () => {
void add(1, 2);

const workflowError = await errorReceived.promise;
expect(workflowError).toBeInstanceOf(WorkflowRuntimeError);
expect(workflowError).toBeInstanceOf(CorruptedEventLogError);
expect(workflowError.message).toContain('Corrupted event log');
expect(workflowError.message).toContain('step_completed');
expect(workflowError.message).toContain('subtract');
Expand Down Expand Up @@ -629,7 +633,7 @@ describe('createUseStep', () => {
void add(1, 2);

const workflowError = await errorReceived.promise;
expect(workflowError).toBeInstanceOf(WorkflowRuntimeError);
expect(workflowError).toBeInstanceOf(CorruptedEventLogError);
expect(workflowError.message).toContain('Corrupted event log');
expect(workflowError.message).toContain('step_failed');
expect(workflowError.message).toContain('subtract');
Expand Down Expand Up @@ -753,7 +757,7 @@ describe('createUseStep', () => {
expect(error?.message).toBe('Plain error message');
});

it('should invoke workflow error handler with WorkflowRuntimeError for unexpected event type', async () => {
it('should invoke workflow error handler with CorruptedEventLogError for unexpected event type', async () => {
// Simulate a corrupted event log where a step receives an unexpected event type
// (e.g., a wait_completed event when expecting step_completed/step_failed)
const ctx = setupWorkflowContext([
Expand All @@ -779,7 +783,7 @@ describe('createUseStep', () => {
const stepPromise = add(1, 2);

const workflowError = await errorReceived.promise;
expect(workflowError).toBeInstanceOf(WorkflowRuntimeError);
expect(workflowError).toBeInstanceOf(CorruptedEventLogError);
expect(workflowError?.message).toContain('Unexpected event type for step');
expect(workflowError?.message).toContain('step_01K11TFZ62YS0YYFDQ3E8B9YCV');
expect(workflowError?.message).toContain('add');
Expand Down
8 changes: 4 additions & 4 deletions packages/core/src/step.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { FatalError, WorkflowRuntimeError } from '@workflow/errors';
import { CorruptedEventLogError, FatalError } from '@workflow/errors';
import { withResolvers } from '@workflow/utils';
import { EventConsumerResult } from './events-consumer.js';
import { type StepInvocationQueueItem, WorkflowSuspension } from './global.js';
Expand Down Expand Up @@ -88,7 +88,7 @@ export function createUseStep(ctx: WorkflowOrchestratorContext) {
if (typeof eventStepName === 'string' && eventStepName !== stepName) {
ctx.promiseQueue = ctx.promiseQueue.then(() => {
ctx.onWorkflowError(
new WorkflowRuntimeError(
new CorruptedEventLogError(
`Corrupted event log: step event ${event.eventType} for ${correlationId} belongs to "${eventStepName}", but the current step consumer is "${stepName}"`
)
);
Expand All @@ -106,7 +106,7 @@ export function createUseStep(ctx: WorkflowOrchestratorContext) {
// but the step was never invoked in the workflow during replay.
ctx.promiseQueue = ctx.promiseQueue.then(() => {
reject(
new WorkflowRuntimeError(
new CorruptedEventLogError(
`Corrupted event log: step ${correlationId} (${stepName}) created but not found in invocation queue`
)
);
Expand Down Expand Up @@ -203,7 +203,7 @@ export function createUseStep(ctx: WorkflowOrchestratorContext) {
// An unexpected event type has been received, this event log looks corrupted. Let's fail immediately.
ctx.promiseQueue = ctx.promiseQueue.then(() => {
ctx.onWorkflowError(
new WorkflowRuntimeError(
new CorruptedEventLogError(
`Unexpected event type for step ${correlationId} (name: ${stepName}) "${event.eventType}"`
)
);
Expand Down
Loading
Loading