diff --git a/docs/content/docs/v4/cookbook/common-patterns/meta.json b/docs/content/docs/v4/cookbook/common-patterns/meta.json index 3c1ed7e585..e5b397f663 100644 --- a/docs/content/docs/v4/cookbook/common-patterns/meta.json +++ b/docs/content/docs/v4/cookbook/common-patterns/meta.json @@ -10,6 +10,7 @@ "scheduling", "timeouts", "idempotency", + "resume-or-start", "webhooks" ] } diff --git a/docs/content/docs/v4/cookbook/common-patterns/resume-or-start.mdx b/docs/content/docs/v4/cookbook/common-patterns/resume-or-start.mdx new file mode 100644 index 0000000000..ee32c1ed01 --- /dev/null +++ b/docs/content/docs/v4/cookbook/common-patterns/resume-or-start.mdx @@ -0,0 +1,90 @@ +--- +title: Resume or start by hook token +description: Look up an existing workflow run by a deterministic hook token before calling start(), and why this pattern is not fully atomic under concurrency. +type: guide +summary: Use getHookByToken (or world.hooks.getByToken) to attach to an in-flight run keyed by a stable app ID encoded into the hook token; otherwise start a new run. Pair with stronger idempotency when duplicate starts must be impossible. +--- + +Some HTTP or RPC handlers need **one durable session per stable application identifier**—an ID your app already uses for something in the product domain, such as a **chat thread ID**, **order ID**, **invoice ID**, or **tenant-scoped resource key** (often a primary key, slug, or composite you store or pass in URLs). That value is **not** the Workflow **`runId`**, which the runtime assigns when you call **`start()`**. Instead you **derive a hook token** from the stable ID (for example `` `session:${chatThreadId}` `` for a chat) so every request about the same real-world entity computes the **same** token. + +If a run is already active and has registered a hook with that token, you can **look up the hook**, reuse `hook.runId`, and avoid starting a second workflow. If nothing is registered yet, you **`start()`** a new run. + +## When to use this + +- **Session handoff** — A client or edge handler hits your API without a `runId`, but you already know a **stable application ID** (path param, session subject, database row key, etc.) and can derive the hook token from it. +- **Reconnect before resume** — You want `getRun(runId)` (streams, `returnValue`, status) without persisting `runId` in your own store, as long as the workflow registers a hook with that token early in the run. +- **Custom worlds / admin tools** — You already use [`world.hooks.getByToken()`](/docs/api-reference/workflow-api/world/storage#look-up-hook-by-token) against storage; the app-level pattern is the same. + +## Pattern + +1. Build a **deterministic hook token** from your **stable application ID** plus a fixed prefix or namespace so different features do not collide (for example `` `session:${chatThreadId}` ``). +2. Call **`getHookByToken(token)`** from `workflow/api` (or `world.hooks.getByToken(token)` if you hold a `World` instance). +3. If a hook exists, use **`hook.runId`** with [`getRun()`](/docs/api-reference/workflow-api/get-run) — for example await **`run.returnValue`** when you need the workflow outcome. +4. If no hook is found (typically `404` / not found), call **[`start()`](/docs/api-reference/workflow-api/start)** with the same arguments your workflow would use to create that hook on first execution. + +```typescript lineNumbers +import { start, getRun, getHookByToken } from "workflow/api"; +import { sessionWorkflow } from "./workflows/session"; + +function sessionToken(chatThreadId: string) { + return `session:${chatThreadId}`; +} + +/** `chatThreadId` = your app's stable ID for the conversation (not the Workflow runId). */ +export async function getOrStartSession(chatThreadId: string) { + const token = sessionToken(chatThreadId); + + const existing = await getHookByToken(token).catch(() => null); // [!code highlight] + + if (existing) { + const run = getRun(existing.runId); // [!code highlight] + return run.returnValue; + } + + const run = await start(sessionWorkflow, [chatThreadId]); // [!code highlight] + return run.returnValue; +} +``` + +Your workflow should create the hook with the **same token shape** early (before long-running work), so the lookup succeeds once the run is registered: + +```typescript lineNumbers +import { defineHook } from "workflow"; + +export const sessionHook = defineHook(); + +export async function sessionWorkflow(chatThreadId: string) { + "use workflow"; + + const token = `session:${chatThreadId}`; + const hook = sessionHook.create({ token }); // [!code highlight] + + // ... durable session logic, await hook, etc. +} +``` + +## Concurrency and atomicity + + +This pattern is **not atomic**. Two concurrent requests can both observe “no hook” **before** the first run has written its `hook_created` event, so both can call **`start()`** and you can get **two runs** for the same stable application ID (the same derived hook token). + + +Mitigations: + +- **Accept rare duplicates** and detect them downstream (second run fails on `HookConflictError`, or you branch on run state). +- **Add an outside-workflow coordination layer** — for example a **row keyed by your stable application ID** (the same value you embed in the hook token) with a **unique constraint** that stores the authoritative **`runId`**, using **insert-if-absent**, **`INSERT … ON CONFLICT`**, or a **transaction with row-level locking** so only one concurrent handler reaches **`start()`** for that ID. A bare unique constraint on your domain table primary key does not by itself dedupe workflow runs; you need a **dedicated mapping or lease** tied to that ID. **`start()`** does not expose an idempotency-key option today (only `world`, `specVersion`, and `deploymentId` in [`StartOptions`](/docs/api-reference/workflow-api/start)). +- **Serialize creates** for the same stable ID (queue, advisory lock, or other mutual exclusion) if you must not spin up a second run even briefly. + +For replay-safe **step** side effects inside a workflow, still follow [Idempotency](/cookbook/common-patterns/idempotency) (`getStepMetadata().stepId`, external idempotency keys). + +## Key APIs + +- [`getHookByToken()`](/docs/api-reference/workflow-api/get-hook-by-token) — resolve `runId` (and metadata) from a hook token in app code. +- [`world.hooks.getByToken()`](/docs/api-reference/workflow-api/world/storage#look-up-hook-by-token) — same lookup against a `World` storage interface. +- [`start()`](/docs/api-reference/workflow-api/start) — enqueue a new run when no hook exists yet. +- [`getRun()`](/docs/api-reference/workflow-api/get-run) — attach to an existing run by `runId` (`returnValue`, streams, status). + +## Related + +- [Distributed Abort Controller](/cookbook/advanced/distributed-abort-controller) — full example of reconnect-or-`start()` for a cross-process abort controller. +- [Idempotency](/cookbook/common-patterns/idempotency) — keys for external APIs and duplicate-safe side effects inside steps. diff --git a/docs/content/docs/v4/cookbook/index.mdx b/docs/content/docs/v4/cookbook/index.mdx index 94cb6d9d18..39ec4d0240 100644 --- a/docs/content/docs/v4/cookbook/index.mdx +++ b/docs/content/docs/v4/cookbook/index.mdx @@ -22,6 +22,7 @@ A curated collection of workflow patterns with clean, copy-paste code examples f - [**Scheduling**](/cookbook/common-patterns/scheduling) — Use durable sleep to schedule actions minutes, hours, or weeks ahead - [**Timeouts**](/cookbook/common-patterns/timeouts) — Add deadlines to slow steps, hooks, and webhooks by racing them against a durable sleep - [**Idempotency**](/cookbook/common-patterns/idempotency) — Ensure side effects happen exactly once, even when steps retry +- [**Resume or start by hook token**](/cookbook/common-patterns/resume-or-start) — Encode a stable application ID in the hook token, look up an existing run with `getHookByToken` before `start()`, and understand the concurrency limits - [**Webhooks**](/cookbook/common-patterns/webhooks) — Receive HTTP callbacks from external services and process them durably ## Integrations diff --git a/docs/content/docs/v5/cookbook/common-patterns/meta.json b/docs/content/docs/v5/cookbook/common-patterns/meta.json index 3c1ed7e585..e5b397f663 100644 --- a/docs/content/docs/v5/cookbook/common-patterns/meta.json +++ b/docs/content/docs/v5/cookbook/common-patterns/meta.json @@ -10,6 +10,7 @@ "scheduling", "timeouts", "idempotency", + "resume-or-start", "webhooks" ] } diff --git a/docs/content/docs/v5/cookbook/common-patterns/resume-or-start.mdx b/docs/content/docs/v5/cookbook/common-patterns/resume-or-start.mdx new file mode 100644 index 0000000000..8dab98eca4 --- /dev/null +++ b/docs/content/docs/v5/cookbook/common-patterns/resume-or-start.mdx @@ -0,0 +1,89 @@ +--- +title: Resume or start by hook token +description: Look up an existing workflow run by a deterministic hook token before calling start(), and why this pattern is not fully atomic under concurrency. +type: guide +summary: Use getHookByToken (or world.hooks.getByToken) to attach to an in-flight run keyed by a stable app ID encoded into the hook token; otherwise start a new run. Pair with stronger idempotency when duplicate starts must be impossible. +--- + +Some HTTP or RPC handlers need **one durable session per stable application identifier**—an ID your app already uses for something in the product domain, such as a **chat thread ID**, **order ID**, **invoice ID**, or **tenant-scoped resource key** (often a primary key, slug, or composite you store or pass in URLs). That value is **not** the Workflow **`runId`**, which the runtime assigns when you call **`start()`**. Instead you **derive a hook token** from the stable ID (for example `` `session:${chatThreadId}` `` for a chat) so every request about the same real-world entity computes the **same** token. + +If a run is already active and has registered a hook with that token, you can **look up the hook**, reuse `hook.runId`, and avoid starting a second workflow. If nothing is registered yet, you **`start()`** a new run. + +## When to use this + +- **Session handoff** — A client or edge handler hits your API without a `runId`, but you already know a **stable application ID** (path param, session subject, database row key, etc.) and can derive the hook token from it. +- **Reconnect before resume** — You want `getRun(runId)` (streams, `returnValue`, status) without persisting `runId` in your own store, as long as the workflow registers a hook with that token early in the run. +- **Custom worlds / admin tools** — You already use [`world.hooks.getByToken()`](/docs/api-reference/workflow-api/world/storage#look-up-hook-by-token) against storage; the app-level pattern is the same. + +## Pattern + +1. Build a **deterministic hook token** from your **stable application ID** plus a fixed prefix or namespace so different features do not collide (for example `` `session:${chatThreadId}` ``). +2. Call **`getHookByToken(token)`** from `workflow/api` (or `world.hooks.getByToken(token)` if you hold a `World` instance). +3. If a hook exists, use **`hook.runId`** with [`getRun()`](/docs/api-reference/workflow-api/get-run) — for example await **`run.returnValue`** when you need the workflow outcome. +4. If no hook is found (typically `404` / not found), call **[`start()`](/docs/api-reference/workflow-api/start)** with the same arguments your workflow would use to create that hook on first execution. + +```typescript lineNumbers +import { start, getRun, getHookByToken } from "workflow/api"; +import { sessionWorkflow } from "./workflows/session"; + +function sessionToken(chatThreadId: string) { + return `session:${chatThreadId}`; +} + +/** `chatThreadId` = your app's stable ID for the conversation (not the Workflow runId). */ +export async function getOrStartSession(chatThreadId: string) { + const token = sessionToken(chatThreadId); + + const existing = await getHookByToken(token).catch(() => null); // [!code highlight] + + if (existing) { + const run = getRun(existing.runId); // [!code highlight] + return run.returnValue; + } + + const run = await start(sessionWorkflow, [chatThreadId]); // [!code highlight] + return run.returnValue; +} +``` + +Your workflow should create the hook with the **same token shape** early (before long-running work), so the lookup succeeds once the run is registered: + +```typescript lineNumbers +import { defineHook } from "workflow"; + +export const sessionHook = defineHook(); + +export async function sessionWorkflow(chatThreadId: string) { + "use workflow"; + + const token = `session:${chatThreadId}`; + const hook = sessionHook.create({ token }); // [!code highlight] + + // ... durable session logic, await hook, etc. +} +``` + +## Concurrency and atomicity + + +This pattern is **not atomic**. Two concurrent requests can both observe “no hook” **before** the first run has written its `hook_created` event, so both can call **`start()`** and you can get **two runs** for the same stable application ID (the same derived hook token). + + +Mitigations: + +- **Accept rare duplicates** and detect them downstream (second run fails on `HookConflictError`, or you branch on run state). +- **Add an outside-workflow coordination layer** — for example a **row keyed by your stable application ID** (the same value you embed in the hook token) with a **unique constraint** that stores the authoritative **`runId`**, using **insert-if-absent**, **`INSERT … ON CONFLICT`**, or a **transaction with row-level locking** so only one concurrent handler reaches **`start()`** for that ID. A bare unique constraint on your domain table primary key does not by itself dedupe workflow runs; you need a **dedicated mapping or lease** tied to that ID. **`start()`** does not expose an idempotency-key option today (only `world`, `specVersion`, and `deploymentId` in [`StartOptions`](/docs/api-reference/workflow-api/start)). +- **Serialize creates** for the same stable ID (queue, advisory lock, or other mutual exclusion) if you must not spin up a second run even briefly. + +For replay-safe **step** side effects inside a workflow, still follow [Idempotency](/cookbook/common-patterns/idempotency) (`getStepMetadata().stepId`, external idempotency keys). + +## Key APIs + +- [`getHookByToken()`](/docs/api-reference/workflow-api/get-hook-by-token) — resolve `runId` (and metadata) from a hook token in app code. +- [`world.hooks.getByToken()`](/docs/api-reference/workflow-api/world/storage#look-up-hook-by-token) — same lookup against a `World` storage interface. +- [`start()`](/docs/api-reference/workflow-api/start) — enqueue a new run when no hook exists yet. +- [`getRun()`](/docs/api-reference/workflow-api/get-run) — attach to an existing run by `runId` (`returnValue`, streams, status). + +## Related + +- [Idempotency](/cookbook/common-patterns/idempotency) — keys for external APIs and duplicate-safe side effects inside steps. diff --git a/docs/content/docs/v5/cookbook/index.mdx b/docs/content/docs/v5/cookbook/index.mdx index 0dc1ce62ab..d3507251b3 100644 --- a/docs/content/docs/v5/cookbook/index.mdx +++ b/docs/content/docs/v5/cookbook/index.mdx @@ -22,6 +22,7 @@ A curated collection of workflow patterns with clean, copy-paste code examples f - [**Scheduling**](/cookbook/common-patterns/scheduling) — Use durable sleep to schedule actions minutes, hours, or weeks ahead - [**Timeouts**](/cookbook/common-patterns/timeouts) — Add deadlines to slow steps, hooks, and webhooks by racing them against a durable sleep - [**Idempotency**](/cookbook/common-patterns/idempotency) — Ensure side effects happen exactly once, even when steps retry +- [**Resume or start by hook token**](/cookbook/common-patterns/resume-or-start) — Encode a stable application ID in the hook token, look up an existing run with `getHookByToken` before `start()`, and understand the concurrency limits - [**Webhooks**](/cookbook/common-patterns/webhooks) — Receive HTTP callbacks from external services and process them durably ## Integrations diff --git a/docs/lib/cookbook-tree.ts b/docs/lib/cookbook-tree.ts index 2189235df8..4624ff610f 100644 --- a/docs/lib/cookbook-tree.ts +++ b/docs/lib/cookbook-tree.ts @@ -41,6 +41,7 @@ export const slugToCategory: Record = { scheduling: 'common-patterns', timeouts: 'common-patterns', idempotency: 'common-patterns', + 'resume-or-start': 'common-patterns', webhooks: 'common-patterns', // Agent Patterns @@ -119,6 +120,13 @@ export const recipes: Record = { 'Ensure external side effects happen exactly once, even when steps are retried or workflows are replayed.', category: 'common-patterns', }, + 'resume-or-start': { + slug: 'resume-or-start', + title: 'Resume or start by hook token', + description: + 'Look up an existing run with getHookByToken (or world.hooks.getByToken) before start(); encode a stable application ID (not runId) in the hook token. Pair with stronger idempotency when duplicate starts must be impossible.', + category: 'common-patterns', + }, webhooks: { slug: 'webhooks', title: 'Webhooks & External Callbacks', diff --git a/packages/world-local/src/queue.test.ts b/packages/world-local/src/queue.test.ts index 8b9d2d1e8c..3874f63648 100644 --- a/packages/world-local/src/queue.test.ts +++ b/packages/world-local/src/queue.test.ts @@ -170,7 +170,9 @@ describe('queue timeout re-enqueue', () => { }); it('logs actionable guidance for detached ArrayBuffer proxy failures', async () => { - const consoleError = vi.spyOn(console, 'error').mockImplementation(() => {}); + const consoleError = vi + .spyOn(console, 'error') + .mockImplementation(() => {}); const fetchError = new TypeError('fetch failed'); (fetchError as TypeError & { cause?: unknown }).cause = new TypeError( 'Cannot perform ArrayBuffer.prototype.slice on a detached ArrayBuffer'