Skip to content
Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-- Migrate pending pg-boss jobs to a staging table before dropping the pgboss schema.
-- The application code will re-enqueue these jobs into graphile-worker on first start.
DO $$
BEGIN
IF EXISTS (SELECT 1 FROM information_schema.schemata WHERE schema_name = 'pgboss') THEN
CREATE TABLE IF NOT EXISTS "workflow"."_pgboss_pending_jobs" (
name text NOT NULL,
data jsonb,
singleton_key text,
retry_limit integer
);

INSERT INTO "workflow"."_pgboss_pending_jobs" (name, data, singleton_key, retry_limit)
SELECT name, data, singleton_key, retry_limit
FROM pgboss.job
WHERE state IN ('created', 'retry');

DROP SCHEMA pgboss CASCADE;
END IF;
END $$;
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@
"when": 1769500000000,
"tag": "0007_add_waits_table",
"breakpoints": true
},
{
"idx": 8,
"version": "7",
"when": 1770000000000,
"tag": "0008_migrate_pgboss_to_graphile",
"breakpoints": true
}
]
}
2 changes: 1 addition & 1 deletion packages/world-postgres/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export function createWorld(
): World & { start(): Promise<void> } {
const postgres = createPostgres(config.connectionString);
const drizzle = createClient(postgres);
const queue = createQueue(config);
const queue = createQueue(config, postgres);
const storage = createStorage(drizzle);
const streamer = createStreamer(postgres, drizzle);

Expand Down
76 changes: 67 additions & 9 deletions packages/world-postgres/src/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ import { createLocalWorld } from '@workflow/world-local';
import {
Logger,
makeWorkerUtils,
run,
type Runner,
run,
type WorkerUtils,
} from 'graphile-worker';
import type Postgres from 'postgres';
import { monotonicFactory } from 'ulid';
import { MessageData } from './message.js';
import type { PostgresWorldConfig } from './config.js';
import { MessageData } from './message.js';

// Redirect graphile-worker logs to stderr so CLI --json on stdout stays clean.
// TODO: When CI=1 suppresses logging, replace with conditional stdout (e.g. log to stdout when not in JSON/CI mode).
Expand All @@ -43,7 +44,10 @@ export type PostgresQueue = Queue & {
close(): Promise<void>;
};

export function createQueue(config: PostgresWorldConfig): PostgresQueue {
export function createQueue(
config: PostgresWorldConfig,
postgres: Postgres.Sql
): PostgresQueue {
const port = process.env.PORT ? Number(process.env.PORT) : undefined;
const localWorld = createLocalWorld({ dataDir: undefined, port });

Expand All @@ -66,15 +70,68 @@ export function createQueue(config: PostgresWorldConfig): PostgresQueue {
let runner: Runner | null = null;
let startPromise: Promise<void> | null = null;

async function migratePgBossJobs(utils: WorkerUtils): Promise<void> {
// Scenario A: Drizzle migration already ran — staging table exists
const hasStaging = await postgres`
SELECT EXISTS (
SELECT 1 FROM information_schema.tables
WHERE table_schema = 'workflow'
AND table_name = '_pgboss_pending_jobs'
) AS exists
`;
if (hasStaging[0].exists) {
const jobs = await postgres`
SELECT name, data, singleton_key, retry_limit
FROM "workflow"."_pgboss_pending_jobs"
`;
for (const job of jobs) {
await utils.addJob(job.name, job.data as Record<string, unknown>, {
jobKey: job.singleton_key ?? undefined,
maxAttempts: job.retry_limit ?? 3,
});
}
await postgres`DROP TABLE "workflow"."_pgboss_pending_jobs"`;
return;
}

// Scenario B: Drizzle migration didn't run — pgboss schema still exists
const hasPgBoss = await postgres`
SELECT EXISTS (
SELECT 1 FROM information_schema.schemata
WHERE schema_name = 'pgboss'
) AS exists
`;
if (hasPgBoss[0].exists) {
const jobs = await postgres`
SELECT name, data, singleton_key, retry_limit
FROM pgboss.job
WHERE state IN ('created', 'retry')
`;
for (const job of jobs) {
await utils.addJob(job.name, job.data as Record<string, unknown>, {
jobKey: job.singleton_key ?? undefined,
maxAttempts: job.retry_limit ?? 3,
});
}
await postgres`DROP SCHEMA pgboss CASCADE`;
}
}

async function start(): Promise<void> {
if (!startPromise) {
startPromise = (async () => {
workerUtils = await makeWorkerUtils({
connectionString: config.connectionString,
logger: stderrLogger,
});
await workerUtils.migrate();
await setupListeners();
try {
workerUtils = await makeWorkerUtils({
connectionString: config.connectionString,
logger: stderrLogger,
});
await workerUtils.migrate();
await migratePgBossJobs(workerUtils);
await setupListeners();
} catch (err) {
startPromise = null;
throw err;
}
})();
}
await startPromise;
Expand Down Expand Up @@ -155,6 +212,7 @@ export function createQueue(config: PostgresWorldConfig): PostgresQueue {
await workerUtils.release();
workerUtils = null;
}
startPromise = null;
await localWorld.close?.();
},
};
Expand Down