diff --git a/packages/world-postgres/src/drizzle/migrations/0008_migrate_pgboss_to_graphile.sql b/packages/world-postgres/src/drizzle/migrations/0008_migrate_pgboss_to_graphile.sql new file mode 100644 index 0000000000..25ec8c039e --- /dev/null +++ b/packages/world-postgres/src/drizzle/migrations/0008_migrate_pgboss_to_graphile.sql @@ -0,0 +1,20 @@ +-- Migrate pending pg-boss jobs to a staging table before dropping the pgboss schema. +-- The application code will re-enqueue these jobs into graphile-worker on first start. +DO $$ +BEGIN + IF EXISTS (SELECT 1 FROM information_schema.schemata WHERE schema_name = 'pgboss') THEN + CREATE TABLE IF NOT EXISTS "workflow"."_pgboss_pending_jobs" ( + name text NOT NULL, + data jsonb, + singleton_key text, + retry_limit integer + ); + + INSERT INTO "workflow"."_pgboss_pending_jobs" (name, data, singleton_key, retry_limit) + SELECT name, data, singleton_key, retry_limit + FROM pgboss.job + WHERE state IN ('created', 'retry'); + + DROP SCHEMA pgboss CASCADE; + END IF; +END $$; diff --git a/packages/world-postgres/src/drizzle/migrations/meta/_journal.json b/packages/world-postgres/src/drizzle/migrations/meta/_journal.json index 810aacbcac..8fa6f70a27 100644 --- a/packages/world-postgres/src/drizzle/migrations/meta/_journal.json +++ b/packages/world-postgres/src/drizzle/migrations/meta/_journal.json @@ -57,6 +57,13 @@ "when": 1769500000000, "tag": "0007_add_waits_table", "breakpoints": true + }, + { + "idx": 8, + "version": "7", + "when": 1770000000000, + "tag": "0008_migrate_pgboss_to_graphile", + "breakpoints": true } ] } diff --git a/packages/world-postgres/src/index.ts b/packages/world-postgres/src/index.ts index 41c09a9228..6f2993e3db 100644 --- a/packages/world-postgres/src/index.ts +++ b/packages/world-postgres/src/index.ts @@ -34,7 +34,7 @@ export function createWorld( ): World & { start(): Promise } { const postgres = createPostgres(config.connectionString); const drizzle = createClient(postgres); - const queue = createQueue(config); + const queue = createQueue(config, postgres); const storage = createStorage(drizzle); const streamer = createStreamer(postgres, drizzle); diff --git a/packages/world-postgres/src/queue.ts b/packages/world-postgres/src/queue.ts index 3f622a4449..ad11ffd69d 100644 --- a/packages/world-postgres/src/queue.ts +++ b/packages/world-postgres/src/queue.ts @@ -11,13 +11,14 @@ import { createLocalWorld } from '@workflow/world-local'; import { Logger, makeWorkerUtils, - run, type Runner, + run, type WorkerUtils, } from 'graphile-worker'; +import type Postgres from 'postgres'; import { monotonicFactory } from 'ulid'; -import { MessageData } from './message.js'; import type { PostgresWorldConfig } from './config.js'; +import { MessageData } from './message.js'; // Redirect graphile-worker logs to stderr so CLI --json on stdout stays clean. // TODO: When CI=1 suppresses logging, replace with conditional stdout (e.g. log to stdout when not in JSON/CI mode). @@ -43,7 +44,10 @@ export type PostgresQueue = Queue & { close(): Promise; }; -export function createQueue(config: PostgresWorldConfig): PostgresQueue { +export function createQueue( + config: PostgresWorldConfig, + postgres: Postgres.Sql +): PostgresQueue { const port = process.env.PORT ? Number(process.env.PORT) : undefined; const localWorld = createLocalWorld({ dataDir: undefined, port }); @@ -66,15 +70,68 @@ export function createQueue(config: PostgresWorldConfig): PostgresQueue { let runner: Runner | null = null; let startPromise: Promise | null = null; + async function migratePgBossJobs(utils: WorkerUtils): Promise { + // Scenario A: Drizzle migration already ran — staging table exists + const hasStaging = await postgres` + SELECT EXISTS ( + SELECT 1 FROM information_schema.tables + WHERE table_schema = 'workflow' + AND table_name = '_pgboss_pending_jobs' + ) AS exists + `; + if (hasStaging[0].exists) { + const jobs = await postgres` + SELECT name, data, singleton_key, retry_limit + FROM "workflow"."_pgboss_pending_jobs" + `; + for (const job of jobs) { + await utils.addJob(job.name, job.data as Record, { + jobKey: job.singleton_key ?? undefined, + maxAttempts: job.retry_limit ?? 3, + }); + } + await postgres`DROP TABLE "workflow"."_pgboss_pending_jobs"`; + return; + } + + // Scenario B: Drizzle migration didn't run — pgboss schema still exists + const hasPgBoss = await postgres` + SELECT EXISTS ( + SELECT 1 FROM information_schema.schemata + WHERE schema_name = 'pgboss' + ) AS exists + `; + if (hasPgBoss[0].exists) { + const jobs = await postgres` + SELECT name, data, singleton_key, retry_limit + FROM pgboss.job + WHERE state IN ('created', 'retry') + `; + for (const job of jobs) { + await utils.addJob(job.name, job.data as Record, { + jobKey: job.singleton_key ?? undefined, + maxAttempts: job.retry_limit ?? 3, + }); + } + await postgres`DROP SCHEMA pgboss CASCADE`; + } + } + async function start(): Promise { if (!startPromise) { startPromise = (async () => { - workerUtils = await makeWorkerUtils({ - connectionString: config.connectionString, - logger: stderrLogger, - }); - await workerUtils.migrate(); - await setupListeners(); + try { + workerUtils = await makeWorkerUtils({ + connectionString: config.connectionString, + logger: stderrLogger, + }); + await workerUtils.migrate(); + await migratePgBossJobs(workerUtils); + await setupListeners(); + } catch (err) { + startPromise = null; + throw err; + } })(); } await startPromise; @@ -155,6 +212,7 @@ export function createQueue(config: PostgresWorldConfig): PostgresQueue { await workerUtils.release(); workerUtils = null; } + startPromise = null; await localWorld.close?.(); }, };