diff --git a/.blog/durable-execution-layer.md b/.blog/durable-execution-layer.md new file mode 100644 index 0000000000..5df116a3f8 --- /dev/null +++ b/.blog/durable-execution-layer.md @@ -0,0 +1,155 @@ +# Workflow DevKit makes Agents durable + +## Thesis + +Production AI agents are not single HTTP requests. They are long-running programs that plan, call tools, wait on external systems, and keep internal state across dozens of decisions. + +Stateless compute fights that shape. A cold start or a timeout resets the process mid-loop. A retry replays side effects unless you build your own idempotency ledger. Teams end up rebuilding durable execution out of database rows, queues, and scheduled jobs. + +Workflow DevKit turns that pile of infrastructure back into code. You write an Agent as a workflow function. The runtime persists progress as an event log and deterministically replays the workflow to reconstruct state after failures, cold starts, or scale events. + +## Current state + +Most "production agent" stacks ship the same diagram with different logos: + +* A `agent_runs` table that stores conversation state, tool history, and a cursor. +* A queue that re-invokes the agent after every tool call. +* A cron job that scans for stuck runs, retries failed calls, and advances timers. +* Idempotency keys everywhere to avoid double-charging, double-emailing, or double-writing. + +This works, but it costs engineering time forever. Every tool integration becomes a mini state machine. Every new failure mode adds another column: `attempt`, `next_run_at`, `last_error`, `lock_owner`. The "agent" ends up split across handlers that must agree on invariants. + +Here's the pattern in code. + +**Before: DB row + queue for an Agent tool-calling loop** + +```ts +import { sql } from "./db"; +import { queue } from "./queue"; + +export async function runAgent(runId: string) { + const run = await sql`SELECT * FROM agent_runs WHERE id=${runId}`; + try { + const next = await llmPlan(run.state); + const toolOut = await callTool(next.tool, next.args, { + idempotencyKey: `${runId}:${run.step}`, + }); + await sql`UPDATE agent_runs SET state=${toolOut.state}, step=${run.step + 1} + WHERE id=${runId}`; + await queue.add("agent", { runId }, { jobId: `${runId}:${run.step + 1}` }); + } catch (err) { + await sql`UPDATE agent_runs SET retries=${run.retries + 1}, last_error=${String(err)} + WHERE id=${runId}`; + await queue.add("agent", { runId }, { delay: backoff(run.retries) }); + } +} +``` + +The code above "works" until it doesn't. You now own locking, exactly-once semantics, backoff, and recovery. Any bug that advances `step` at the wrong time corrupts the run. Any mismatch between the stored cursor and the tool history produces duplicated tool calls. + +## The shift + +Durable execution flips the control plane. Instead of persisting *state* and reconstructing control flow, you persist *control flow* and reconstruct state. + +Workflow DevKit records every side effect boundary as an event. When the workflow restarts, the runtime replays the workflow from the top in a deterministic sandbox and feeds it the same event stream. Completed steps return their recorded results. Pending steps suspend the workflow and get scheduled. The workflow code stays readable because it is still just async TypeScript. + +**After: the same Agent as a workflow with steps** + +```ts +type AgentState = { messages: string[]; done: boolean }; + +async function llmPlan(state: AgentState) { + 'use step'; + return decideNextAction(state.messages); +} +async function callTool(name: string, args: unknown) { + 'use step'; + return tools[name](args); +} + +export async function agentLoop(initial: AgentState) { + 'use workflow'; + let state = initial; + while (state.done === false) { + const plan = await llmPlan(state); + state = await callTool(plan.tool, plan.args); + } + return state; +} +``` + +The pain disappears because you stopped simulating a runtime in tables. The workflow function is the state machine. The durable log is the source of truth. Retries stop being a cross-cutting concern you re-implement for every tool. + +## The vision + +Agents need four things that plain serverless does not provide: + +1. **State across tool calls.** The agent has to remember what already happened. +2. **Selective retries.** A transient failure should retry one tool call, not the entire run. +3. **Parallel execution.** Agents fan out: retrieval + enrichment + verification. +4. **Long waits.** Human-in-the-loop and external systems do not fit in a 10-60 second timeout. + +Workflow DevKit maps those directly onto existing JavaScript primitives: + +* Use local variables for state. The runtime reconstructs them by replay. +* Use `FatalError` and `RetryableError` inside steps to control retry and backoff. +* Use `Promise.all()` and `Promise.race()` in workflows for fanout and competition. +* Use `sleep()` for durable delays and hooks to pause until an external event arrives. + +That last pair matters for agents because "waiting" is normal. A workflow can suspend while it waits for a webhook, a human approval, or an upstream batch job. The runtime resumes the workflow when the event shows up, without you writing a scheduler. + +Retries are the other place teams burn weeks. The usual solution is a cron-driven state machine that retries failed calls and advances a `next_retry_at` timestamp. + +**Before: cron + state machine retry for flaky API calls** + +```ts +import { sql } from "./db"; + +export async function retryCron() { + const jobs = await sql`SELECT * FROM api_calls + WHERE status='retry' AND run_at < now() + LIMIT 100`; + for (const job of jobs.rows) { + const res = await fetch(job.url, { method: "POST", body: job.body }); + const status = res.status < 500 ? "done" : "retry"; + await sql`UPDATE api_calls SET status=${status}, attempts=${job.attempts + 1}, + run_at=${nextRunAt(job.attempts)} WHERE id=${job.id}`; + } +} +``` + +That code turns "retry an HTTP call" into an operational subsystem. The database becomes a task scheduler. The cron job becomes a reliability layer. + +**After: RetryableError inside a step** + +```ts +import { FatalError, RetryableError } from "workflow"; + +async function postInvoice(id: string) { + 'use step'; + const origin = process.env.INVOICE_API_ORIGIN ?? ""; + const res = await fetch(`${origin}/invoices/${id}`, { method: "POST" }); + if (res.status >= 500) throw new RetryableError("invoice API 5xx", { retryAfter: "30s" }); + if (res.ok === false) throw new FatalError(`invoice API ${res.status}`); + return res.json(); +} + +export async function invoiceAgent(id: string) { + 'use workflow'; + return await postInvoice(id); +} +``` + +The step throws a structured error. The runtime persists that failure, schedules a retry with backoff, and replays the workflow without re-running completed work. + +## Next steps + +Treat "Agent" as a workflow boundary, not a request handler. Keep the workflow deterministic and push I/O into steps. If a piece of code needs the network, the filesystem, or a timer, it belongs in a step. + +Start small. Pick one agent loop that currently writes state to a database and triggers itself via a queue. Move the loop into a workflow function. Wrap each tool call in a step function. Replace cron-based retry with `RetryableError` and durable `sleep()`. + +Run the workflow locally, then inspect the event log and step timeline. + +```bash +npx -y -p @workflow/cli wf inspect runs +``` diff --git a/.blog/how-deterministic-replay-works-for-ai-agents.md b/.blog/how-deterministic-replay-works-for-ai-agents.md new file mode 100644 index 0000000000..a8561a6925 --- /dev/null +++ b/.blog/how-deterministic-replay-works-for-ai-agents.md @@ -0,0 +1,161 @@ +# How Workflow DevKit executes Agents with deterministic replay + +## Problem + +An Agent that calls tools is a distributed system in a single function body. It crosses process boundaries every time it waits on the network, hits a timeout, or gets retried by the platform. Stateless retries re-run code, not intent. + +The usual mitigation is "checkpoint everything." After every tool call you write a cursor and a blob of state to durable storage. On restart you read the checkpoint and try to reconstruct what happened. This approach turns agent code into a database-backed interpreter. + +## Approach + +Workflow DevKit splits Agent code into two execution models: + +* **Workflow functions** (`'use workflow'`) run inside a sandboxed VM. They orchestrate control flow, hold state in local variables, and stay deterministic. +* **Step functions** (`'use step'`) run with full Node.js access. They perform side effects: network calls, SDKs, file I/O, crypto, and timers. + +The runtime persists every step boundary as an event in an append-only log. When the workflow runs again, it replays the workflow from the top, feeds it the same event stream, and returns recorded results for completed steps. Only missing or failed steps execute. + +That design targets the failure modes that break agents in production: cold starts mid-conversation, platform timeouts, partial success in parallel fanout, and flaky tool calls. + +## Implementation details + +### Build-time split: workflow bundle vs step bundle + +A workflow file contains both orchestrator code and side-effecting code. Workflow DevKit's build pipeline uses an SWC transform to recognize the `'use workflow'` / `'use step'` directives and split them into separate bundles. + +That split is what makes the runtime model crisp: orchestrators run in a deterministic VM, and steps run in normal Node.js. You still write a single file. + +### Determinism in the workflow VM + +The workflow VM runs under constraints that make replay reliable: + +* `Math.random()` is seeded per workflow run. +* `Date.now()` is fixed and advanced based on event timestamps during replay. +* `crypto.getRandomValues()` and `crypto.randomUUID()` are deterministic. +* `process.env` is copied and frozen. +* Timer APIs (`setTimeout`, `setInterval`, `setImmediate`) throw. Use durable `sleep()` instead. +* Global `fetch` is blocked in workflows. Put network I/O in steps. + +This matters for agents because non-determinism breaks replay. If the orchestrator reads "now" or random data to decide which tool to call, it must see the same values on every replay. + +### Event log + suspension + +A workflow run consumes an ordered event stream. When the workflow hits an awaited step, it looks for events with the step's correlation id: + +* `step_created` confirms the step exists. +* `step_started`, `step_retrying`, `step_completed`, `step_failed` drive resolution. +* `wait_created` / `wait_completed` back durable `sleep()`. +* `hook_created` and hook completion events back external resumes. + +When an awaited step has no matching event yet, the workflow throws a `WorkflowSuspension`. The suspension carries a queue of pending invocations (steps, waits, hooks). The runtime handler persists the missing `*_created` events and enqueues step executions with an idempotency key equal to the correlation id. + +The workflow stops at that point. Step workers run, append completion or retry events, and re-enqueue the workflow. On the next replay, the workflow re-runs the same code and picks up exactly where it left off. + +### Built-in retries at the step boundary + +Step execution owns retries. A step can fail in three ways: + +* Throw `FatalError` to fail the step and bubble the error to the workflow. +* Throw `RetryableError` to retry with an explicit `retryAfter`. +* Throw any other error to retry with the default policy, up to `maxRetries` (default is 3). + +Retries do not re-run completed steps. The event log preserves the successful work and the orchestrator replays it. + +## Code patterns + +### Crash recovery without checkpoints + +**Before: manual checkpoint writes and cursor recovery** + +```ts +import { sql } from "./db"; + +export async function agentHandler(runId: string) { + const run = await sql`SELECT cursor, state FROM agent_runs WHERE id=${runId}`; + let { cursor, state } = run.rows[0]; + while (cursor < state.plan.length) { + const out = await tools[state.plan[cursor]](state); + cursor += 1; + state = { ...state, out }; + await sql`UPDATE agent_runs SET cursor=${cursor}, state=${state} WHERE id=${runId}`; + } + return state; +} +``` + +This is a checkpointed interpreter. Every loop iteration writes to storage so the next invocation can reconstruct progress. + +**After: deterministic replay, no explicit checkpoints** + +```ts +async function runTool(name: string, input: unknown) { + 'use step'; + return tools[name](input); +} + +export async function agentRun(plan: { name: string }[], initial: unknown) { + 'use workflow'; + let state = initial; + for (const action of plan) state = await runTool(action.name, state); + return state; +} +``` + +The workflow stores state in local variables. The runtime reconstructs those variables on replay by feeding recorded step results back into the same loop. + +### Parallel fanout without bespoke orchestration + +Agents fan out to keep latency bounded: search + fetch + summarize in parallel. The hard part is partial success. One branch can succeed while another fails, and a stateless retry re-executes both unless you persist per-branch outputs. + +**Before: custom fanout bookkeeping to avoid redoing work** + +```ts +import { sql } from "./db"; + +export async function fanout(runId: string) { + await sql`UPDATE runs SET status='running' WHERE id=${runId}`; + const [a, b] = await Promise.allSettled([callA(), callB()]); + if (a.status === "fulfilled") await sql`UPDATE runs SET a=${a.value} WHERE id=${runId}`; + if (b.status === "fulfilled") await sql`UPDATE runs SET b=${b.value} WHERE id=${runId}`; + if (a.status === "rejected" || b.status === "rejected") throw new Error("retry later"); + return { a: a.value, b: b.value }; +} +``` + +You persist intermediate results because the platform does not. + +**After: Promise.all over durable steps** + +```ts +async function fetchA() { + 'use step'; + return callA(); +} +async function fetchB() { + 'use step'; + return callB(); +} + +export async function fanoutWorkflow() { + 'use workflow'; + const [a, b] = await Promise.all([fetchA(), fetchB()]); + return { a, b }; +} +``` + +Each step has its own event history and retry policy. If `fetchB()` fails and retries, `fetchA()` replays from its `step_completed` event without re-executing. + +## Results + +Workflow DevKit moves agent reliability into the runtime instead of your app code: + +* Cold starts and timeouts resume from the event log, not from ad hoc checkpoints. +* Tool-call retries are selective. Completed steps return recorded results. +* Parallel fanout uses ordinary `Promise.all()` with independent step retries. +* Long waits become first-class via durable `sleep()` and hook-based resume. + +The operational surface area shrinks. You stop maintaining a queue protocol, a scheduler, and a state machine schema per agent. + +```bash +npx -y -p @workflow/cli wf inspect runs --limit 10 +``` diff --git a/.blog/stateless-retries-multiply-agent-side-effects.md b/.blog/stateless-retries-multiply-agent-side-effects.md new file mode 100644 index 0000000000..97ff7a1d21 --- /dev/null +++ b/.blog/stateless-retries-multiply-agent-side-effects.md @@ -0,0 +1,134 @@ +# Workflow DevKit keeps Agents' tool-call volume linear under retries + +## Headline finding + +Stateless retries turn an Agent's tool calls into repeated work. As the number of tool calls per run grows, the expected number of executed calls grows faster than linearly because a single failure forces a full replay of the prefix. + +Workflow DevKit changes the unit of retry. The workflow replays deterministically, but completed steps return recorded results. A transient failure retries one step, not the entire Agent turn. + +## Methodology + +Model an Agent run as `N` sequential tool calls. Each call fails transiently with probability `p` and succeeds with probability `q = 1 - p`. + +Compare two retry strategies: + +* **Stateless retry:** a failure restarts the whole run from tool call 1. +* **Durable step retry:** a failure retries only the failed call; prior successful calls do not re-execute. + +This isolates the retry surface area. It does not assume anything about the LLM or tools beyond an independent per-call failure rate. + +## Data + +With stateless retry, the run completes only after it achieves `N` consecutive successful calls. The expected number of executed calls is: + +`E_stateless = (1 - q^N) / (p * q^N)` + +With durable step retry, each call is a geometric retry until success, so: + +`E_durable = N / q` + +Concrete numbers: + +* `p = 0.02`, `N = 40`: stateless `62.2` calls vs durable `40.8` calls (1.52x). +* `p = 0.05`, `N = 20`: stateless `35.8` calls vs durable `21.1` calls (1.70x). +* `p = 0.10`, `N = 40`: stateless `666.5` calls vs durable `44.4` calls (15.0x). + +The ratio compounds because stateless retry forces the run to finish the entire chain without a single transient failure. Durable steps turn that into independent retries per call. + +## Core insight + +In agent workloads, the expensive part is not the control flow. It is the tool boundary: API calls, database writes, emails, payments, rate-limited endpoints. Stateless retry replays those boundaries unless the application builds its own ledger of what already executed. + +That ledger is the same thing a durable runtime provides: an event log keyed by stable correlation ids. Workflow DevKit already emits a correlation id per step and records its lifecycle (`created`, `started`, `retrying`, `completed`, `failed`). Replay rehydrates the workflow and returns step results without re-executing successful calls. + +## Practical takeaway + +Use durable steps for every side-effecting tool call. Keep the workflow function deterministic and let the runtime handle replay and selective retry. If a tool supports idempotency keys, derive the key from the step correlation id instead of inventing your own scheme. + +### Stateless retry duplicates work + +**Before: retrying an Agent turn replays the full prefix** + +```ts +export async function agentTurn(input: Input) { + for (let attempt = 1; attempt <= 5; attempt += 1) { + try { + const a = await toolA(input); + const b = await toolB(a); + const c = await toolC(b); + return { a, b, c }; + } catch (err) { + if (attempt === 5) throw err; + await sleepMs(1000 * attempt); + } + } + throw new Error("unreachable"); +} +``` + +**After: durable steps replay successful calls and retry only the failed one** + +```ts +import { RetryableError } from "workflow"; + +async function toolA(input: Input) { 'use step'; return callA(input); } +async function toolB(a: A) { 'use step'; return callB(a); } +async function toolC(b: B) { + 'use step'; + const res = await callC(b); + if (res.transient === true) throw new RetryableError("toolC transient", { retryAfter: "2s" }); + return res; +} + +export async function agentTurn(input: Input) { + 'use workflow'; + const a = await toolA(input); + const b = await toolB(a); + return await toolC(b); +} +``` + +### Stop managing idempotency keys by hand + +**Before: generating and persisting idempotency keys across retries** + +```ts +import { sql } from "./db"; +import { randomUUID } from "crypto"; + +export async function purchase(runId: string, userId: string) { + const row = await sql`SELECT charge_key, email_key FROM runs WHERE id=${runId}`; + const chargeKey = row.charge_key ?? randomUUID(); + const emailKey = row.email_key ?? randomUUID(); + await sql`UPDATE runs SET charge_key=${chargeKey}, email_key=${emailKey} WHERE id=${runId}`; + await stripe.charges.create({ amount: 499, customer: userId }, { idempotencyKey: chargeKey }); + await sendReceiptEmail(userId, { idempotencyKey: emailKey }); +} +``` + +**After: use the step correlation id as the idempotency key** + +```ts +import { getStepMetadata } from "workflow"; + +async function chargeCard(userId: string, amount: number) { + 'use step'; + const { stepId } = getStepMetadata(); + return stripe.charges.create({ amount, customer: userId }, { idempotencyKey: stepId }); +} +async function sendReceipt(userId: string) { + 'use step'; + const { stepId } = getStepMetadata(); + await mailer.sendReceipt({ userId }, { idempotencyKey: stepId }); +} + +export async function purchase(userId: string) { + 'use workflow'; + await chargeCard(userId, 499); + await sendReceipt(userId); +} +``` + +```bash +npx -y -p @workflow/cli wf inspect runs +``` diff --git a/docs/content/docs/meta.json b/docs/content/docs/meta.json index 218fd4fa51..afcbce4849 100644 --- a/docs/content/docs/meta.json +++ b/docs/content/docs/meta.json @@ -7,6 +7,7 @@ "how-it-works", "observability", "ai", + "recipes", "deploying", "errors", "api-reference" diff --git a/docs/content/docs/recipes/error-monitoring.mdx b/docs/content/docs/recipes/error-monitoring.mdx new file mode 100644 index 0000000000..095fb832fc --- /dev/null +++ b/docs/content/docs/recipes/error-monitoring.mdx @@ -0,0 +1,395 @@ +--- +title: Error Monitoring & Alerting +description: Build workflows that triage errors, process alerts, and dispatch notifications across channels. +type: guide +summary: Classify errors by severity, deduplicate alerts, and fan out notifications to multiple channels. +prerequisites: + - /docs/foundations/workflows-and-steps + - /docs/foundations/errors-and-retries +related: + - /docs/foundations/hooks + - /docs/foundations/common-patterns + - /docs/api-reference/workflow/create-webhook +--- + +This guide covers building workflows whose purpose is to monitor external systems, classify errors, and route alerts. For handling errors that occur _inside_ your own workflows, see [Errors & Retrying](/docs/foundations/errors-and-retries). + +Error monitoring is one of the most common workflow use cases. A typical pipeline receives error events from external systems, classifies them, deduplicates repeat occurrences, and dispatches notifications to the right channels. Workflows are a natural fit because they survive failures, retry flaky notification APIs, and maintain state across long-running monitoring loops. + +## Error Triage Workflow + +The simplest error monitoring workflow receives an error event via webhook, classifies it by severity, and routes it to the appropriate handler. + +```typescript title="workflows/error-triage.ts" lineNumbers +import { createWebhook } from "workflow"; + +interface ErrorEvent { + source: string; + message: string; + stack?: string; + metadata?: Record; +} + +async function classifyError(event: ErrorEvent) { + "use step"; + + // Classify based on error patterns + if (event.message.includes("FATAL") || event.message.includes("OOM")) { + return "critical" as const; + } + if (event.message.includes("timeout") || event.message.includes("rate limit")) { + return "warning" as const; + } + return "info" as const; +} + +async function handleCritical(event: ErrorEvent) { // [!code highlight] + "use step"; + // Page on-call, create incident ticket, etc. + console.log(`CRITICAL: ${event.source} - ${event.message}`); +} + +async function handleWarning(event: ErrorEvent) { + "use step"; + // Post to team Slack channel + console.log(`WARNING: ${event.source} - ${event.message}`); +} + +async function handleInfo(event: ErrorEvent) { + "use step"; + // Log for later review + console.log(`INFO: ${event.source} - ${event.message}`); +} + +export async function errorTriageWorkflow() { + "use workflow"; + + const webhook = createWebhook(); // [!code highlight] + console.log("Listening for errors at:", webhook.url); + + for await (const request of webhook) { // [!code highlight] + const event: ErrorEvent = await request.json(); + const severity = await classifyError(event); + + if (severity === "critical") { + await handleCritical(event); + } else if (severity === "warning") { + await handleWarning(event); + } else { + await handleInfo(event); + } + } +} +``` + +The workflow creates a persistent webhook endpoint. External systems POST error events to it. Each event is classified in a step (with full Node.js access for pattern matching, database lookups, or ML inference), then routed to the correct handler. Because the webhook uses `for await...of`, the workflow stays alive and processes errors as they arrive. + + +Webhooks implement `AsyncIterable`, so a single workflow instance can process an unlimited stream of events over time. See [Hooks & Webhooks](/docs/foundations/hooks) for details on iteration and custom tokens. + + +## Alert Processing Pipeline + +Real alert pipelines need deduplication. When the same error fires hundreds of times in a minute, you want one alert, not hundreds. Use custom hook tokens to route duplicate events to the same workflow instance. + +```typescript title="workflows/alert-pipeline.ts" lineNumbers +import { createHook } from "workflow"; + +interface Alert { + alertId: string; + source: string; + message: string; + timestamp: number; +} + +interface EnrichedAlert extends Alert { + service: string; + owner: string; + runbook: string; +} + +async function enrichAlert(alert: Alert): Promise { + "use step"; + + // Look up service metadata from your registry + const service = alert.source.split("/")[0]; + return { + ...alert, + service, + owner: `team-${service}`, + runbook: `https://runbooks.internal/${service}/${alert.alertId}`, + }; +} + +async function dispatchNotification(alert: EnrichedAlert) { + "use step"; + + await fetch("https://hooks.slack.com/services/...", { + method: "POST", + body: JSON.stringify({ + text: `[${alert.source}] ${alert.message}\nOwner: ${alert.owner}\nRunbook: ${alert.runbook}`, + }), + }); +} + +export async function alertPipelineWorkflow(alertId: string) { // [!code highlight] + "use workflow"; + + // Custom token ensures duplicate alerts route here + const hook = createHook({ token: `alert:${alertId}` }); // [!code highlight] + + // Process the first alert + const alert = await hook; + const enriched = await enrichAlert(alert); + await dispatchNotification(enriched); +} +``` + +The key pattern here is the custom hook token. When your ingestion layer receives an alert, it can use [`resumeHook()`](/docs/api-reference/workflow-api/resume-hook) to send the payload to a workflow keyed by `alertId`. If the workflow is already running for that alert, the event is delivered to the existing instance. This gives you natural deduplication: one workflow per unique alert. + +```typescript title="app/api/alerts/route.ts" lineNumbers +import { start } from "workflow/api"; +import { resumeHook } from "workflow/api"; +declare function alertPipelineWorkflow(alertId: string): Promise; // @setup + +export async function POST(request: Request) { + const alert = await request.json(); + + // Start workflow for new alerts, or deliver to existing one + await start(alertPipelineWorkflow, [alert.alertId]); // [!code highlight] + await resumeHook(`alert:${alert.alertId}`, alert); // [!code highlight] + + return new Response("OK"); +} +``` + +## Real-Time Alert Dispatch + +When a critical event needs immediate attention, fan out notifications to multiple channels in parallel using `Promise.all`. Each channel is its own step, so a failure in one (e.g., Slack API is down) does not block the others, and each is retried independently. + +```typescript title="workflows/instant-alert.ts" lineNumbers +import { createWebhook } from "workflow"; + +interface CriticalEvent { + title: string; + description: string; + severity: "P1" | "P2"; + source: string; +} + +async function sendSlackAlert(event: CriticalEvent) { + "use step"; + + await fetch("https://hooks.slack.com/services/...", { + method: "POST", + body: JSON.stringify({ + text: `*${event.severity}: ${event.title}*\n${event.description}`, + }), + }); +} + +async function sendEmailAlert(event: CriticalEvent) { + "use step"; + + await fetch("https://api.sendgrid.com/v3/mail/send", { + method: "POST", + headers: { Authorization: `Bearer ${process.env.SENDGRID_KEY}` }, + body: JSON.stringify({ + to: "oncall@example.com", + subject: `${event.severity}: ${event.title}`, + text: event.description, + }), + }); +} + +async function createPagerDutyIncident(event: CriticalEvent) { + "use step"; + + await fetch("https://events.pagerduty.com/v2/enqueue", { + method: "POST", + body: JSON.stringify({ + routing_key: process.env.PAGERDUTY_KEY, + event_action: "trigger", + payload: { + summary: `${event.severity}: ${event.title}`, + source: event.source, + severity: event.severity === "P1" ? "critical" : "error", + }, + }), + }); +} + +export async function instantAlertWorkflow() { + "use workflow"; + + const webhook = createWebhook(); + + const request = await webhook; + const event: CriticalEvent = await request.json(); + + // Fan out to all channels in parallel + await Promise.all([ // [!code highlight] + sendSlackAlert(event), // [!code highlight] + sendEmailAlert(event), // [!code highlight] + createPagerDutyIncident(event), // [!code highlight] + ]); // [!code highlight] +} +``` + +Because each notification is a separate step, the framework retries failures independently. If PagerDuty returns a 500, Slack and email still succeed, and the PagerDuty step retries on its own schedule. + +## External System Monitoring + +Not all monitoring is event-driven. Sometimes you need to poll external systems on a schedule. Use [`sleep()`](/docs/api-reference/workflow/sleep) in a loop to create a durable polling workflow that survives restarts and cold starts. + +```typescript title="workflows/monitor-service.ts" lineNumbers +import { sleep } from "workflow"; + +interface ServiceStatus { + healthy: boolean; + latency: number; + errorRate: number; +} + +async function checkServiceHealth(endpoint: string): Promise { + "use step"; + + const start = Date.now(); + const response = await fetch(endpoint); + const latency = Date.now() - start; + + return { + healthy: response.ok, + latency, + errorRate: response.ok ? 0 : 1, + }; +} + +async function sendAlert(service: string, status: ServiceStatus) { + "use step"; + + await fetch("https://hooks.slack.com/services/...", { + method: "POST", + body: JSON.stringify({ + text: `Service ${service} is unhealthy. Latency: ${status.latency}ms`, + }), + }); +} + +export async function monitorServiceWorkflow( + service: string, + endpoint: string +) { + "use workflow"; + + let consecutiveFailures = 0; + + while (true) { // [!code highlight] + const status = await checkServiceHealth(endpoint); + + if (!status.healthy) { + consecutiveFailures++; + if (consecutiveFailures >= 3) { + await sendAlert(service, status); + consecutiveFailures = 0; + } + } else { + consecutiveFailures = 0; + } + + await sleep("5m"); // [!code highlight] + } +} +``` + +The `sleep("5m")` call is durable - if the workflow process restarts during the sleep, it resumes at the correct time without re-running previous checks. The `while (true)` loop runs indefinitely, checking the service every 5 minutes and alerting after 3 consecutive failures. + + +`sleep()` accepts duration strings like `"5m"`, `"1h"`, or `"30s"`, as well as `Date` objects for sleeping until a specific time. See the [`sleep()` API reference](/docs/api-reference/workflow/sleep) for all supported formats. + + +## Content Security Scanning + +Workflows can also monitor content against security or policy rules. This pattern receives content via webhook, scans it in a step, and takes action on violations. + +```typescript title="workflows/content-security.ts" lineNumbers +import { createWebhook } from "workflow"; + +interface ContentEvent { + contentId: string; + body: string; + author: string; + type: "post" | "comment" | "message"; +} + +interface ScanResult { + passed: boolean; + violations: string[]; +} + +async function scanContent(event: ContentEvent): Promise { + "use step"; + + const violations: string[] = []; + + // Check against policy rules + const blockedPatterns = [/credential/i, /api[_-]?key/i, /password\s*=/i]; + for (const pattern of blockedPatterns) { + if (pattern.test(event.body)) { + violations.push(`Blocked pattern: ${pattern.source}`); + } + } + + return { passed: violations.length === 0, violations }; +} + +async function quarantineContent(contentId: string, violations: string[]) { + "use step"; + + // Move content to review queue + await fetch("https://api.internal/content/quarantine", { + method: "POST", + body: JSON.stringify({ contentId, violations }), + }); +} + +async function notifySecurityTeam(event: ContentEvent, result: ScanResult) { + "use step"; + + await fetch("https://hooks.slack.com/services/...", { + method: "POST", + body: JSON.stringify({ + text: `Content violation in ${event.type} by ${event.author}: ${result.violations.join(", ")}`, + }), + }); +} + +export async function contentSecurityWorkflow() { + "use workflow"; + + const webhook = createWebhook(); + + for await (const request of webhook) { + const event: ContentEvent = await request.json(); + const result = await scanContent(event); // [!code highlight] + + if (!result.passed) { // [!code highlight] + await Promise.all([ + quarantineContent(event.contentId, result.violations), + notifySecurityTeam(event, result), + ]); + } + } +} +``` + +The scanning step has full Node.js access, so it can call external scanning APIs, run regex-based rules, or invoke ML models. When a violation is found, the workflow quarantines the content and notifies the security team in parallel. + +## Related Documentation + +- [Errors & Retrying](/docs/foundations/errors-and-retries) - Handle errors inside your own steps with retry semantics +- [Hooks & Webhooks](/docs/foundations/hooks) - Deep dive on hooks, webhooks, and custom tokens +- [Common Patterns](/docs/foundations/common-patterns) - Sequential, parallel, timeout, and composition patterns +- [`createWebhook()` API Reference](/docs/api-reference/workflow/create-webhook) - Full webhook API documentation +- [`createHook()` API Reference](/docs/api-reference/workflow/create-hook) - Full hook API documentation +- [`sleep()` API Reference](/docs/api-reference/workflow/sleep) - Sleep and scheduling API diff --git a/docs/content/docs/recipes/index.mdx b/docs/content/docs/recipes/index.mdx new file mode 100644 index 0000000000..6d4295c6b2 --- /dev/null +++ b/docs/content/docs/recipes/index.mdx @@ -0,0 +1,17 @@ +--- +title: Recipes +description: Production-ready patterns and workflows you can adapt for your own applications. +type: overview +summary: Build real-world workflows using proven patterns from production use cases. +related: + - /docs/foundations/workflows-and-steps + - /docs/foundations/common-patterns +--- + +Recipes are practical, end-to-end guides that show how to build complete workflows for common use cases. Each recipe builds on the [Foundations](/docs/foundations) and demonstrates patterns you can adapt for your own applications. + + + + Build workflows that triage errors, deduplicate alerts, and dispatch notifications across channels. + + diff --git a/docs/content/docs/recipes/meta.json b/docs/content/docs/recipes/meta.json new file mode 100644 index 0000000000..d518eaec7d --- /dev/null +++ b/docs/content/docs/recipes/meta.json @@ -0,0 +1,4 @@ +{ + "title": "Recipes", + "pages": ["error-monitoring"] +} diff --git a/packages/next/README.md b/packages/next/README.md index bb8604d934..dbd1e8ea1f 100644 --- a/packages/next/README.md +++ b/packages/next/README.md @@ -1,3 +1,199 @@ # @workflow/next -Next.js plugin for [Workflow DevKit](https://useworkflow.dev). +Next.js integration for Workflow DevKit. + +## Install + +```bash +npm install workflow next +# or +pnpm add workflow next +# or +yarn add workflow next +# or +bun add workflow next +``` + +`next` is a peer dependency. `workflow` includes this package as `workflow/next`. + +## Usage + +Wrap your Next config with `withWorkflow()`. + +```ts +import type { NextConfig } from 'next'; +import { withWorkflow } from '@workflow/next'; + +const nextConfig: NextConfig = { + // your Next.js config +}; + +export default withWorkflow(nextConfig); +``` + +### Type signature + +```ts +import type { NextConfig } from 'next'; + +export declare function withWorkflow( + nextConfigOrFn: + | NextConfig + | (( + phase: string, + ctx: { defaultConfig: NextConfig } + ) => Promise), + { + workflows, + }?: { + workflows?: { + local?: { + port?: number; + dataDir?: string; + }; + }; + } +): ( + phase: string, + ctx: { defaultConfig: NextConfig } +) => Promise; +``` + +### Example: object config + +```ts +import type { NextConfig } from 'next'; +import { withWorkflow } from '@workflow/next'; + +const nextConfig: NextConfig = { + reactStrictMode: true, +}; + +export default withWorkflow(nextConfig, { + workflows: { + local: { + port: 3152, + }, + }, +}); +``` + +### Example: async config function + +```ts +import type { NextConfig } from 'next'; +import { withWorkflow } from '@workflow/next'; + +export default withWorkflow(async (phase, { defaultConfig }) => { + const nextConfig: NextConfig = { + ...defaultConfig, + reactStrictMode: true, + }; + + if (phase === 'phase-production-build') { + nextConfig.productionBrowserSourceMaps = true; + } + + return nextConfig; +}); +``` + +## What `withWorkflow()` does + +When you wrap your config, `withWorkflow()`: + +1. Sets runtime defaults for local and Vercel worlds. +2. Registers the Workflow loader in both Turbopack and webpack. +3. Builds generated workflow routes in `.well-known/workflow/v1/*`. +4. Watches source files in development and incrementally rebuilds bundles. +5. Avoids duplicate builder runs per process using `WORKFLOW_NEXT_PRIVATE_BUILT`. + +## Environment variables + +| Variable | Used by | Behavior | +| --- | --- | --- | +| `WORKFLOW_TARGET_WORLD` | `withWorkflow()` + runtime world selection | If not set: defaults to `local` when not on Vercel, and `vercel` when `VERCEL_DEPLOYMENT_ID` is present. | +| `WORKFLOW_LOCAL_DATA_DIR` | Local world runtime | Set to `.next/workflow-data` by `withWorkflow()` when defaulting to local world. You can override it explicitly in your environment. | +| `PORT` | Next dev/build process | Set from `workflows.local.port` when running outside Vercel. | +| `WORKFLOW_NEXT_PRIVATE_BUILT` | `withWorkflow()` internals | Internal guard to ensure builder setup runs once per main process. | +| `WORKFLOW_PUBLIC_MANIFEST` | Builder/public output | When set to `1`, copies `manifest.json` to `public/.well-known/workflow/v1/manifest.json` so Next serves it publicly. | +| `WATCHPACK_WATCHER_LIMIT` | Watch mode on macOS | Set to `20` during dev watch mode on Darwin to mitigate slow watcher teardown behavior. | + +## Package exports + +| Export path | Description | +| --- | --- | +| `@workflow/next` | Main Next integration export. Provides `withWorkflow()`. | +| `@workflow/next/loader` | Loader that applies Workflow client-mode transforms for `"use workflow"` and `"use step"`. | +| `@workflow/next/runtime` | Re-export of `@workflow/core/dist/runtime` for runtime compatibility. | + +If you install the umbrella `workflow` package, these are available from `workflow/next` and related subpaths. + +## Generated `.well-known/workflow/v1/*` files + +`@workflow/next` generates these files under your app directory (`app/` or `src/app/`): + +| File | Purpose | Public route | +| --- | --- | --- | +| `.well-known/workflow/v1/flow/route.js` | Workflow orchestration handler bundle. | `POST /.well-known/workflow/v1/flow` | +| `.well-known/workflow/v1/step/route.js` | Step execution handler bundle. | `POST /.well-known/workflow/v1/step` | +| `.well-known/workflow/v1/webhook/[token]/route.js` | Webhook delivery handler bundle. | `POST /.well-known/workflow/v1/webhook/:token` | +| `.well-known/workflow/v1/manifest.json` | Workflow/step/class manifest (with graph metadata). | Not public unless `WORKFLOW_PUBLIC_MANIFEST=1` | +| `.well-known/workflow/v1/config.json` | Production function trigger config for Next build output. | Internal build artifact | +| `.well-known/workflow/v1/.gitignore` | Prevents committing generated artifacts. | N/A | + +If your app uses `pages/` only, the builder creates a sibling `app/` (or `src/app/`) directory for generated routes. + +## How generated files work at runtime + +1. Your app calls `start()` with a transformed workflow function. +2. Runtime posts to `/.well-known/workflow/v1/flow` to advance orchestration. +3. Steps execute through `/.well-known/workflow/v1/step`. +4. Webhook resumptions arrive through `/.well-known/workflow/v1/webhook/:token`. +5. Manifest metadata is used by tooling and can be exposed for observability. + +## Serving the manifest publicly + +To expose the manifest over HTTP, set: + +```bash +WORKFLOW_PUBLIC_MANIFEST=1 +``` + +On build, `@workflow/next` copies: + +- From: `app/.well-known/workflow/v1/manifest.json` (or `src/app/...`) +- To: `public/.well-known/workflow/v1/manifest.json` + +Next.js then serves it at: + +- `/.well-known/workflow/v1/manifest.json` + +## Troubleshooting + +### `'start' received an invalid workflow function` + +- Ensure your workflow function has `"use workflow"`. +- Ensure step functions use `"use step"` where required. +- Ensure `next.config.*` is wrapped with `withWorkflow()`. + +### Workflow routes return 404 + +- Confirm one of these exists: `app/`, `src/app/`, `pages/`, or `src/pages/`. +- Confirm generated files exist under `.well-known/workflow/v1/*`. +- If using a Next proxy handler, exclude `/.well-known/workflow/` paths. + +### Manifest route is missing + +- Set `WORKFLOW_PUBLIC_MANIFEST=1` before running/building. +- Rebuild so `manifest.json` is copied into `public/.well-known/workflow/v1/`. + +### Next.js 16.1+ build error + +If you see: + +```text +Error: Cannot find module 'next/dist/lib/server-external-packages.json' +``` + +Upgrade to `workflow@4.0.1-beta.26` or newer. diff --git a/packages/swc-plugin-workflow/spec.md b/packages/swc-plugin-workflow/spec.md index a697ad9b8e..dddeb78866 100644 --- a/packages/swc-plugin-workflow/spec.md +++ b/packages/swc-plugin-workflow/spec.md @@ -576,7 +576,10 @@ registerSerializationClass("class//./input//Point", Point); ## Static Methods -Static class methods can be marked with directives. Instance methods are **not supported**. +Static class methods can be marked with directives. Instance methods are supported for `"use step"` (with custom serialization), but `"use workflow"` is only supported on static methods. + +- `"use step"`: supported on **static** and **instance** methods (instance methods require custom serialization). +- `"use workflow"`: supported on **static** methods only (instance methods are rejected). ### Static Step Method @@ -795,6 +798,8 @@ Files containing classes with custom serialization are automatically discovered This allows serialization classes to be defined in separate files (such as Next.js API routes or utility modules) and still be registered in the serialization system when the application is built. +> **Compatibility note:** For auto-discovery of serialization-only files, prefer importing `WORKFLOW_SERIALIZE` / `WORKFLOW_DESERIALIZE` from `@workflow/serde` consistently. If you import these symbols from `@vercel/workflow` in a file that contains only serialization classes (no `"use step"`/`"use workflow"`), the file may not match the discovery heuristic. If you must use `@vercel/workflow`, either use `Symbol.for("workflow-serialize"/"workflow-deserialize")` directly or ensure the file also contains a workflow directive so it is transformed. + ### Cross-Context Class Registration Classes with custom serialization are automatically included in **all bundle contexts** (step, workflow, client) to ensure they can be properly serialized and deserialized when crossing execution boundaries: @@ -850,6 +855,9 @@ The plugin emits errors for invalid usage: |-------|-------------| | Non-async function | Functions with `"use step"` or `"use workflow"` must be async | | Instance methods with `"use workflow"` | Only static methods can have `"use workflow"` (not instance methods) | +| Forbidden `this` in step function | Step functions cannot reference `this` (they are hoisted and executed out of instance context). | +| Forbidden `arguments` in step function | Step functions cannot reference `arguments`. Use explicit parameters or rest params instead. | +| Forbidden `super` in step function | Step functions cannot use `super` calls. Move that logic outside the step boundary. | | Misplaced directive | Directive must be at top of file or start of function body | | Conflicting directives | Cannot have both `"use step"` and `"use workflow"` at module level | | Invalid exports | Module-level directive files can only export async functions |