Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/tall-carrots-thank.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@workflow/web": patch
"@workflow/world-postgres": patch
"@workflow/world-testing": patch
---

Replace queue `pg-boss`-based implementation with `graphile-worker`
4 changes: 2 additions & 2 deletions docs/content/docs/deploying/building-a-world.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ A **World** is the abstraction that allows workflows to run on any infrastructur
</Callout>

<Callout type="info">
**Reference Implementation:** The [Postgres World source code](https://github.com/vercel/workflow/tree/main/packages/world-postgres) is a production-ready example of how to implement the World interface with a database backend and pg-boss for queuing.
**Reference Implementation:** The [Postgres World source code](https://github.com/vercel/workflow/tree/main/packages/world-postgres) is a production-ready example of how to implement the World interface with a database backend and graphile-worker for queuing.
</Callout>

## What is a World?
Expand Down Expand Up @@ -190,7 +190,7 @@ Streams are identified by a combination of `runId` and `name`. Each workflow run
Study these implementations for guidance:

- **[Local World](https://github.com/vercel/workflow/tree/main/packages/world-local)** — Filesystem-based, great for understanding the basics
- **[Postgres World](https://github.com/vercel/workflow/tree/main/packages/world-postgres)** — Database-backed with pg-boss for queuing
- **[Postgres World](https://github.com/vercel/workflow/tree/main/packages/world-postgres)** — Database-backed with graphile-worker for queuing

## Testing Your World

Expand Down
12 changes: 6 additions & 6 deletions docs/content/docs/deploying/world/postgres-world.mdx
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
---
title: Postgres World
description: Production-ready, self-hosted world using PostgreSQL for storage and pg-boss for job processing.
description: Production-ready, self-hosted world using PostgreSQL for storage and graphile-worker for job processing.
type: integration
summary: Deploy workflows to your own infrastructure using PostgreSQL and pg-boss.
summary: Deploy workflows to your own infrastructure using PostgreSQL and graphile-worker.
prerequisites:
- /docs/deploying
related:
- /docs/deploying/world/local-world
- /docs/deploying/world/vercel-world
---

The Postgres World is a production-ready backend for self-hosted deployments. It uses PostgreSQL for durable storage and [pg-boss](https://github.com/timgit/pg-boss) for reliable job processing.
The Postgres World is a production-ready backend for self-hosted deployments. It uses PostgreSQL for durable storage and [graphile-worker](https://github.com/graphile/worker) for reliable job processing.

Use the Postgres World when you need to deploy workflows on your own infrastructure outside of Vercel - such as a Docker container, Kubernetes cluster, or any cloud that supports long-running servers.

Expand Down Expand Up @@ -41,7 +41,7 @@ The migration is idempotent and can safely be run as a post-deployment lifecycle

## Starting the World

To subscribe to the pg-boss queue, your workflow app needs to start the world on server start. Here are examples for a few frameworks:
To subscribe to the graphile-worker queue, your workflow app needs to start the world on server start. Here are examples for a few frameworks:

<Tabs items={["Next.js", "SvelteKit", "Nitro"]}>

Expand Down Expand Up @@ -154,7 +154,7 @@ Default: `postgres://world:world@localhost:5432/world`

### `WORKFLOW_POSTGRES_JOB_PREFIX`

Prefix for pg-boss queue job names. Useful when sharing a database between multiple applications.
Prefix for graphile-worker queue job names. Useful when sharing a database between multiple applications.

### `WORKFLOW_POSTGRES_WORKER_CONCURRENCY`

Expand All @@ -178,7 +178,7 @@ const world = createWorld({
The Postgres World uses PostgreSQL as a durable backend:

- **Storage** - Workflow runs, events, steps, and hooks are stored in PostgreSQL tables
- **Job Queue** - [pg-boss](https://github.com/timgit/pg-boss) handles reliable job processing with retries
- **Job Queue** - [graphile-worker](https://github.com/graphile/worker) handles reliable job processing with retries
- **Streaming** - PostgreSQL NOTIFY/LISTEN enables real-time event distribution

This architecture ensures workflows survive application restarts with all state reliably persisted. For implementation details, see the [source code](https://github.com/vercel/workflow/tree/main/packages/world-postgres).
Expand Down
2 changes: 1 addition & 1 deletion packages/web/app/lib/known-worlds.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ export const KNOWN_WORLDS: KnownWorld[] = [
id: 'postgres',
displayName: 'PostgreSQL',
packageName: '@workflow/world-postgres',
description: 'PostgreSQL-based storage with pg-boss queue',
description: 'PostgreSQL-based storage with graphile-worker queue',
isBuiltIn: false,
},
];
8 changes: 4 additions & 4 deletions packages/world-postgres/HOW_IT_WORKS.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ If you want to use any other ORM, query builder or underlying database client, y

```mermaid
graph LR
Client --> PG[pg-boss queue]
Client --> PG[graphile-worker queue]
PG --> Worker[Embedded Worker]
Worker --> HTTP[HTTP fetch]
HTTP --> EW[Local World]
Expand All @@ -33,15 +33,15 @@ Real-time data streaming via **PostgreSQL LISTEN/NOTIFY**:

## Setup

Call `world.start()` to initialize pg-boss workers. When `.start()` is called, workers begin listening to pg-boss queues. When a job arrives, workers make HTTP fetch calls to the local world endpoints (`.well-known/workflow/v1/flow` or `.well-known/workflow/v1/step`) to execute the actual workflow logic.
Call `world.start()` to initialize graphile-worker workers. When `.start()` is called, workers begin listening to graphile-worker queues. When a job arrives, workers make HTTP fetch calls to the local world endpoints (`.well-known/workflow/v1/flow` or `.well-known/workflow/v1/step`) to execute the actual workflow logic.

In **Next.js**, the `world.setup()` function needs to be added to `instrumentation.ts|js` to ensure workers start before request handling:
In **Next.js**, the `world.start()` call needs to be added to `instrumentation.ts|js` to ensure workers start before request handling. Use `workflow/runtime` for `getWorld` (same as the testing server and other framework plugins):

```ts
// instrumentation.ts

if (process.env.NEXT_RUNTIME !== "edge") {
import("workflow/api").then(async ({ getWorld }) => {
import("workflow/runtime").then(async ({ getWorld }) => {
// start listening to the jobs.
await getWorld().start?.();
});
Expand Down
18 changes: 16 additions & 2 deletions packages/world-postgres/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ const world = createWorld({

This package uses PostgreSQL with the following components:

- **pg-boss**: For queue processing and job management
- **graphile-worker**: For queue processing and job management
- **Drizzle ORM**: For database operations and schema management
- **postgres**: For PostgreSQL client connections

Expand Down Expand Up @@ -115,7 +115,7 @@ Make sure your PostgreSQL database is accessible and the user has sufficient per
## Features

- **Durable Storage**: Stores workflow runs, events, steps, hooks, and webhooks in PostgreSQL
- **Queue Processing**: Uses pg-boss for reliable job queue processing
- **Queue Processing**: Uses graphile-worker for reliable job queue processing
- **Streaming**: Real-time event streaming capabilities
- **Health Checks**: Built-in connection health monitoring
- **Configurable Concurrency**: Adjustable worker concurrency for queue processing
Expand All @@ -137,6 +137,20 @@ export WORKFLOW_POSTGRES_URL="postgres://world:world@localhost:5432/world"
export WORKFLOW_TARGET_WORLD="@workflow/world-postgres"
```

## Testing

Integration tests use [Testcontainers](https://testcontainers.com/) to start a PostgreSQL container. **Docker must be installed and running** before you run tests.

- **Linux/macOS**: Start the Docker daemon (e.g. `sudo systemctl start docker` or Docker Desktop).
- **WSL2**: Use Docker Desktop with WSL2 integration, or run the Docker engine inside WSL and ensure the daemon is started. Verify with `docker info`.

Then from the package directory:

```bash
pnpm build
pnpm test
```

## World Selection

To use the PostgreSQL world, set the `WORKFLOW_TARGET_WORLD` environment variable to the package name:
Expand Down
2 changes: 1 addition & 1 deletion packages/world-postgres/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
"cbor-x": "1.6.0",
"dotenv": "16.4.5",
"drizzle-orm": "0.44.7",
"pg-boss": "11.0.7",
"graphile-worker": "0.16.6",
"postgres": "3.4.7",
"ulid": "3.0.1",
"zod": "catalog:"
Expand Down
12 changes: 1 addition & 11 deletions packages/world-postgres/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import type { Socket } from 'node:net';
import type { Storage, World } from '@workflow/world';
import PgBoss from 'pg-boss';
import createPostgres from 'postgres';
import type { PostgresWorldConfig } from './config.js';
import { createClient, type Drizzle } from './drizzle/index.js';
Expand Down Expand Up @@ -33,12 +32,9 @@ export function createWorld(
10,
}
): World & { start(): Promise<void> } {
const boss = new PgBoss({
connectionString: config.connectionString,
});
const postgres = createPostgres(config.connectionString);
const drizzle = createClient(postgres);
const queue = createQueue(boss, config);
const queue = createQueue(config);
const storage = createStorage(drizzle);
const streamer = createStreamer(postgres, drizzle);

Expand All @@ -50,12 +46,6 @@ export function createWorld(
await queue.start();
},
async close() {
await boss.stop();
const bossDb = boss.getDb() as {
opened?: boolean;
close?: () => Promise<void>;
};
if (bossDb.opened) await bossDb.close?.();
await streamer.close();
await queue.close();
await postgres.end();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ import * as z from 'zod';
import { Base64Buffer } from './zod.js';

/**
/* pgboss is using JSON under the hood, so we need to base64 encode
/* the body to ensure binary safety
/* maybe later we can have a `blobs` table for larger payloads
**/
* graphile-worker is using JSON under the hood, so we need to base64 encode
* the body to ensure binary safety
* maybe later we can have a `blobs` table for larger payloads
*/
export const MessageData = z.object({
attempt: z.number().describe('The attempt number of the message'),
messageId: MessageId.describe('The unique ID of the message'),
Expand Down
117 changes: 67 additions & 50 deletions packages/world-postgres/src/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,32 @@ import {
type ValidQueueName,
} from '@workflow/world';
import { createLocalWorld } from '@workflow/world-local';
import type PgBoss from 'pg-boss';
import {
Logger,
makeWorkerUtils,
run,
type Runner,
type WorkerUtils,
} from 'graphile-worker';
import { monotonicFactory } from 'ulid';
import { MessageData } from './boss.js';
import { MessageData } from './message.js';
import type { PostgresWorldConfig } from './config.js';

// Redirect graphile-worker logs to stderr so CLI --json on stdout stays clean.
// TODO: When CI=1 suppresses logging, replace with conditional stdout (e.g. log to stdout when not in JSON/CI mode).
const stderrLogger = new Logger(
() => (level: string, message: string, meta?: unknown) => {
const line = [level, message, meta].filter(Boolean).join(' ') + '\n';
process.stderr.write(line);
}
);
Comment on lines +22 to +29
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kschmelter13 Graphile Worker is quite noisy in the logs. Can we change the log level to show only errors maybe? Currently, it even shows debug messages like "spawned" (multiple times, I hope that's not some indicator of unexpected/unsupported behavior).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By default it logs all levels, I can default to errors only and add an env for override?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, something like that. Just check yourself which kinds of messages come up in npm run dev. The default logger seems to ignore debug messages unless some env is set: https://worker.graphile.org/docs/library/logger
Probably that's gone because you override the logger. But I think it also wrote several non-debug messages, like which queues it's listening to, that's why I suggested error only, maybe warning.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are the messages I see:

info Failed to read crontab file '/workspaces/playground/crontab'; cron is disabled
debug Registering termination signal handlers (SIGUSR2, SIGINT, SIGTERM, SIGHUP, SIGABRT) [object Object]
debug Spawned
debug Spawned
debug Spawned
debug Spawned
debug Spawned
debug Spawned
debug Spawned
debug Spawned
debug Spawned
debug Spawned
info Worker connected and looking for jobs... (task names: 'workflow_flows', 'workflow_steps')
debug Graphile Worker Cron fired 1.796s too early (clock skew?); rescheduling [object Object]
debug Graphile Worker Cron fired 3.557s too early (clock skew?); rescheduling [object Object]
debug Graphile Worker Cron fired 3.581s too early (clock skew?); rescheduling [object Object]

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rovo89 I wrote some tests around the logging and chose to just use the default graphile logger, and remove the cusotm logger implementation which wasnt really needed. I opened a pr if you want to take a look: #1163


/**
* The Postgres queue works by creating two job types in pg-boss:
* The Postgres queue works by creating two job types in graphile-worker:
* - `workflow` for workflow jobs
* - `step` for step jobs
*
* When a message is queued, it is sent to pg-boss with the appropriate job type.
* When a message is queued, it is sent to graphile-worker with the appropriate job type.
* When a job is processed, it is deserialized and then re-queued into the _local world_, showing that
* we can reuse the local world, mix and match worlds to build
* hybrid architectures, and even migrate between worlds.
Expand All @@ -28,10 +43,7 @@ export type PostgresQueue = Queue & {
close(): Promise<void>;
};

export function createQueue(
boss: PgBoss,
config: PostgresWorldConfig
): PostgresQueue {
export function createQueue(config: PostgresWorldConfig): PostgresQueue {
const port = process.env.PORT ? Number(process.env.PORT) : undefined;
const localWorld = createLocalWorld({ dataDir: undefined, port });

Expand All @@ -50,59 +62,50 @@ export function createQueue(
return 'postgres';
};

const createdQueues = new Map<string, Promise<void>>();
let workerUtils: WorkerUtils | null = null;
let runner: Runner | null = null;
let startPromise: Promise<void> | null = null;

function createQueue(name: string) {
let createdQueue = createdQueues.get(name);
if (!createdQueue) {
createdQueue = boss.createQueue(name);
createdQueues.set(name, createdQueue);
async function start(): Promise<void> {
if (!startPromise) {
Comment thread
kschmelter13 marked this conversation as resolved.
startPromise = (async () => {
workerUtils = await makeWorkerUtils({
connectionString: config.connectionString,
logger: stderrLogger,
});
await workerUtils.migrate();
await setupListeners();
})();
}
return createdQueue;
await startPromise;
}

const queue: Queue['queue'] = async (queue, message, opts) => {
await boss.start();
await start();
const [prefix, queueId] = parseQueueName(queue);
const jobName = Queues[prefix];
await createQueue(jobName);
const body = transport.serialize(message);
const messageId = MessageId.parse(`msg_${generateMessageId()}`);
await boss.send({
name: jobName,
options: {
singletonKey: opts?.idempotencyKey ?? messageId,
retryLimit: 3,
},
data: MessageData.encode({
await workerUtils!.addJob(
jobName,
MessageData.encode({
id: queueId,
data: body,
attempt: 1,
messageId,
idempotencyKey: opts?.idempotencyKey,
}),
});
{
jobKey: opts?.idempotencyKey ?? messageId,
maxAttempts: 3,
}
);
return { messageId };
};

async function setupListener(queue: QueuePrefix, jobName: string) {
await createQueue(jobName);
await Promise.all(
Array.from({ length: config.queueConcurrency || 10 }, async () => {
await boss.work(
jobName,
{
// The default is 2s, which is far too slow for running steps in quick succession.
// The min is 0.5s, which is still too slow. We should move to a pg NOTIFY/LISTEN-based job system.
pollingIntervalSeconds: 0.5,
},
work
);
})
);

async function work([job]: PgBoss.Job[]) {
const messageData = MessageData.parse(job.data);
function createTaskHandler(queue: QueuePrefix) {
return async (payload: unknown) => {
const messageData = MessageData.parse(payload);
const bodyStream = Stream.Readable.toWeb(
Stream.Readable.from([messageData.data])
);
Expand All @@ -113,31 +116,45 @@ export function createQueue(
const queueName = `${queue}${messageData.id}` as const;
// TODO: Custom headers from opts.headers are not propagated into MessageData.
// To support this, MessageData schema would need to include a headers field
// and the headers would need to be stored/retrieved from pg-boss job data.
// and the headers would need to be stored/retrieved from graphile-worker job data.
await localWorld.queue(queueName, message, {
idempotencyKey: messageData.idempotencyKey,
});
}
};
}

async function setupListeners() {
const taskList: Record<string, (payload: unknown) => Promise<void>> = {};
for (const [prefix, jobName] of Object.entries(Queues) as [
QueuePrefix,
string,
][]) {
await setupListener(prefix, jobName);
taskList[jobName] = createTaskHandler(prefix);
}

runner = await run({
connectionString: config.connectionString,
concurrency: config.queueConcurrency || 10,
logger: stderrLogger,
pollInterval: 500, // 500ms = 0.5s (graphile-worker uses LISTEN/NOTIFY when available)
taskList,
});
}

return {
createQueueHandler,
getDeploymentId,
queue,
async start() {
boss = await boss.start();
await setupListeners();
},
start,
async close() {
if (runner) {
await runner.stop();
runner = null;
}
if (workerUtils) {
await workerUtils.release();
workerUtils = null;
}
await localWorld.close?.();
},
};
Expand Down
Loading