Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 169 additions & 0 deletions packages/world-vercel/src/queue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,123 @@ describe('createQueue', () => {
}
});

it('should auto-inject x-workflow-run-id header for workflow payloads', async () => {
mockSend.mockResolvedValue({ messageId: 'msg-123' });

const originalEnv = process.env.VERCEL_DEPLOYMENT_ID;
process.env.VERCEL_DEPLOYMENT_ID = 'dpl_test';

try {
const queue = createQueue();
await queue.queue('__wkf_workflow_test', { runId: 'wrun_abc123' });

expect(mockSend).toHaveBeenCalledTimes(1);
const sendOpts = mockSend.mock.calls[0][2];
expect(sendOpts.headers).toEqual(
expect.objectContaining({
'x-workflow-run-id': 'wrun_abc123',
})
);
} finally {
if (originalEnv !== undefined) {
process.env.VERCEL_DEPLOYMENT_ID = originalEnv;
} else {
delete process.env.VERCEL_DEPLOYMENT_ID;
}
}
});

it('should auto-inject x-workflow-run-id and x-workflow-step-id headers for step payloads', async () => {
mockSend.mockResolvedValue({ messageId: 'msg-123' });

const originalEnv = process.env.VERCEL_DEPLOYMENT_ID;
process.env.VERCEL_DEPLOYMENT_ID = 'dpl_test';

try {
const queue = createQueue();
await queue.queue('__wkf_step_myStep', {
workflowName: 'test-workflow',
workflowRunId: 'wrun_abc123',
workflowStartedAt: Date.now(),
stepId: 'step_xyz789',
});

expect(mockSend).toHaveBeenCalledTimes(1);
const sendOpts = mockSend.mock.calls[0][2];
expect(sendOpts.headers).toEqual(
expect.objectContaining({
'x-workflow-run-id': 'wrun_abc123',
'x-workflow-step-id': 'step_xyz789',
})
);
} finally {
if (originalEnv !== undefined) {
process.env.VERCEL_DEPLOYMENT_ID = originalEnv;
} else {
delete process.env.VERCEL_DEPLOYMENT_ID;
}
}
});

it('should not inject workflow headers for health check payloads', async () => {
mockSend.mockResolvedValue({ messageId: 'msg-123' });

const originalEnv = process.env.VERCEL_DEPLOYMENT_ID;
process.env.VERCEL_DEPLOYMENT_ID = 'dpl_test';

try {
const queue = createQueue();
await queue.queue('__wkf_workflow_health_check', {
__healthCheck: true as const,
correlationId: 'corr_123',
});

expect(mockSend).toHaveBeenCalledTimes(1);
const sendOpts = mockSend.mock.calls[0][2];
expect(sendOpts.headers).toEqual({});
} finally {
if (originalEnv !== undefined) {
process.env.VERCEL_DEPLOYMENT_ID = originalEnv;
} else {
delete process.env.VERCEL_DEPLOYMENT_ID;
}
}
});

it('should allow caller headers to override auto-injected headers', async () => {
mockSend.mockResolvedValue({ messageId: 'msg-123' });

const originalEnv = process.env.VERCEL_DEPLOYMENT_ID;
process.env.VERCEL_DEPLOYMENT_ID = 'dpl_test';

try {
const queue = createQueue();
await queue.queue(
'__wkf_workflow_test',
{ runId: 'wrun_abc123' },
{
headers: {
'x-workflow-run-id': 'wrun_override',
'x-custom-header': 'custom-value',
},
}
);

expect(mockSend).toHaveBeenCalledTimes(1);
const sendOpts = mockSend.mock.calls[0][2];
expect(sendOpts.headers).toEqual({
'x-workflow-run-id': 'wrun_override',
'x-custom-header': 'custom-value',
});
} finally {
if (originalEnv !== undefined) {
process.env.VERCEL_DEPLOYMENT_ID = originalEnv;
} else {
delete process.env.VERCEL_DEPLOYMENT_ID;
}
}
});

it('should rethrow non-idempotency errors', async () => {
mockSend.mockRejectedValue(new Error('Some other error'));

Expand Down Expand Up @@ -324,6 +441,58 @@ describe('createQueue', () => {
expect(sendClientCall).toBeDefined();
});

it('should auto-inject x-workflow-run-id header on delayed re-enqueue', async () => {
mockSend.mockResolvedValue({ messageId: 'new-msg-123' });
const handler = setupHandler({ timeoutSeconds: 300 });

await handler(
{
payload: { runId: 'wrun_abc123' },
queueName: '__wkf_workflow_test',
deploymentId: 'dpl_original',
},
{ messageId: 'msg-123', deliveryCount: 1, createdAt: new Date() }
);

expect(mockSend).toHaveBeenCalledTimes(1);
const sendOpts = mockSend.mock.calls[0][2];
expect(sendOpts.headers).toEqual(
expect.objectContaining({
'x-workflow-run-id': 'wrun_abc123',
})
);
});

it('should auto-inject step headers on delayed re-enqueue for step payloads', async () => {
mockSend.mockResolvedValue({ messageId: 'new-msg-123' });
const handler = setupHandler({ timeoutSeconds: 300 });

const stepPayload = {
workflowName: 'test-workflow',
workflowRunId: 'wrun_abc123',
workflowStartedAt: Date.now(),
stepId: 'step_xyz789',
};

await handler(
{
payload: stepPayload,
queueName: '__wkf_step_myStep',
deploymentId: 'dpl_original',
},
{ messageId: 'msg-123', deliveryCount: 1, createdAt: new Date() }
);

expect(mockSend).toHaveBeenCalledTimes(1);
const sendOpts = mockSend.mock.calls[0][2];
expect(sendOpts.headers).toEqual(
expect.objectContaining({
'x-workflow-run-id': 'wrun_abc123',
'x-workflow-step-id': 'step_xyz789',
})
);
});

it('should handle step payloads correctly', async () => {
mockSend.mockResolvedValue({ messageId: 'new-msg-123' });
const handler = setupHandler({ timeoutSeconds: 3600 }); // 1 hour
Expand Down
Loading