Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fruity-shirts-shave.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@workflow/world-local": patch
---

Ensure atomicity for hook token, matches world-postgres and world-vercel
21 changes: 21 additions & 0 deletions packages/world-local/src/fs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,27 @@ export async function deleteJSON(filePath: string): Promise<void> {
}
}

/**
* Atomically create a file using O_CREAT | O_EXCL flags.
* Returns true if the file was created, false if it already exists.
* This is atomic at the OS level, safe for concurrent access.
*/
export async function writeExclusive(
filePath: string,
data: string
): Promise<boolean> {
await ensureDir(path.dirname(filePath));
try {
await fs.writeFile(filePath, data, { flag: 'wx' });
return true;
} catch (error: any) {
if (error.code === 'EEXIST') {
return false;
}
throw error;
}
}

export async function listJSONFiles(dirPath: string): Promise<string[]> {
return listFilesByExtension(dirPath, '.json');
}
Expand Down
28 changes: 28 additions & 0 deletions packages/world-local/src/storage.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1300,6 +1300,34 @@ describe('Storage', () => {
expect((result.event as any).eventData.token).toBe(token);
expect(result.hook).toBeUndefined();
});

it('should reject concurrent creates for the same token atomically', async () => {
const token = 'concurrent-token';

// Fire 5 concurrent hook creations with the same token
const results = await Promise.allSettled(
Array.from({ length: 5 }, (_, i) =>
storage.events.create(testRunId, {
eventType: 'hook_created',
correlationId: `concurrent_hook_${i}`,
eventData: { token },
})
)
);

const fulfilled = results.filter(
(r) => r.status === 'fulfilled'
) as PromiseFulfilledResult<any>[];
const created = fulfilled.filter(
(r) => r.value.event.eventType === 'hook_created'
);
const conflicts = fulfilled.filter(
(r) => r.value.event.eventType === 'hook_conflict'
);

expect(created).toHaveLength(1);
expect(conflicts).toHaveLength(4);
});
});

describe('get', () => {
Expand Down
46 changes: 31 additions & 15 deletions packages/world-local/src/storage/events-storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ import {
listJSONFiles,
paginatedFileSystemQuery,
readJSON,
writeExclusive,
writeJSON,
} from '../fs.js';
import { filterEventData } from './filters.js';
import { getObjectCreatedAt, monotonicUlid } from './helpers.js';
import { getObjectCreatedAt, hashToken, monotonicUlid } from './helpers.js';
import { deleteAllHooksForRun } from './hooks-storage.js';
import { handleLegacyEvent } from './legacy.js';

Expand Down Expand Up @@ -577,20 +578,24 @@ export function createEventsStorage(basedir: string): Storage['events'] {
isWebhook?: boolean;
};

// Check for duplicate token before creating hook
const hooksDir = path.join(basedir, 'hooks');
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only minor concern is that if an existing workspace(from before this version) has hook files, a duplicate token can be created once because the constraint file is missing?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We limit backwards compat in world-local to only ensure old runs can continue running, and can be viewed in o11y, so I think this is fine

const hookFiles = await listJSONFiles(hooksDir);
let hasConflict = false;
for (const file of hookFiles) {
const existingHookPath = path.join(hooksDir, `${file}.json`);
const existingHook = await readJSON(existingHookPath, HookSchema);
if (existingHook && existingHook.token === hookData.token) {
hasConflict = true;
break;
}
}
// Atomically claim the token using an exclusive-create constraint file.
// This avoids the TOCTOU race of the previous read-all-then-check approach.
const constraintPath = path.join(
basedir,
'hooks',
'tokens',
`${hashToken(hookData.token)}.json`
);
const tokenClaimed = await writeExclusive(
constraintPath,
JSON.stringify({
token: hookData.token,
hookId: data.correlationId,
runId: effectiveRunId,
})
);

if (hasConflict) {
if (!tokenClaimed) {
// Create hook_conflict event instead of hook_created
// This allows the workflow to continue and fail gracefully when the hook is awaited
const conflictEvent: Event = {
Expand Down Expand Up @@ -647,12 +652,23 @@ export function createEventsStorage(basedir: string): Storage['events'] {
);
await writeJSON(hookPath, hook);
} else if (data.eventType === 'hook_disposed') {
// Delete the hook when disposed
// Read the hook to get its token before deleting
const hookPath = path.join(
basedir,
'hooks',
`${data.correlationId}.json`
);
const existingHook = await readJSON(hookPath, HookSchema);
if (existingHook) {
// Delete the token constraint file to free up the token for reuse
const disposedConstraintPath = path.join(
basedir,
'hooks',
'tokens',
`${hashToken(existingHook.token)}.json`
);
await deleteJSON(disposedConstraintPath);
}
await deleteJSON(hookPath);
} else if (data.eventType === 'wait_created' && 'eventData' in data) {
// wait_created: Creates wait entity with status 'waiting'
Expand Down
8 changes: 8 additions & 0 deletions packages/world-local/src/storage/helpers.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
import { createHash } from 'node:crypto';
import { monotonicFactory } from 'ulid';
import { ulidToDate } from '../fs.js';

/**
* Hash a hook token to produce a filesystem-safe constraint filename.
*/
export function hashToken(token: string): string {
return createHash('sha256').update(token).digest('hex');
}

/**
* Create a monotonic ULID factory that ensures ULIDs are always increasing
* even when generated within the same millisecond.
Expand Down
8 changes: 8 additions & 0 deletions packages/world-local/src/storage/hooks-storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
readJSON,
} from '../fs.js';
import { filterHookData } from './filters.js';
import { hashToken } from './helpers.js';

/**
* Creates a hooks storage implementation using the filesystem.
Expand Down Expand Up @@ -113,6 +114,13 @@ export async function deleteAllHooksForRun(
const hookPath = path.join(hooksDir, `${file}.json`);
const hook = await readJSON(hookPath, HookSchema);
if (hook && hook.runId === runId) {
// Delete the token constraint file to free up the token
const constraintPath = path.join(
hooksDir,
'tokens',
`${hashToken(hook.token)}.json`
);
await deleteJSON(constraintPath);
await deleteJSON(hookPath);
}
}
Expand Down
Loading