Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/guard-step-consumer-events.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@workflow/core': patch
'@workflow/world': patch
---

Validate step, wait, and hook lifecycle events against replay ownership metadata.
9 changes: 7 additions & 2 deletions packages/core/src/abort-consistency.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -340,15 +340,20 @@ describe('AbortController consistency', () => {
runId: 'wrun_test',
eventType: 'hook_created',
correlationId: hookItem.correlationId,
eventData: {},
eventData: {
token: 'test-token',
},
createdAt: new Date(),
},
{
eventId: 'evnt_1',
runId: 'wrun_test',
eventType: 'hook_received',
correlationId: hookItem.correlationId,
eventData: { payload: { reason: 'hook worked' } },
eventData: {
token: 'test-token',
payload: { reason: 'hook worked' },
},
createdAt: new Date(),
},
];
Expand Down
48 changes: 48 additions & 0 deletions packages/core/src/abort-controller.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@
* (for real-time step propagation).
*/

import { WorkflowRuntimeError } from '@workflow/errors';
import { withResolvers } from '@workflow/utils';
import type { Event } from '@workflow/world';
import * as nanoid from 'nanoid';
import { monotonicFactory } from 'ulid';
import { describe, expect, it, vi } from 'vitest';
import { EventsConsumer } from './events-consumer.js';
import type { WorkflowOrchestratorContext } from './private.js';
import { dehydrateStepReturnValue } from './serialization.js';
import { createContext } from './vm/index.js';
import {
createCreateAbortController,
Expand Down Expand Up @@ -115,6 +118,51 @@ describe('AbortController in workflow VM', () => {
expect(controller.signal.aborted).toBe(true);
});

it('reports a WorkflowRuntimeError when abort hook_received token mismatches the controller', async () => {
ctx = setupWorkflowContext([]);
const ProbeAbortController = createCreateAbortController(ctx);
new ProbeAbortController();
const probeHookItem = [...ctx.invocationsQueue.values()].find(
(item) => item.type === 'hook'
);
expect(probeHookItem).toBeDefined();
if (!probeHookItem || probeHookItem.type !== 'hook') {
throw new Error('Expected abort hook item');
}

const ops: Promise<any>[] = [];
ctx = setupWorkflowContext([
{
eventId: 'evnt_0',
runId: 'wrun_test',
eventType: 'hook_received',
correlationId: probeHookItem.correlationId,
eventData: {
token: 'wrong-token',
payload: await dehydrateStepReturnValue(
{ reason: 'aborted' },
'wrun_test',
undefined,
ops
),
},
createdAt: new Date(),
},
]);

const errorReceived = withResolvers<Error>();
ctx.onWorkflowError = errorReceived.resolve;

const AbortController = createCreateAbortController(ctx);
new AbortController();

const workflowError = await errorReceived.promise;
expect(workflowError).toBeInstanceOf(WorkflowRuntimeError);
expect(workflowError?.message).toContain('hook_received');
expect(workflowError?.message).toContain('wrong-token');
expect(workflowError?.message).toContain(probeHookItem.token);
});

it('signal.aborted is false initially', () => {
ctx = setupWorkflowContext([]);
const AbortController = createCreateAbortController(ctx);
Expand Down
86 changes: 69 additions & 17 deletions packages/core/src/async-deserialization-ordering.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,21 @@ describe('async deserialization ordering', () => {
runId: 'wrun_test',
eventType: 'step_completed',
correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCV',
eventData: { result: resultA },
eventData: {
stepName: 'stepA',
result: resultA,
},
createdAt: new Date(),
},
{
eventId: 'evnt_1',
runId: 'wrun_test',
eventType: 'step_completed',
correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCW',
eventData: { result: resultB },
eventData: {
stepName: 'stepB',
result: resultB,
},
createdAt: new Date(),
},
]);
Expand Down Expand Up @@ -160,23 +166,32 @@ describe('async deserialization ordering', () => {
runId: 'wrun_test',
eventType: 'step_completed',
correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCV',
eventData: { result: results[0] },
eventData: {
stepName: 'step1',
result: results[0],
},
createdAt: new Date(),
},
{
eventId: 'evnt_1',
runId: 'wrun_test',
eventType: 'step_completed',
correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCW',
eventData: { result: results[1] },
eventData: {
stepName: 'step2',
result: results[1],
},
createdAt: new Date(),
},
{
eventId: 'evnt_2',
runId: 'wrun_test',
eventType: 'step_completed',
correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCX',
eventData: { result: results[2] },
eventData: {
stepName: 'step3',
result: results[2],
},
createdAt: new Date(),
},
]);
Expand Down Expand Up @@ -246,22 +261,31 @@ describe('async deserialization ordering', () => {
runId: 'wrun_test',
eventType: 'hook_received',
correlationId: 'hook_01K11TFZ62YS0YYFDQ3E8B9YCV',
eventData: { payload: payloadA },
eventData: {
token: 'test-token',
payload: payloadA,
},
createdAt: new Date(),
},
{
eventId: 'evnt_1',
runId: 'wrun_test',
eventType: 'hook_received',
correlationId: 'hook_01K11TFZ62YS0YYFDQ3E8B9YCV',
eventData: { payload: payloadB },
eventData: {
token: 'test-token',
payload: payloadB,
},
createdAt: new Date(),
},
{
eventId: 'evnt_2',
runId: 'wrun_test',
eventType: 'hook_disposed',
correlationId: 'hook_01K11TFZ62YS0YYFDQ3E8B9YCV',
eventData: {
token: 'test-token',
},
createdAt: new Date(),
},
]);
Expand All @@ -282,7 +306,7 @@ describe('async deserialization ordering', () => {
);

const createHook = createCreateHook(ctx);
const hook = createHook();
const hook = createHook({ token: 'test-token' });

// Await two payloads from the hook
const resolveOrder: string[] = [];
Expand Down Expand Up @@ -329,23 +353,32 @@ describe('async deserialization ordering', () => {
runId: 'wrun_test',
eventType: 'step_completed',
correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCV',
eventData: { result: resultA },
eventData: {
stepName: 'stepA',
result: resultA,
},
createdAt: new Date(),
},
{
eventId: 'evnt_1',
runId: 'wrun_test',
eventType: 'step_failed',
correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCW',
eventData: { error: errorB },
eventData: {
stepName: 'stepB',
error: errorB,
},
createdAt: new Date(),
},
{
eventId: 'evnt_2',
runId: 'wrun_test',
eventType: 'step_completed',
correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCX',
eventData: { result: resultC },
eventData: {
stepName: 'stepC',
result: resultC,
},
createdAt: new Date(),
},
]);
Expand Down Expand Up @@ -481,7 +514,10 @@ describe('async deserialization ordering', () => {
runId: 'wrun_test',
eventType: 'step_completed',
correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCV',
eventData: { result: resultA },
eventData: {
stepName: 'stepA',
result: resultA,
},
createdAt: new Date(),
},
{
Expand All @@ -497,14 +533,20 @@ describe('async deserialization ordering', () => {
runId: 'wrun_test',
eventType: 'wait_completed',
correlationId: 'wait_01K11TFZ62YS0YYFDQ3E8B9YCW',
eventData: {
resumeAt: new Date('2024-01-01T00:00:05.000Z'),
},
createdAt: new Date(),
},
{
eventId: 'evnt_3',
runId: 'wrun_test',
eventType: 'step_completed',
correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCX',
eventData: { result: resultC },
eventData: {
stepName: 'stepC',
result: resultC,
},
createdAt: new Date(),
},
]);
Expand Down Expand Up @@ -567,31 +609,41 @@ describe('async deserialization ordering', () => {
runId: 'wrun_test',
eventType: 'step_started',
correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCV',
eventData: {},
eventData: {
stepName: 'stepA',
},
createdAt: new Date(),
},
{
eventId: 'evnt_1',
runId: 'wrun_test',
eventType: 'step_started',
correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCW',
eventData: {},
eventData: {
stepName: 'stepB',
},
createdAt: new Date(),
},
{
eventId: 'evnt_2',
runId: 'wrun_test',
eventType: 'step_completed',
correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCV',
eventData: { result: resultA },
eventData: {
stepName: 'stepA',
result: resultA,
},
createdAt: new Date(),
},
{
eventId: 'evnt_3',
runId: 'wrun_test',
eventType: 'step_completed',
correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCW',
eventData: { result: resultB },
eventData: {
stepName: 'stepB',
result: resultB,
},
createdAt: new Date(),
},
]);
Expand Down
Loading
Loading