From 79a4827a7ef50efbcbf105c52ced0580c339e1c3 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Thu, 12 Mar 2026 10:43:25 -0700 Subject: [PATCH 1/3] [world-local] Enforce hook token uniqueness and atomicity, matches other worlds Signed-off-by: Peter Wielander --- .changeset/fruity-shirts-shave.md | 5 ++ packages/world-local/src/fs.ts | 21 ++++++++ packages/world-local/src/storage.test.ts | 28 ++++++++++ .../world-local/src/storage/events-storage.ts | 52 ++++++++++++++----- .../world-local/src/storage/hooks-storage.ts | 5 ++ 5 files changed, 97 insertions(+), 14 deletions(-) create mode 100644 .changeset/fruity-shirts-shave.md diff --git a/.changeset/fruity-shirts-shave.md b/.changeset/fruity-shirts-shave.md new file mode 100644 index 0000000000..21ebf6f527 --- /dev/null +++ b/.changeset/fruity-shirts-shave.md @@ -0,0 +1,5 @@ +--- +"@workflow/world-local": patch +--- + +Ensure atomicity for hook token, matches world-postgres and world-vercel diff --git a/packages/world-local/src/fs.ts b/packages/world-local/src/fs.ts index 81233763ab..9f53dc60b5 100644 --- a/packages/world-local/src/fs.ts +++ b/packages/world-local/src/fs.ts @@ -187,6 +187,27 @@ export async function deleteJSON(filePath: string): Promise { } } +/** + * Atomically create a file using O_CREAT | O_EXCL flags. + * Returns true if the file was created, false if it already exists. + * This is atomic at the OS level, safe for concurrent access. + */ +export async function writeExclusive( + filePath: string, + data: string +): Promise { + await ensureDir(path.dirname(filePath)); + try { + await fs.writeFile(filePath, data, { flag: 'wx' }); + return true; + } catch (error: any) { + if (error.code === 'EEXIST') { + return false; + } + throw error; + } +} + export async function listJSONFiles(dirPath: string): Promise { return listFilesByExtension(dirPath, '.json'); } diff --git a/packages/world-local/src/storage.test.ts b/packages/world-local/src/storage.test.ts index 3566023d4e..b6f5b079c0 100644 --- a/packages/world-local/src/storage.test.ts +++ b/packages/world-local/src/storage.test.ts @@ -1300,6 +1300,34 @@ describe('Storage', () => { expect((result.event as any).eventData.token).toBe(token); expect(result.hook).toBeUndefined(); }); + + it('should reject concurrent creates for the same token atomically', async () => { + const token = 'concurrent-token'; + + // Fire 5 concurrent hook creations with the same token + const results = await Promise.allSettled( + Array.from({ length: 5 }, (_, i) => + storage.events.create(testRunId, { + eventType: 'hook_created', + correlationId: `concurrent_hook_${i}`, + eventData: { token }, + }) + ) + ); + + const fulfilled = results.filter( + (r) => r.status === 'fulfilled' + ) as PromiseFulfilledResult[]; + const created = fulfilled.filter( + (r) => r.value.event.eventType === 'hook_created' + ); + const conflicts = fulfilled.filter( + (r) => r.value.event.eventType === 'hook_conflict' + ); + + expect(created).toHaveLength(1); + expect(conflicts).toHaveLength(4); + }); }); describe('get', () => { diff --git a/packages/world-local/src/storage/events-storage.ts b/packages/world-local/src/storage/events-storage.ts index 806a439936..fcdf84b298 100644 --- a/packages/world-local/src/storage/events-storage.ts +++ b/packages/world-local/src/storage/events-storage.ts @@ -1,3 +1,4 @@ +import { createHash } from 'node:crypto'; import path from 'node:path'; import { RunNotSupportedError, WorkflowAPIError } from '@workflow/errors'; import type { @@ -27,6 +28,7 @@ import { listJSONFiles, paginatedFileSystemQuery, readJSON, + writeExclusive, writeJSON, } from '../fs.js'; import { filterEventData } from './filters.js'; @@ -577,20 +579,28 @@ export function createEventsStorage(basedir: string): Storage['events'] { isWebhook?: boolean; }; - // Check for duplicate token before creating hook - const hooksDir = path.join(basedir, 'hooks'); - const hookFiles = await listJSONFiles(hooksDir); - let hasConflict = false; - for (const file of hookFiles) { - const existingHookPath = path.join(hooksDir, `${file}.json`); - const existingHook = await readJSON(existingHookPath, HookSchema); - if (existingHook && existingHook.token === hookData.token) { - hasConflict = true; - break; - } - } + // Atomically claim the token using an exclusive-create constraint file. + // This mirrors workflow-server's HookTokenConstraintEntity pattern and + // avoids the TOCTOU race of the previous read-all-then-check approach. + const tokenHash = createHash('sha256') + .update(hookData.token) + .digest('hex'); + const constraintPath = path.join( + basedir, + 'hooks', + 'tokens', + `${tokenHash}.json` + ); + const tokenClaimed = await writeExclusive( + constraintPath, + JSON.stringify({ + token: hookData.token, + hookId: data.correlationId, + runId: effectiveRunId, + }) + ); - if (hasConflict) { + if (!tokenClaimed) { // Create hook_conflict event instead of hook_created // This allows the workflow to continue and fail gracefully when the hook is awaited const conflictEvent: Event = { @@ -647,12 +657,26 @@ export function createEventsStorage(basedir: string): Storage['events'] { ); await writeJSON(hookPath, hook); } else if (data.eventType === 'hook_disposed') { - // Delete the hook when disposed + // Read the hook to get its token before deleting const hookPath = path.join( basedir, 'hooks', `${data.correlationId}.json` ); + const existingHook = await readJSON(hookPath, HookSchema); + if (existingHook) { + // Delete the token constraint file to free up the token for reuse + const disposedTokenHash = createHash('sha256') + .update(existingHook.token) + .digest('hex'); + const disposedConstraintPath = path.join( + basedir, + 'hooks', + 'tokens', + `${disposedTokenHash}.json` + ); + await deleteJSON(disposedConstraintPath); + } await deleteJSON(hookPath); } else if (data.eventType === 'wait_created' && 'eventData' in data) { // wait_created: Creates wait entity with status 'waiting' diff --git a/packages/world-local/src/storage/hooks-storage.ts b/packages/world-local/src/storage/hooks-storage.ts index 331938239a..ea7f3291e7 100644 --- a/packages/world-local/src/storage/hooks-storage.ts +++ b/packages/world-local/src/storage/hooks-storage.ts @@ -1,3 +1,4 @@ +import { createHash } from 'node:crypto'; import path from 'node:path'; import { HookNotFoundError } from '@workflow/errors'; import type { @@ -113,6 +114,10 @@ export async function deleteAllHooksForRun( const hookPath = path.join(hooksDir, `${file}.json`); const hook = await readJSON(hookPath, HookSchema); if (hook && hook.runId === runId) { + // Delete the token constraint file to free up the token + const tokenHash = createHash('sha256').update(hook.token).digest('hex'); + const constraintPath = path.join(hooksDir, 'tokens', `${tokenHash}.json`); + await deleteJSON(constraintPath); await deleteJSON(hookPath); } } From 577cbeff4bae0d245da822b7269def7254a3f465 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Thu, 12 Mar 2026 11:33:17 -0700 Subject: [PATCH 2/3] comment Signed-off-by: Peter Wielander --- packages/world-local/src/storage/events-storage.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/world-local/src/storage/events-storage.ts b/packages/world-local/src/storage/events-storage.ts index fcdf84b298..a085e0a806 100644 --- a/packages/world-local/src/storage/events-storage.ts +++ b/packages/world-local/src/storage/events-storage.ts @@ -580,8 +580,7 @@ export function createEventsStorage(basedir: string): Storage['events'] { }; // Atomically claim the token using an exclusive-create constraint file. - // This mirrors workflow-server's HookTokenConstraintEntity pattern and - // avoids the TOCTOU race of the previous read-all-then-check approach. + // This avoids the TOCTOU race of the previous read-all-then-check approach. const tokenHash = createHash('sha256') .update(hookData.token) .digest('hex'); From dfdf60ae61e697b445bfe0a9e54b28bc5dfe1e8b Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Thu, 12 Mar 2026 11:36:10 -0700 Subject: [PATCH 3/3] extract helper file Signed-off-by: Peter Wielander --- packages/world-local/src/storage/events-storage.ts | 13 +++---------- packages/world-local/src/storage/helpers.ts | 8 ++++++++ packages/world-local/src/storage/hooks-storage.ts | 9 ++++++--- 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/packages/world-local/src/storage/events-storage.ts b/packages/world-local/src/storage/events-storage.ts index a085e0a806..12ab6556de 100644 --- a/packages/world-local/src/storage/events-storage.ts +++ b/packages/world-local/src/storage/events-storage.ts @@ -1,4 +1,3 @@ -import { createHash } from 'node:crypto'; import path from 'node:path'; import { RunNotSupportedError, WorkflowAPIError } from '@workflow/errors'; import type { @@ -32,7 +31,7 @@ import { writeJSON, } from '../fs.js'; import { filterEventData } from './filters.js'; -import { getObjectCreatedAt, monotonicUlid } from './helpers.js'; +import { getObjectCreatedAt, hashToken, monotonicUlid } from './helpers.js'; import { deleteAllHooksForRun } from './hooks-storage.js'; import { handleLegacyEvent } from './legacy.js'; @@ -581,14 +580,11 @@ export function createEventsStorage(basedir: string): Storage['events'] { // Atomically claim the token using an exclusive-create constraint file. // This avoids the TOCTOU race of the previous read-all-then-check approach. - const tokenHash = createHash('sha256') - .update(hookData.token) - .digest('hex'); const constraintPath = path.join( basedir, 'hooks', 'tokens', - `${tokenHash}.json` + `${hashToken(hookData.token)}.json` ); const tokenClaimed = await writeExclusive( constraintPath, @@ -665,14 +661,11 @@ export function createEventsStorage(basedir: string): Storage['events'] { const existingHook = await readJSON(hookPath, HookSchema); if (existingHook) { // Delete the token constraint file to free up the token for reuse - const disposedTokenHash = createHash('sha256') - .update(existingHook.token) - .digest('hex'); const disposedConstraintPath = path.join( basedir, 'hooks', 'tokens', - `${disposedTokenHash}.json` + `${hashToken(existingHook.token)}.json` ); await deleteJSON(disposedConstraintPath); } diff --git a/packages/world-local/src/storage/helpers.ts b/packages/world-local/src/storage/helpers.ts index 34fd450b0a..8b8e3c8969 100644 --- a/packages/world-local/src/storage/helpers.ts +++ b/packages/world-local/src/storage/helpers.ts @@ -1,6 +1,14 @@ +import { createHash } from 'node:crypto'; import { monotonicFactory } from 'ulid'; import { ulidToDate } from '../fs.js'; +/** + * Hash a hook token to produce a filesystem-safe constraint filename. + */ +export function hashToken(token: string): string { + return createHash('sha256').update(token).digest('hex'); +} + /** * Create a monotonic ULID factory that ensures ULIDs are always increasing * even when generated within the same millisecond. diff --git a/packages/world-local/src/storage/hooks-storage.ts b/packages/world-local/src/storage/hooks-storage.ts index ea7f3291e7..ef86ccd9e6 100644 --- a/packages/world-local/src/storage/hooks-storage.ts +++ b/packages/world-local/src/storage/hooks-storage.ts @@ -1,4 +1,3 @@ -import { createHash } from 'node:crypto'; import path from 'node:path'; import { HookNotFoundError } from '@workflow/errors'; import type { @@ -17,6 +16,7 @@ import { readJSON, } from '../fs.js'; import { filterHookData } from './filters.js'; +import { hashToken } from './helpers.js'; /** * Creates a hooks storage implementation using the filesystem. @@ -115,8 +115,11 @@ export async function deleteAllHooksForRun( const hook = await readJSON(hookPath, HookSchema); if (hook && hook.runId === runId) { // Delete the token constraint file to free up the token - const tokenHash = createHash('sha256').update(hook.token).digest('hex'); - const constraintPath = path.join(hooksDir, 'tokens', `${tokenHash}.json`); + const constraintPath = path.join( + hooksDir, + 'tokens', + `${hashToken(hook.token)}.json` + ); await deleteJSON(constraintPath); await deleteJSON(hookPath); }