diff --git a/CHANGES.md b/CHANGES.md index b17cddbcc..bb84e01bb 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -65,6 +65,14 @@ To be released. ### @fedify/mysql + - Added `MysqlMessageQueue` class to the `@fedify/mysql` package, a + MySQL/MariaDB-backed `MessageQueue` implementation. It uses periodic + polling (`SELECT … FOR UPDATE SKIP LOCKED`) to deliver messages and + MySQL advisory locks (`GET_LOCK`/`RELEASE_LOCK`) for ordering-key + serialization. Supports delayed delivery, ordering keys, + `enqueueMany()`, and concurrent workers. Requires MySQL 8.0+ or + MariaDB 10.6+. [[#586], [#599]] + - Added `@fedify/mysql` package, a MySQL/MariaDB-backed `KvStore` implementation. It provides `MysqlKvStore`, which stores key–value pairs in a MySQL table using the [`mysql2`] driver. Supports TTL, @@ -73,7 +81,9 @@ To be released. [`mysql2`]: https://www.npmjs.com/package/mysql2 [#585]: https://github.com/fedify-dev/fedify/issues/585 +[#586]: https://github.com/fedify-dev/fedify/issues/586 [#597]: https://github.com/fedify-dev/fedify/pull/597 +[#599]: https://github.com/fedify-dev/fedify/pull/599 Version 2.0.3 diff --git a/docs/manual/deploy.md b/docs/manual/deploy.md index a56e660d9..dadcecc33 100644 --- a/docs/manual/deploy.md +++ b/docs/manual/deploy.md @@ -168,7 +168,9 @@ Development Production : Consider [`PostgresKvStore`](./kv.md#postgreskvstore) and [`PostgresMessageQueue`](./mq.md#postgresmessagequeue) if you already use - PostgreSQL, or [`RedisKvStore`](./kv.md#rediskvstore) and + PostgreSQL, [`MysqlKvStore`](./kv.md#mysqlkvstore) and + [`MysqlMessageQueue`](./mq.md#mysqlmessagequeue) if you already use + MySQL or MariaDB, or [`RedisKvStore`](./kv.md#rediskvstore) and [`RedisMessageQueue`](./mq.md#redismessagequeue) for dedicated caching infrastructure. There is also [`AmqpMessageQueue`](./mq.md#amqpmessagequeue) for RabbitMQ users. diff --git a/docs/manual/federation.md b/docs/manual/federation.md index d6cca84e1..5e8b536df 100644 --- a/docs/manual/federation.md +++ b/docs/manual/federation.md @@ -115,8 +115,10 @@ runtime). As separate packages, [`@fedify/redis`] provides [`RedisMessageQueue`] class, which is a Redis-backed implementation for production use, -and [`@fedify/postgres`] provides [`PostgresMessageQueue`] class, which is a -PostgreSQL-backed implementation for production use, and [`@fedify/amqp`] +[`@fedify/postgres`] provides [`PostgresMessageQueue`] class, which is a +PostgreSQL-backed implementation for production use, +[`@fedify/mysql`] provides [`MysqlMessageQueue`] class, which is a +MySQL/MariaDB-backed implementation for production use, and [`@fedify/amqp`] provides [`AmqpMessageQueue`] class, which is an AMQP broker-backed implementation for production use. @@ -187,6 +189,7 @@ Further details are explained in the [*Message queue* section](./mq.md). [`RedisMessageQueue`]: https://jsr.io/@fedify/redis/doc/mq/~/RedisMessageQueue [`PostgresMessageQueue`]: https://jsr.io/@fedify/postgres/doc/mq/~/PostgresMessageQueue +[`MysqlMessageQueue`]: https://jsr.io/@fedify/mysql/doc/mq/~/MysqlMessageQueue [`@fedify/amqp`]: https://github.com/fedify-dev/fedify/tree/main/packages/amqp [`AmqpMessageQueue`]: https://jsr.io/@fedify/amqp/doc/mq/~/AmqpMessageQueue diff --git a/docs/manual/inbox.md b/docs/manual/inbox.md index 36e67802f..66916f0f5 100644 --- a/docs/manual/inbox.md +++ b/docs/manual/inbox.md @@ -349,8 +349,9 @@ const federation = createFederation({ > The `InProcessMessageQueue` is a simple in-memory message queue that is > suitable for development and testing. For production use, you should > consider using a more robust message queue, such as [`DenoKvMessageQueue`] -> from [`@fedify/denokv`] package or [`RedisMessageQueue`] from -> [`@fedify/redis`] package. +> from [`@fedify/denokv`] package, [`RedisMessageQueue`] from +> [`@fedify/redis`] package, or [`MysqlMessageQueue`] from +> [`@fedify/mysql`] package. > > For more information, see the [*Message queue* section](./mq.md). @@ -389,6 +390,8 @@ duplicate retry mechanisms and leverages the backend's optimized retry features. [`@fedify/denokv`]: https://github.com/fedify-dev/fedify/tree/main/packages/denokv [`RedisMessageQueue`]: https://jsr.io/@fedify/redis/doc/mq/~/RedisMessageQueue [`@fedify/redis`]: https://github.com/fedify-dev/fedify/tree/main/packages/redis +[`MysqlMessageQueue`]: https://jsr.io/@fedify/mysql/doc/mq/~/MysqlMessageQueue +[`@fedify/mysql`]: https://github.com/fedify-dev/fedify/tree/main/packages/mysql Activity idempotency diff --git a/docs/manual/mq.md b/docs/manual/mq.md index 2b33f1185..c368f2f30 100644 --- a/docs/manual/mq.md +++ b/docs/manual/mq.md @@ -293,6 +293,88 @@ const federation = createFederation({ [`AmqpMessageQueue`]: https://jsr.io/@fedify/amqp/doc/mq/~/AmqpMessageQueue [RabbitMQ]: https://www.rabbitmq.com/ +### [`MysqlMessageQueue`] + +*This API is available since Fedify 2.1.0.* + +To use [`MysqlMessageQueue`], you need to install the *@fedify/mysql* package +first: + +::: code-group + +~~~~ bash [Deno] +deno add jsr:@fedify/mysql +~~~~ + +~~~~ bash [npm] +npm add @fedify/mysql mysql2 +~~~~ + +~~~~ bash [pnpm] +pnpm add @fedify/mysql mysql2 +~~~~ + +~~~~ bash [Yarn] +yarn add @fedify/mysql mysql2 +~~~~ + +~~~~ bash [Bun] +bun add @fedify/mysql mysql2 +~~~~ + +::: + +[`MysqlMessageQueue`] is a message queue implementation that uses a MySQL or +MariaDB database as the backend. Since MySQL and MariaDB do not provide a +`LISTEN`/`NOTIFY` mechanism, it uses **polling** to discover new messages. +The polling interval is configurable and defaults to 1 second to minimize +latency; this is shorter than the default for PostgreSQL-backed queues. + +Concurrent workers are safely supported via `SELECT … FOR UPDATE SKIP LOCKED` +and MySQL advisory locks (`GET_LOCK`/`RELEASE_LOCK`). + +> [!NOTE] +> `MysqlMessageQueue` requires MySQL 8.0+ or MariaDB 10.6+ for +> `SELECT … FOR UPDATE SKIP LOCKED` support. + +> [!NOTE] +> Because `MysqlMessageQueue` uses polling rather than a push-based +> notification system, there is an inherent latency between when a message +> is enqueued and when it is delivered. With the default 1-second poll +> interval, messages may take up to 1 second to be picked up. You can +> lower the `pollInterval` option to reduce this latency at the cost of +> additional database load. + +Best for +: Production use in systems that already use MySQL or MariaDB. + +Pros +: Persistent, supports multiple workers, minimal additional infrastructure + for MySQL/MariaDB users. + +Cons +: Polling-based delivery (up to `pollInterval` latency); requires + MySQL 8.0+ or MariaDB 10.6+. + +~~~~ typescript twoslash +import type { KvStore } from "@fedify/fedify"; +// ---cut-before--- +import { createFederation } from "@fedify/fedify"; +import { MysqlMessageQueue } from "@fedify/mysql"; +import mysql from "mysql2/promise"; + +const pool = mysql.createPool("mysql://user:pass@localhost/db"); +const federation = createFederation({ +// ---cut-start--- + kv: null as unknown as KvStore, +// ---cut-end--- + queue: new MysqlMessageQueue(pool), // [!code highlight] + // ... other options +}); +~~~~ + +[`MysqlMessageQueue`]: https://jsr.io/@fedify/mysql/doc/mq/~/MysqlMessageQueue + ### `SqliteMessageQueue` *This API is available since Fedify 2.0.0.* @@ -767,6 +849,9 @@ The following implementations do not yet support native retry: [`PostgresMessageQueue`] : Native retry support planned for future release. +[`MysqlMessageQueue`] +: No native retry support (`~MessageQueue.nativeRetrial` is `false`). + [`AmqpMessageQueue`] : Native retry support planned for future release. @@ -835,6 +920,7 @@ The following implementations support ordering keys: | [`DenoKvMessageQueue`] | Yes | | [`RedisMessageQueue`] | Yes | | [`PostgresMessageQueue`] | Yes | +| [`MysqlMessageQueue`] | Yes | | [`AmqpMessageQueue`] | Yes[^1] | | [`SqliteMessageQueue`] | Yes | | `WorkersMessageQueue` | Yes[^2] | diff --git a/docs/manual/relay.md b/docs/manual/relay.md index c533ec2ee..4d50ef941 100644 --- a/docs/manual/relay.md +++ b/docs/manual/relay.md @@ -154,7 +154,8 @@ Configuration options ~~~~ > [!NOTE] - > For production, use [`RedisMessageQueue`] or [`PostgresMessageQueue`]. + > For production, use [`RedisMessageQueue`], [`PostgresMessageQueue`], + > or [`MysqlMessageQueue`]. `subscriptionHandler` (required) : Callback to approve or reject subscription requests. See @@ -177,6 +178,7 @@ Configuration options [`RedisMessageQueue`]: https://jsr.io/@fedify/redis/doc/mq/~/RedisMessageQueue [`PostgresMessageQueue`]: https://jsr.io/@fedify/postgres/doc/mq/~/PostgresMessageQueue +[`MysqlMessageQueue`]: https://jsr.io/@fedify/mysql/doc/mq/~/MysqlMessageQueue Relay types diff --git a/docs/manual/send.md b/docs/manual/send.md index 9b06ccc6d..2d58b4360 100644 --- a/docs/manual/send.md +++ b/docs/manual/send.md @@ -390,8 +390,9 @@ const federation = createFederation({ > The `InProcessMessageQueue` is a simple in-memory message queue that is > suitable for development and testing. For production use, you should > consider using a more robust message queue, such as [`DenoKvMessageQueue`] -> from [`@fedify/denokv`] package or [`RedisMessageQueue`] from -> [`@fedify/redis`] package. +> from [`@fedify/denokv`] package, [`RedisMessageQueue`] from +> [`@fedify/redis`] package, or [`MysqlMessageQueue`] from +> [`@fedify/mysql`] package. > > For further information, see the [*Message queue* section](./mq.md). @@ -414,6 +415,8 @@ an error and does not retry the delivery. [`@fedify/denokv`]: https://github.com/fedify-dev/fedify/tree/main/packages/denokv [`RedisMessageQueue`]: https://jsr.io/@fedify/redis/doc/mq/~/RedisMessageQueue [`@fedify/redis`]: https://github.com/fedify-dev/fedify/tree/main/packages/redis +[`MysqlMessageQueue`]: https://jsr.io/@fedify/mysql/doc/mq/~/MysqlMessageQueue +[`@fedify/mysql`]: https://github.com/fedify-dev/fedify/tree/main/packages/mysql Optimizing activity delivery for large audiences diff --git a/packages/mysql/README.md b/packages/mysql/README.md index 46ab68da6..1703aeebb 100644 --- a/packages/mysql/README.md +++ b/packages/mysql/README.md @@ -6,20 +6,22 @@ [![JSR][JSR badge]][JSR] [![npm][npm badge]][npm] -This package provides [Fedify]'s [`KvStore`] implementation for -MySQL/MariaDB: +This package provides [Fedify]'s [`KvStore`] and [`MessageQueue`] +implementations for MySQL/MariaDB: - [`MysqlKvStore`] + - [`MysqlMessageQueue`] ~~~~ typescript import { createFederation } from "@fedify/fedify"; -import { MysqlKvStore } from "@fedify/mysql"; +import { MysqlKvStore, MysqlMessageQueue } from "@fedify/mysql"; import mysql from "mysql2/promise"; const pool = mysql.createPool("mysql://user:password@localhost/dbname"); const federation = createFederation({ kv: new MysqlKvStore(pool), + queue: new MysqlMessageQueue(pool), }); ~~~~ @@ -29,7 +31,9 @@ const federation = createFederation({ [npm]: https://www.npmjs.com/package/@fedify/mysql [Fedify]: https://fedify.dev/ [`KvStore`]: https://jsr.io/@fedify/fedify/doc/federation/~/KvStore +[`MessageQueue`]: https://jsr.io/@fedify/fedify/doc/federation/~/MessageQueue [`MysqlKvStore`]: https://jsr.io/@fedify/mysql/doc/~/MysqlKvStore +[`MysqlMessageQueue`]: https://jsr.io/@fedify/mysql/doc/mq/~/MysqlMessageQueue Installation diff --git a/packages/mysql/deno.json b/packages/mysql/deno.json index ece62a473..1f4243b96 100644 --- a/packages/mysql/deno.json +++ b/packages/mysql/deno.json @@ -4,7 +4,8 @@ "license": "MIT", "exports": { ".": "./src/mod.ts", - "./kv": "./src/kv.ts" + "./kv": "./src/kv.ts", + "./mq": "./src/mq.ts" }, "exclude": [ "dist", diff --git a/packages/mysql/package.json b/packages/mysql/package.json index 6ca657445..820e1c70d 100644 --- a/packages/mysql/package.json +++ b/packages/mysql/package.json @@ -51,6 +51,16 @@ "require": "./dist/kv.cjs", "default": "./dist/kv.js" }, + "./mq": { + "types": { + "import": "./dist/mq.d.ts", + "require": "./dist/mq.d.cts", + "default": "./dist/mq.d.ts" + }, + "import": "./dist/mq.js", + "require": "./dist/mq.cjs", + "default": "./dist/mq.js" + }, "./package.json": "./package.json" }, "files": [ diff --git a/packages/mysql/src/mod.ts b/packages/mysql/src/mod.ts index b80b5dde8..c9e04c42b 100644 --- a/packages/mysql/src/mod.ts +++ b/packages/mysql/src/mod.ts @@ -1 +1,2 @@ export { MysqlKvStore, type MysqlKvStoreOptions } from "./kv.ts"; +export { MysqlMessageQueue, type MysqlMessageQueueOptions } from "./mq.ts"; diff --git a/packages/mysql/src/mq.test.ts b/packages/mysql/src/mq.test.ts new file mode 100644 index 000000000..f0db6b9c1 --- /dev/null +++ b/packages/mysql/src/mq.test.ts @@ -0,0 +1,1079 @@ +import { MysqlMessageQueue } from "@fedify/mysql/mq"; +import { getRandomKey, testMessageQueue, waitFor } from "@fedify/testing"; +import * as temporal from "@js-temporal/polyfill"; +import assert from "node:assert/strict"; +import process from "node:process"; +import { test } from "node:test"; +import mysql from "mysql2/promise"; + +let Temporal: typeof temporal.Temporal; +if ("Temporal" in globalThis) { + Temporal = (globalThis as unknown as { Temporal: typeof temporal.Temporal }) + .Temporal; +} else { + Temporal = temporal.Temporal; +} + +const dbUrl = process.env.MYSQL_URL; + +/** + * Returns a short, MySQL-identifier-safe table name unique to this test run. + * The name is at most 30 characters long, well within the 46-character limit + * imposed by the `idx__deliver_after` index constraint. + */ +function randomTableName(prefix: string): string { + // crypto.randomUUID() returns a 36-char UUID with hyphens. + // We strip hyphens and take the first 16 hex digits for uniqueness. + const hex = crypto.randomUUID().replace(/-/g, "").slice(0, 16); + return `t_${prefix}_${hex}`; +} + +// --------------------------------------------------------------------------- +// Constructor validation (no DB required) +// --------------------------------------------------------------------------- + +test("MysqlMessageQueue rejects invalid table names", () => { + const fakePool = {} as mysql.Pool; + + // Must start with letter or underscore + assert.throws( + () => new MysqlMessageQueue(fakePool, { tableName: "1starts_digit" }), + RangeError, + ); + // No hyphens + assert.throws( + () => new MysqlMessageQueue(fakePool, { tableName: "bad-name" }), + RangeError, + ); + // No spaces + assert.throws( + () => new MysqlMessageQueue(fakePool, { tableName: "has space" }), + RangeError, + ); + // No special chars + assert.throws( + () => new MysqlMessageQueue(fakePool, { tableName: "name!" }), + RangeError, + ); + // Table name > 46 chars: derived index name would exceed MySQL's 64-char limit + assert.throws( + () => new MysqlMessageQueue(fakePool, { tableName: "a".repeat(47) }), + RangeError, + ); + + // Valid names should not throw + new MysqlMessageQueue(fakePool, { tableName: "valid_name" }); + new MysqlMessageQueue(fakePool, { tableName: "_leading_underscore" }); + new MysqlMessageQueue(fakePool, { tableName: "CamelCase123" }); + // Exactly 46 chars is valid + new MysqlMessageQueue(fakePool, { tableName: "a".repeat(46) }); +}); + +test("MysqlMessageQueue uses default options when none are provided", () => { + const fakePool = {} as mysql.Pool; + // Should not throw with default options + const mq = new MysqlMessageQueue(fakePool); + assert.strictEqual(mq.nativeRetrial, false); +}); + +// --------------------------------------------------------------------------- +// Standard shared test suite +// --------------------------------------------------------------------------- + +test("MysqlMessageQueue", { skip: dbUrl == null }, () => { + if (dbUrl == null) return; // Bun does not support skip option + const tableName = randomTableName("mq"); + const pools: mysql.Pool[] = []; + + function makeQueue(): MysqlMessageQueue { + const pool = mysql.createPool(dbUrl!); + pools.push(pool); + return new MysqlMessageQueue(pool, { tableName }); + } + + return testMessageQueue( + makeQueue, + async ({ mq1, mq2, controller }) => { + controller.abort(); + await mq1.drop(); + await mq2.drop(); + for (const pool of pools) await pool.end(); + }, + { testOrderingKey: true }, + ); +}); + +// --------------------------------------------------------------------------- +// initialize() and drop() +// --------------------------------------------------------------------------- + +test("MysqlMessageQueue.initialize()", { skip: dbUrl == null }, async () => { + if (dbUrl == null) return; // Bun does not support skip option + const pool = mysql.createPool(dbUrl!); + const tableName = randomTableName("init"); + const mq = new MysqlMessageQueue(pool, { tableName }); + try { + await mq.initialize(); + + // Table must exist + const [tables] = await pool.query( + `SELECT COUNT(*) AS cnt + FROM information_schema.tables + WHERE table_schema = DATABASE() AND table_name = ?`, + [tableName], + ); + assert.strictEqual(tables[0].cnt, 1); + + // The deliver_after index must exist + const [idxRows] = await pool.query( + `SELECT COUNT(*) AS cnt + FROM information_schema.statistics + WHERE table_schema = DATABASE() + AND table_name = ? + AND index_name = ?`, + [tableName, `idx_${tableName}_deliver_after`], + ); + assert.strictEqual(idxRows[0].cnt, 1, "deliver_after index must exist"); + + // The composite (ordering_key, deliver_after) index must exist so that + // #findOrderingKeyCandidates() can scan efficiently. + const [compIdxRows] = await pool.query( + `SELECT COUNT(*) AS cnt + FROM information_schema.statistics + WHERE table_schema = DATABASE() + AND table_name = ? + AND index_name = ?`, + [tableName, `idx_${tableName}_ok_da`], + ); + assert.strictEqual( + compIdxRows[0].cnt, + 2, + "composite (ordering_key, deliver_after) index must exist with 2 columns", + ); + + // id column must be CHAR(36) (for UUID storage) + const [cols] = await pool.query( + `SELECT CHARACTER_MAXIMUM_LENGTH + FROM information_schema.columns + WHERE table_schema = DATABASE() + AND table_name = ? + AND column_name = 'id'`, + [tableName], + ); + assert.ok( + cols[0].CHARACTER_MAXIMUM_LENGTH >= 36, + "id column must fit a UUID", + ); + } finally { + await mq.drop(); + await pool.end(); + } +}); + +test( + "MysqlMessageQueue.initialize() is idempotent", + { skip: dbUrl == null }, + async () => { + if (dbUrl == null) return; // Bun does not support skip option + const pool = mysql.createPool(dbUrl!); + const tableName = randomTableName("idem"); + const mq = new MysqlMessageQueue(pool, { tableName }); + try { + // Calling initialize() twice must not throw + await mq.initialize(); + await mq.initialize(); + + const [rows] = await pool.query( + `SELECT COUNT(*) AS cnt + FROM information_schema.tables + WHERE table_schema = DATABASE() AND table_name = ?`, + [tableName], + ); + assert.strictEqual(rows[0].cnt, 1); + } finally { + await mq.drop(); + await pool.end(); + } + }, +); + +test("MysqlMessageQueue.drop()", { skip: dbUrl == null }, async () => { + if (dbUrl == null) return; // Bun does not support skip option + const pool = mysql.createPool(dbUrl!); + const tableName = randomTableName("drop"); + const mq = new MysqlMessageQueue(pool, { tableName }); + try { + await mq.initialize(); + await mq.drop(); + + const [rows] = await pool.query( + `SELECT COUNT(*) AS cnt + FROM information_schema.tables + WHERE table_schema = DATABASE() AND table_name = ?`, + [tableName], + ); + assert.strictEqual(rows[0].cnt, 0, "table must be dropped"); + } finally { + await pool.end(); + } +}); + +test( + "MysqlMessageQueue.drop() resets initialized flag so re-initialize works", + { skip: dbUrl == null }, + async () => { + if (dbUrl == null) return; // Bun does not support skip option + const pool = mysql.createPool(dbUrl!); + const tableName = randomTableName("reinit"); + const mq = new MysqlMessageQueue(pool, { tableName }); + try { + await mq.initialize(); + await mq.drop(); + + // After drop(), initialize() must be able to recreate the table + await mq.initialize(); + const [rows] = await pool.query( + `SELECT COUNT(*) AS cnt + FROM information_schema.tables + WHERE table_schema = DATABASE() AND table_name = ?`, + [tableName], + ); + assert.strictEqual(rows[0].cnt, 1); + } finally { + await mq.drop(); + await pool.end(); + } + }, +); + +test( + "MysqlMessageQueue.drop() waits for in-flight initialize() before dropping", + { skip: dbUrl == null }, + async () => { + if (dbUrl == null) return; // Bun does not support skip option + const pool = mysql.createPool(dbUrl!); + const tableName = randomTableName("droprace"); + const mq = new MysqlMessageQueue(pool, { tableName }); + try { + // Fire initialize() but do not await it yet; then immediately drop(). + // Without the fix, drop() could complete before initialize() finishes, + // leaving #initialized === true with no table in the database. + const initPromise = mq.initialize(); + await mq.drop(); + await initPromise; + + // After drop(), the table must not exist and #initialized must be false + // (verified indirectly: a subsequent initialize() + enqueue() must work). + await mq.initialize(); + await mq.enqueue("after-drop"); + const [rows] = await pool.query( + `SELECT COUNT(*) AS cnt FROM \`${tableName}\``, + ); + assert.strictEqual(rows[0].cnt, 1, "message must survive drop+reinit"); + } finally { + await mq.drop(); + await pool.end(); + } + }, +); + +// --------------------------------------------------------------------------- +// Concurrent initialization +// --------------------------------------------------------------------------- + +test( + "MysqlMessageQueue concurrent initialization does not throw", + { skip: dbUrl == null }, + async () => { + if (dbUrl == null) return; // Bun does not support skip option + const pools: mysql.Pool[] = []; + const tableName = randomTableName("concinit"); + try { + // 10 instances all racing to initialize the same table simultaneously + const instances = Array.from({ length: 10 }, () => { + const pool = mysql.createPool(dbUrl!); + pools.push(pool); + return new MysqlMessageQueue(pool, { tableName }); + }); + await assert.doesNotReject( + Promise.all(instances.map((mq) => mq.initialize())), + "Concurrent initialization must not throw", + ); + } finally { + // Clean up: drop via first pool + await pools[0]?.query(`DROP TABLE IF EXISTS \`${tableName}\``); + for (const pool of pools) await pool.end(); + } + }, +); + +test( + "MysqlMessageQueue enqueue() and listen() racing on initialize() is safe", + { skip: dbUrl == null }, + async () => { + if (dbUrl == null) return; // Bun does not support skip option + const pool = mysql.createPool(dbUrl!); + const tableName = randomTableName("race"); + const mq = new MysqlMessageQueue(pool, { tableName }); + const controller = new AbortController(); + const received: string[] = []; + try { + // Start listen() and enqueue() simultaneously — both will trigger + // initialize() concurrently + const listening = mq.listen( + (msg: string) => { + received.push(msg); + }, + { signal: controller.signal }, + ); + await mq.enqueue("race-message"); + await waitFor(() => received.length >= 1, 15_000); + assert.deepStrictEqual(received, ["race-message"]); + controller.abort(); + await listening; + } finally { + await mq.drop(); + await pool.end(); + } + }, +); + +// --------------------------------------------------------------------------- +// Pre-enqueued messages discovered via initial poll +// --------------------------------------------------------------------------- + +test( + "MysqlMessageQueue processes messages enqueued before listen() starts", + { skip: dbUrl == null }, + async () => { + if (dbUrl == null) return; // Bun does not support skip option + const pool = mysql.createPool(dbUrl!); + const tableName = randomTableName("preq"); + const mq = new MysqlMessageQueue(pool, { tableName }); + const controller = new AbortController(); + const received: string[] = []; + try { + // Enqueue messages BEFORE starting the listener + await mq.enqueue("pre-queued-1"); + await mq.enqueue("pre-queued-2"); + await mq.enqueue("pre-queued-3"); + + const listening = mq.listen( + (msg: string) => { + received.push(msg); + }, + { signal: controller.signal }, + ); + + await waitFor(() => received.length >= 3, 15_000); + assert.deepStrictEqual( + new Set(received), + new Set(["pre-queued-1", "pre-queued-2", "pre-queued-3"]), + ); + controller.abort(); + await listening; + } finally { + await mq.drop(); + await pool.end(); + } + }, +); + +// --------------------------------------------------------------------------- +// Delayed message delivery +// --------------------------------------------------------------------------- + +test( + "MysqlMessageQueue delayed message is not delivered early", + { skip: dbUrl == null }, + async () => { + if (dbUrl == null) return; // Bun does not support skip option + const pool = mysql.createPool(dbUrl!); + const tableName = randomTableName("delay"); + const mq = new MysqlMessageQueue(pool, { + tableName, + pollInterval: { milliseconds: 200 }, + }); + const controller = new AbortController(); + const received: string[] = []; + try { + const listening = mq.listen( + (msg: string) => { + received.push(msg); + }, + { signal: controller.signal }, + ); + + const started = Date.now(); + await mq.enqueue("immediate"); + await waitFor(() => received.length >= 1, 10_000); + + // The delayed message must not appear during the first 2 seconds + await mq.enqueue( + "delayed", + { delay: Temporal.Duration.from({ seconds: 3 }) }, + ); + await new Promise((r) => setTimeout(r, 2_000)); + assert.strictEqual( + received.length, + 1, + "delayed message must not arrive within 2 seconds", + ); + + await waitFor(() => received.length >= 2, 10_000); + assert.ok( + Date.now() - started >= 3_000, + "delayed message must arrive after at least 3 seconds", + ); + assert.strictEqual(received[1], "delayed"); + + controller.abort(); + await listening; + } finally { + await mq.drop(); + await pool.end(); + } + }, +); + +// --------------------------------------------------------------------------- +// Concurrent enqueue stress test +// --------------------------------------------------------------------------- + +test( + "MysqlMessageQueue handles 30 concurrent enqueue() calls", + { skip: dbUrl == null }, + async () => { + if (dbUrl == null) return; // Bun does not support skip option + const pool = mysql.createPool(dbUrl!); + const tableName = randomTableName("stress"); + const mq = new MysqlMessageQueue(pool, { + tableName, + pollInterval: { milliseconds: 200 }, + }); + const controller = new AbortController(); + const received: number[] = []; + try { + const listening = mq.listen( + (msg: number) => { + received.push(msg); + }, + { signal: controller.signal }, + ); + + // Fire 30 enqueue() calls in parallel + await Promise.all( + Array.from({ length: 30 }, (_, i) => mq.enqueue(i)), + ); + + await waitFor(() => received.length >= 30, 30_000); + assert.deepStrictEqual( + new Set(received), + new Set(Array.from({ length: 30 }, (_, i) => i)), + ); + controller.abort(); + await listening; + } finally { + await mq.drop(); + await pool.end(); + } + }, +); + +// --------------------------------------------------------------------------- +// enqueueMany() edge cases +// --------------------------------------------------------------------------- + +test( + "MysqlMessageQueue.enqueueMany() with empty array is a no-op", + { skip: dbUrl == null }, + async () => { + if (dbUrl == null) return; // Bun does not support skip option + const pool = mysql.createPool(dbUrl!); + const tableName = randomTableName("emptyb"); + const mq = new MysqlMessageQueue(pool, { tableName }); + try { + // Should not throw and should not touch the DB (table not yet created) + await assert.doesNotReject(mq.enqueueMany([])); + // Table should NOT have been auto-created by an empty enqueueMany() + const [rows] = await pool.query( + `SELECT COUNT(*) AS cnt + FROM information_schema.tables + WHERE table_schema = DATABASE() AND table_name = ?`, + [tableName], + ); + assert.strictEqual( + rows[0].cnt, + 0, + "empty enqueueMany() must not create the table", + ); + } finally { + await pool.end(); + } + }, +); + +test( + "MysqlMessageQueue.enqueueMany() inserts all messages atomically", + { skip: dbUrl == null }, + async () => { + if (dbUrl == null) return; // Bun does not support skip option + const pool = mysql.createPool(dbUrl!); + const tableName = randomTableName("batcha"); + const mq = new MysqlMessageQueue(pool, { tableName }); + try { + const msgs = ["a", "b", "c", "d", "e"]; + await mq.enqueueMany(msgs); + + const [rows] = await pool.query( + `SELECT COUNT(*) AS cnt FROM \`${tableName}\``, + ); + assert.strictEqual(rows[0].cnt, msgs.length); + } finally { + await mq.drop(); + await pool.end(); + } + }, +); + +test( + "MysqlMessageQueue.enqueueMany() delivers all 100 messages via single INSERT", + { skip: dbUrl == null }, + async () => { + if (dbUrl == null) return; // Bun does not support skip option + const pool = mysql.createPool(dbUrl!); + const tableName = randomTableName("bulk"); + const mq = new MysqlMessageQueue(pool, { + tableName, + pollInterval: { milliseconds: 200 }, + }); + const controller = new AbortController(); + const received: number[] = []; + try { + const listening = mq.listen( + (msg: number) => { + received.push(msg); + }, + { signal: controller.signal }, + ); + + const msgs = Array.from({ length: 100 }, (_, i) => i); + await mq.enqueueMany(msgs); + + await waitFor(() => received.length >= 100, 30_000); + assert.deepStrictEqual( + new Set(received), + new Set(msgs), + "all 100 messages must be delivered", + ); + controller.abort(); + await listening; + } finally { + await mq.drop(); + await pool.end(); + } + }, +); + +test( + "MysqlMessageQueue.enqueueMany() preserves insertion order for same ordering key", + { skip: dbUrl == null }, + async () => { + if (dbUrl == null) return; // Bun does not support skip option + const pool = mysql.createPool(dbUrl!); + const tableName = randomTableName("bord"); + const mq = new MysqlMessageQueue(pool, { + tableName, + pollInterval: { milliseconds: 50 }, + }); + const controller = new AbortController(); + const received: number[] = []; + try { + const listening = mq.listen( + (msg: number) => { + received.push(msg); + }, + { signal: controller.signal }, + ); + + // All 5 messages share the same ordering key. Without per-row + // tie-breaker timestamps, all deliver_after values would be identical + // (NOW(6) is constant within a single SQL statement) and dequeue order + // would be nondeterministic. + await mq.enqueueMany([1, 2, 3, 4, 5], { orderingKey: "order-test" }); + + await waitFor(() => received.length >= 5, 10_000); + assert.deepStrictEqual( + received, + [1, 2, 3, 4, 5], + "enqueueMany() must deliver messages in insertion order for the same ordering key", + ); + controller.abort(); + await listening; + } finally { + await mq.drop(); + await pool.end(); + } + }, +); + +// --------------------------------------------------------------------------- +// Handler error survival +// --------------------------------------------------------------------------- + +test( + "MysqlMessageQueue listener survives handler errors", + { skip: dbUrl == null }, + async () => { + if (dbUrl == null) return; // Bun does not support skip option + const pool = mysql.createPool(dbUrl!); + const tableName = randomTableName("hderr"); + const mq = new MysqlMessageQueue(pool, { + tableName, + pollInterval: { milliseconds: 200 }, + }); + const controller = new AbortController(); + const received: string[] = []; + let calls = 0; + try { + const listening = mq.listen( + (msg: string) => { + calls++; + if (calls === 1) throw new Error("simulated handler error"); + received.push(msg); + }, + { signal: controller.signal }, + ); + + // Enqueue two messages; the first triggers an error, the second must + // still be processed. + await mq.enqueue("error-message"); + await mq.enqueue("success-message"); + + await waitFor(() => received.length >= 1, 15_000); + assert.deepStrictEqual(received, ["success-message"]); + controller.abort(); + await listening; + } finally { + await mq.drop(); + await pool.end(); + } + }, +); + +// --------------------------------------------------------------------------- +// Handler timeout +// --------------------------------------------------------------------------- + +test( + "MysqlMessageQueue handlerTimeout prevents hung handler from blocking queue", + { skip: dbUrl == null }, + async () => { + if (dbUrl == null) return; // Bun does not support skip option + const pool = mysql.createPool(dbUrl!); + const tableName = randomTableName("hdto"); + const mq = new MysqlMessageQueue(pool, { + tableName, + pollInterval: { milliseconds: 200 }, + // Very short timeout so the hung handler is evicted quickly in tests + handlerTimeout: { seconds: 1 }, + }); + const controller = new AbortController(); + const received: string[] = []; + let calls = 0; + try { + const listening = mq.listen( + async (msg: string) => { + calls++; + if (calls === 1) { + // Hang forever — the timeout must kick us out + await new Promise(() => {}); + } + received.push(msg); + }, + { signal: controller.signal }, + ); + + await mq.enqueue("hung-message"); + await mq.enqueue("next-message"); + + // The second message must eventually be processed even though the first + // handler hung and was timed out. + await waitFor(() => received.length >= 1, 15_000); + assert.deepStrictEqual(received, ["next-message"]); + controller.abort(); + await listening; + } finally { + await mq.drop(); + await pool.end(); + } + }, +); + +test( + "MysqlMessageQueue handlerTimeout with ordering key releases the lock", + { skip: dbUrl == null }, + async () => { + if (dbUrl == null) return; // Bun does not support skip option + const pool = mysql.createPool(dbUrl!); + const tableName = randomTableName("hdtolk"); + const mq = new MysqlMessageQueue(pool, { + tableName, + pollInterval: { milliseconds: 200 }, + handlerTimeout: { seconds: 1 }, + }); + const controller = new AbortController(); + const received: string[] = []; + let calls = 0; + try { + const listening = mq.listen( + async (msg: string) => { + calls++; + if (calls === 1) { + // Hang forever — the timeout must release the ordering-key lock + await new Promise(() => {}); + } + received.push(msg); + }, + { signal: controller.signal }, + ); + + // Both messages share the same ordering key + await mq.enqueue("key-msg-1", { orderingKey: "keyA" }); + await mq.enqueue("key-msg-2", { orderingKey: "keyA" }); + + // The second message must be processed even after the first handler times out + await waitFor(() => received.length >= 1, 15_000); + assert.deepStrictEqual(received, ["key-msg-2"]); + controller.abort(); + await listening; + } finally { + await mq.drop(); + await pool.end(); + } + }, +); + +// --------------------------------------------------------------------------- +// Ordering key: advisory lock release regression +// --------------------------------------------------------------------------- + +test( + "MysqlMessageQueue advisory lock is released after processing (regression for lock-leak)", + { skip: dbUrl == null }, + async () => { + if (dbUrl == null) return; // Bun does not support skip option + // Use a pool with a small max to make lock leaks visible + const pool = mysql.createPool({ uri: dbUrl!, connectionLimit: 3 }); + const tableName = randomTableName("lockleak"); + const mq = new MysqlMessageQueue(pool, { + tableName, + pollInterval: { milliseconds: 200 }, + }); + const controller = new AbortController(); + const received: string[] = []; + try { + const listening = mq.listen( + (msg: string) => { + received.push(msg); + }, + { signal: controller.signal }, + ); + + // Enqueue 5 messages with the same ordering key; they must all be + // processed in order, proving the lock is released between messages. + for (let i = 1; i <= 5; i++) { + await mq.enqueue(`ordered-${i}`, { orderingKey: "locktest" }); + } + + await waitFor(() => received.length >= 5, 30_000); + assert.deepStrictEqual(received, [ + "ordered-1", + "ordered-2", + "ordered-3", + "ordered-4", + "ordered-5", + ]); + controller.abort(); + await listening; + } finally { + await mq.drop(); + await pool.end(); + } + }, +); + +test( + "MysqlMessageQueue GET_LOCK succeeds with a very long ordering key", + { skip: dbUrl == null }, + async () => { + if (dbUrl == null) return; // Bun does not support skip option + // A 200-char ordering key combined with a 20-char table name produces a + // raw lock name that is 221 chars — well over MySQL's 64-char limit. + // The lock name hashing logic must shorten it before calling GET_LOCK, + // otherwise MySQL returns an error or NULL. + const pool = mysql.createPool(dbUrl!); + const tableName = randomTableName("lockname"); + const mq = new MysqlMessageQueue(pool, { + tableName, + pollInterval: { milliseconds: 200 }, + }); + const longOrderingKey = "x".repeat(200); + const controller = new AbortController(); + const received: string[] = []; + try { + const listening = mq.listen( + (msg: string) => { + received.push(msg); + }, + { signal: controller.signal }, + ); + // Enqueue a message with the very long ordering key; if GET_LOCK were + // called with the raw (>64-char) name, MySQL would return an error or + // NULL and the message would never be processed. + await mq.enqueue("long-key-msg", { orderingKey: longOrderingKey }); + await waitFor(() => received.length >= 1, 10_000); + assert.deepStrictEqual(received, ["long-key-msg"]); + controller.abort(); + await listening; + } finally { + await mq.drop(); + await pool.end(); + } + }, +); + +// --------------------------------------------------------------------------- +// Multiple workers: each message delivered exactly once +// --------------------------------------------------------------------------- + +test( + "MysqlMessageQueue delivers each message to exactly one worker", + { skip: dbUrl == null }, + async () => { + if (dbUrl == null) return; // Bun does not support skip option + const pool1 = mysql.createPool(dbUrl!); + const pool2 = mysql.createPool(dbUrl!); + const tableName = randomTableName("onceonly"); + const mq1 = new MysqlMessageQueue(pool1, { + tableName, + pollInterval: { milliseconds: 200 }, + }); + const mq2 = new MysqlMessageQueue(pool2, { + tableName, + pollInterval: { milliseconds: 200 }, + }); + const controller = new AbortController(); + const received: number[] = []; + try { + const listening1 = mq1.listen( + (msg: number) => { + received.push(msg); + }, + { signal: controller.signal }, + ); + const listening2 = mq2.listen( + (msg: number) => { + received.push(msg); + }, + { signal: controller.signal }, + ); + + const count = 20; + for (let i = 0; i < count; i++) await mq1.enqueue(i); + + await waitFor(() => received.length >= count, 30_000); + // All messages must be received + assert.strictEqual(received.length, count); + // Each message must be received exactly once (no duplicates) + assert.deepStrictEqual( + new Set(received).size, + count, + "each message must be delivered exactly once", + ); + controller.abort(); + await listening1; + await listening2; + } finally { + await mq1.drop(); + await mq2.drop(); + await pool1.end(); + await pool2.end(); + } + }, +); + +// --------------------------------------------------------------------------- +// Ordering key: two workers respect sequential ordering +// --------------------------------------------------------------------------- + +test( + "MysqlMessageQueue two workers preserve ordering-key order", + { skip: dbUrl == null }, + async () => { + if (dbUrl == null) return; // Bun does not support skip option + const pool1 = mysql.createPool(dbUrl!); + const pool2 = mysql.createPool(dbUrl!); + const tableName = randomTableName("twowork"); + const mq1 = new MysqlMessageQueue(pool1, { + tableName, + pollInterval: { milliseconds: 200 }, + }); + const mq2 = new MysqlMessageQueue(pool2, { + tableName, + pollInterval: { milliseconds: 200 }, + }); + const controller = new AbortController(); + const received: number[] = []; + try { + const listening1 = mq1.listen( + (msg: number) => { + received.push(msg); + }, + { signal: controller.signal }, + ); + const listening2 = mq2.listen( + (msg: number) => { + received.push(msg); + }, + { signal: controller.signal }, + ); + + // Enqueue 10 messages in order under the same ordering key + for (let i = 1; i <= 10; i++) { + await mq1.enqueue(i, { orderingKey: "strict-order" }); + } + + await waitFor(() => received.length >= 10, 30_000); + assert.deepStrictEqual( + received, + [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], + "messages with the same ordering key must be delivered in order", + ); + controller.abort(); + await listening1; + await listening2; + } finally { + await mq1.drop(); + await mq2.drop(); + await pool1.end(); + await pool2.end(); + } + }, +); + +// --------------------------------------------------------------------------- +// Abort during long poll interval resolves promptly (clearTimeout regression) +// --------------------------------------------------------------------------- + +test( + "MysqlMessageQueue listen() resolves promptly when aborted during poll interval", + { skip: dbUrl == null }, + async () => { + if (dbUrl == null) return; // Bun does not support skip option + const pool = mysql.createPool(dbUrl!); + const tableName = randomTableName("clrtmo"); + // Use a very long poll interval so the test would hang if clearTimeout + // is missing: the setTimeout would keep the event loop alive and the + // listen() promise would not resolve until the timer fires. + const mq = new MysqlMessageQueue(pool, { + tableName, + pollInterval: { seconds: 60 }, + }); + const controller = new AbortController(); + try { + const listening = mq.listen(() => {}, { signal: controller.signal }); + + // Give the listener time to enter the poll-interval sleep, then abort. + await new Promise((resolve) => setTimeout(resolve, 300)); + controller.abort(); + + // listen() must resolve well within the 60-second poll interval. + const deadline = new Promise((_, reject) => + setTimeout( + () => reject(new Error("listen() did not resolve in time")), + 3_000, + ) + ); + await Promise.race([listening, deadline]); + } finally { + await mq.drop(); + await pool.end(); + } + }, +); + +// --------------------------------------------------------------------------- +// getRandomKey() integration: using @fedify/testing helpers +// --------------------------------------------------------------------------- + +test( + "MysqlMessageQueue works with getRandomKey() from @fedify/testing", + { skip: dbUrl == null }, + async () => { + if (dbUrl == null) return; // Bun does not support skip option + // getRandomKey returns names like "fedify_test_mq_" which may contain + // hyphens — unsuitable for MySQL identifiers. Users must replace hyphens. + const rawKey = getRandomKey("mq"); + const tableName = rawKey.replace(/-/g, "_").slice(0, 46); + + const pool = mysql.createPool(dbUrl!); + const mq = new MysqlMessageQueue(pool, { tableName }); + const controller = new AbortController(); + const received: string[] = []; + try { + const listening = mq.listen( + (msg: string) => { + received.push(msg); + }, + { signal: controller.signal }, + ); + await mq.enqueue("hello"); + await waitFor(() => received.length >= 1, 10_000); + assert.deepStrictEqual(received, ["hello"]); + controller.abort(); + await listening; + } finally { + await mq.drop(); + await pool.end(); + } + }, +); + +// --------------------------------------------------------------------------- +// initialized option skips table creation +// --------------------------------------------------------------------------- + +test( + "MysqlMessageQueue with initialized: true skips DDL on first use", + { skip: dbUrl == null }, + async () => { + if (dbUrl == null) return; // Bun does not support skip option + const pool = mysql.createPool(dbUrl!); + const tableName = randomTableName("preini"); + // Create table manually first + const setupMq = new MysqlMessageQueue(pool, { tableName }); + await setupMq.initialize(); + + try { + // A second instance with initialized: true must not issue CREATE TABLE + const mq = new MysqlMessageQueue(pool, { tableName, initialized: true }); + const controller = new AbortController(); + const received: string[] = []; + const listening = mq.listen( + (msg: string) => { + received.push(msg); + }, + { signal: controller.signal }, + ); + await mq.enqueue("pre-initialized"); + await waitFor(() => received.length >= 1, 10_000); + assert.deepStrictEqual(received, ["pre-initialized"]); + controller.abort(); + await listening; + } finally { + await setupMq.drop(); + await pool.end(); + } + }, +); diff --git a/packages/mysql/src/mq.ts b/packages/mysql/src/mq.ts new file mode 100644 index 000000000..9a63f629d --- /dev/null +++ b/packages/mysql/src/mq.ts @@ -0,0 +1,614 @@ +import type { + MessageQueue, + MessageQueueEnqueueOptions, + MessageQueueListenOptions, +} from "@fedify/fedify"; +import { getLogger } from "@logtape/logtape"; +import type { Pool, PoolConnection, RowDataPacket } from "mysql2/promise"; + +const logger = getLogger(["fedify", "mysql", "mq"]); +const INITIALIZE_MAX_ATTEMPTS = 5; +const INITIALIZE_BACKOFF_MS = 10; +// Maximum number of distinct ordering-key candidates fetched per poll cycle. +// Raising this reduces the chance of missing a key under high contention, at +// the cost of a slightly larger result set from the GROUP BY query. +const ORDERING_KEY_CANDIDATE_LIMIT = 10; + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +function withTimeout( + result: void | Promise, + timeoutMs: number, +): Promise { + const resolved = Promise.resolve(result); + if (timeoutMs <= 0) return resolved; + let timer: ReturnType; + const timeoutPromise = new Promise((_, reject) => { + timer = setTimeout( + () => reject(new Error(`Message handler timed out after ${timeoutMs}ms`)), + timeoutMs, + ); + }); + return Promise.race([resolved, timeoutPromise]).finally(() => + clearTimeout(timer!) + ); +} + +/** + * Computes a MySQL advisory lock name for the given table name and ordering + * key. The result is always at most 64 characters, which is well within + * MySQL's advisory lock name length limit. + */ +function getMysqlLockName(tableName: string, orderingKey: string): string { + const raw = `${tableName}:${orderingKey}`; + if (raw.length <= 64) return raw; + // Use two djb2-variant hash functions to produce a 20-char collision-resistant + // name that fits within MySQL's advisory lock name length limit. + let h1 = 0; + let h2 = 5381; + for (let i = 0; i < raw.length; i++) { + const c = raw.charCodeAt(i); + h1 = (((h1 << 5) - h1) + c) | 0; + h2 = (((h2 << 5) + h2) + c) | 0; + } + return `fdy:${(h1 >>> 0).toString(16).padStart(8, "0")}${ + (h2 >>> 0).toString(16).padStart(8, "0") + }`; +} + +/** + * Options for the MySQL message queue. + * + * @since 2.1.0 + */ +export interface MysqlMessageQueueOptions { + /** + * The table name to use for the message queue. + * `"fedify_mq"` by default. + * @default `"fedify_mq"` + * @since 2.1.0 + */ + readonly tableName?: string; + + /** + * Whether the table has been initialized. `false` by default. + * @default `false` + * @since 2.1.0 + */ + readonly initialized?: boolean; + + /** + * The poll interval for the message queue. 1 second by default. + * + * Since MySQL/MariaDB has no `LISTEN`/`NOTIFY` equivalent, messages are + * discovered using periodic polling. A shorter interval reduces message + * delivery latency at the cost of additional database load. + * + * @default `{ seconds: 1 }` + * @since 2.1.0 + */ + readonly pollInterval?: Temporal.Duration | Temporal.DurationLike; + + /** + * The maximum time to wait for a message handler to complete before + * the queue moves on. This is a *soft* timeout: when a handler exceeds + * the limit, the queue stops waiting and logs a timeout error, but it + * cannot cancel the handler itself — the handler's promise continues + * running in the background. In the ordered-key path the advisory lock + * is always released in a `finally` block, so a timed-out handler will + * not permanently block the same ordering key. However, concurrent + * side-effects from the still-running handler may interleave with the + * next handler for the same key; callers should be aware of this. + * + * Set to zero to disable the timeout (not recommended in production). + * + * 60 seconds by default. + * @default `{ seconds: 60 }` + * @since 2.1.0 + */ + readonly handlerTimeout?: Temporal.Duration | Temporal.DurationLike; +} + +/** + * A message queue that uses MySQL or MariaDB as the underlying storage. + * Messages are delivered via periodic polling, since MySQL and MariaDB do not + * provide a `LISTEN`/`NOTIFY` equivalent. + * + * Concurrent workers are supported via `SELECT … FOR UPDATE SKIP LOCKED` + * (requires MySQL 8.0+ or MariaDB 10.6+) and MySQL advisory locks + * (`GET_LOCK`/`RELEASE_LOCK`) for ordering-key serialization. + * + * @example + * ```ts + * import { createFederation } from "@fedify/fedify"; + * import { MysqlKvStore, MysqlMessageQueue } from "@fedify/mysql"; + * import mysql from "mysql2/promise"; + * + * const pool = mysql.createPool("mysql://user:pass@localhost/db"); + * + * const federation = createFederation({ + * kv: new MysqlKvStore(pool), + * queue: new MysqlMessageQueue(pool), + * }); + * ``` + * + * @since 2.1.0 + */ +export class MysqlMessageQueue implements MessageQueue { + /** + * MySQL/MariaDB does not provide native retry mechanisms; Fedify handles + * retries itself. + * @since 2.1.0 + */ + readonly nativeRetrial = false; + + readonly #pool: Pool; + readonly #tableName: string; + readonly #pollIntervalMs: number; + readonly #handlerTimeoutMs: number; + #initialized: boolean; + #initPromise?: Promise; + + /** + * Creates a new MySQL message queue. + * @param pool The MySQL connection pool to use. + * @param options Options for the message queue. + * @since 2.1.0 + */ + constructor(pool: Pool, options: MysqlMessageQueueOptions = {}) { + this.#pool = pool; + const tableName = options.tableName ?? "fedify_mq"; + if (!/^[a-zA-Z_][a-zA-Z0-9_]*$/.test(tableName)) { + throw new RangeError( + `Invalid table name: ${JSON.stringify(tableName)}. ` + + "Table names must start with a letter or underscore and contain " + + "only letters, digits, and underscores.", + ); + } + // MySQL identifiers are limited to 64 characters. The derived index name + // is "idx__deliver_after" (18 extra chars), so the table name + // itself must be at most 46 characters long. + if (tableName.length > 46) { + throw new RangeError( + `Invalid table name: ${JSON.stringify(tableName)}. ` + + "Table names must be at most 46 characters long (MySQL identifier " + + 'limit is 64 chars; the derived index "idx__deliver_after" ' + + "uses 18 more).", + ); + } + this.#tableName = tableName; + this.#pollIntervalMs = Temporal.Duration.from( + options.pollInterval ?? { seconds: 1 }, + ).total("millisecond"); + this.#handlerTimeoutMs = Temporal.Duration.from( + options.handlerTimeout ?? { seconds: 60 }, + ).total("millisecond"); + this.#initialized = options.initialized ?? false; + } + + /** + * {@inheritDoc MessageQueue.enqueue} + * @since 2.1.0 + */ + async enqueue( + // deno-lint-ignore no-explicit-any + message: any, + options?: MessageQueueEnqueueOptions, + ): Promise { + await this.initialize(); + const delayMs = options?.delay == null + ? 0 + : Math.max(Math.round(options.delay.total("millisecond")), 0); + const orderingKey = options?.orderingKey ?? null; + if (options?.delay) { + logger.debug("Enqueuing a message with a delay of {delayMs}ms...", { + delayMs, + message, + orderingKey, + }); + } else { + logger.debug("Enqueuing a message...", { message, orderingKey }); + } + await this.#pool.query( + `INSERT INTO \`${this.#tableName}\` + (\`id\`, \`message\`, \`deliver_after\`, \`ordering_key\`) + VALUES ( + UUID(), + ?, + DATE_ADD(NOW(6), INTERVAL ? MICROSECOND), + ? + )`, + [JSON.stringify(message), delayMs * 1000, orderingKey], + ); + logger.debug("Enqueued a message.", { message, orderingKey }); + } + + /** + * {@inheritDoc MessageQueue.enqueueMany} + * @since 2.1.0 + */ + async enqueueMany( + // deno-lint-ignore no-explicit-any + messages: readonly any[], + options?: MessageQueueEnqueueOptions, + ): Promise { + if (messages.length === 0) return; + await this.initialize(); + const delayMs = options?.delay == null + ? 0 + : Math.max(Math.round(options.delay.total("millisecond")), 0); + const orderingKey = options?.orderingKey ?? null; + if (options?.delay) { + logger.debug( + "Enqueuing {count} messages with a delay of {delayMs}ms...", + { count: messages.length, delayMs, orderingKey }, + ); + } else { + logger.debug("Enqueuing {count} messages...", { + count: messages.length, + orderingKey, + }); + } + const placeholders = messages.map(() => + "(UUID(), ?, DATE_ADD(NOW(6), INTERVAL ? MICROSECOND), ?)" + ).join(", "); + // Each row gets a monotonically increasing interval so that messages + // sharing the same orderingKey are assigned distinct deliver_after values + // within a single INSERT statement. Without this tie-breaker, all rows + // would receive the same NOW(6) timestamp (MySQL evaluates NOW(6) once per + // statement), making dequeue order nondeterministic for ordered keys. + const values = messages.flatMap((message, index) => [ + JSON.stringify(message), + delayMs * 1000 + index, + orderingKey, + ]); + let conn: PoolConnection | undefined; + try { + conn = await this.#pool.getConnection(); + await conn.beginTransaction(); + await conn.query( + `INSERT INTO \`${this.#tableName}\` + (\`id\`, \`message\`, \`deliver_after\`, \`ordering_key\`) + VALUES ${placeholders}`, + values, + ); + await conn.commit(); + } catch (e) { + if (conn != null) await conn.rollback(); + throw e; + } finally { + conn?.release(); + } + logger.debug("Enqueued {count} messages.", { + count: messages.length, + orderingKey, + }); + } + + /** + * {@inheritDoc MessageQueue.listen} + * @since 2.1.0 + */ + async listen( + // deno-lint-ignore no-explicit-any + handler: (message: any) => void | Promise, + options: MessageQueueListenOptions = {}, + ): Promise { + await this.initialize(); + const { signal } = options; + + const poll = async () => { + while (!signal?.aborted) { + let processed = false; + + // Step 1: Try to process messages without an ordering key first. + // These don't need advisory locks — FOR UPDATE SKIP LOCKED is + // sufficient to prevent two workers from processing the same message. + const noKeyMsg = await this.#dequeueWithoutOrderingKey(); + if (noKeyMsg !== undefined) { + if (signal?.aborted) return; + await withTimeout(handler(noKeyMsg), this.#handlerTimeoutMs); + processed = true; + continue; + } + + // Step 2: Try to process messages with an ordering key. + // MySQL advisory locks (GET_LOCK / RELEASE_LOCK) ensure that only + // one worker processes each ordering key at a time, providing + // sequential processing guarantees. + // + // IMPORTANT: GET_LOCK / RELEASE_LOCK are session-level in MySQL, i.e. + // they are tied to a specific connection. We therefore use a + // dedicated connection (pool.getConnection()) for the entire + // lock → dequeue → handler → unlock sequence so that the lock and + // unlock are guaranteed to execute on the same connection. + // + // We fetch up to ORDERING_KEY_CANDIDATE_LIMIT distinct ordering keys + // in a single GROUP BY query and iterate over them in memory, avoiding + // the O(n) NOT IN list that would grow with each failed lock attempt. + const candidates = await this.#findOrderingKeyCandidates(); + for (const orderingKey of candidates) { + if (signal?.aborted) break; + const lockName = getMysqlLockName(this.#tableName, orderingKey); + + let conn: PoolConnection | undefined; + try { + conn = await this.#pool.getConnection(); + const [lockResult] = await conn.query( + `SELECT GET_LOCK(?, 0) AS acquired`, + [lockName], + ); + if (lockResult[0].acquired === null) { + // GET_LOCK() returns NULL when an error occurred (e.g. the + // server ran out of lock resources). Log a warning and skip + // this ordering key so the worker can still process others. + logger.warn( + "GET_LOCK({lockName}) returned NULL (server error); " + + "skipping ordering key {orderingKey}.", + { lockName, orderingKey }, + ); + } else if (lockResult[0].acquired === 1) { + try { + const msg = await this.#dequeueOrderedMessage( + conn, + orderingKey, + ); + if (msg !== undefined) { + if (signal?.aborted) return; + await withTimeout(handler(msg), this.#handlerTimeoutMs); + processed = true; + } + } finally { + // Always release the advisory lock on the SAME connection + await conn.query(`SELECT RELEASE_LOCK(?)`, [lockName]); + } + if (processed) break; + } + // acquired === 0: lock not acquired → try next ordering key + } finally { + conn?.release(); + } + } + + if (!processed) break; + } + }; + + const safePoll = async (trigger: string) => { + try { + await poll(); + } catch (error) { + logger.error( + "Error while polling for messages ({trigger}); " + + "will retry on next poll: {error}", + { trigger, error }, + ); + } + }; + + // Immediately process any messages that were enqueued before listen() was + // called, so that pre-queued messages are not delayed by the first + // poll interval. + await safePoll("initial"); + + while (!signal?.aborted) { + await new Promise((resolve) => { + const timeoutId = setTimeout(() => { + signal?.removeEventListener("abort", onAbort); + resolve(0); + }, this.#pollIntervalMs); + function onAbort() { + clearTimeout(timeoutId); + resolve(undefined); + } + signal?.addEventListener("abort", onAbort, { once: true }); + }); + if (signal?.aborted) break; + await safePoll("interval"); + } + } + + /** + * Atomically dequeues the oldest ready message that has no ordering key, + * using `FOR UPDATE SKIP LOCKED` within a transaction. + * Returns `undefined` when no such message is available. + */ + async #dequeueWithoutOrderingKey(): Promise { + let conn: PoolConnection | undefined; + try { + conn = await this.#pool.getConnection(); + await conn.beginTransaction(); + const [rows] = await conn.query( + `SELECT \`id\`, \`message\` + FROM \`${this.#tableName}\` + WHERE \`deliver_after\` <= NOW(6) AND \`ordering_key\` IS NULL + ORDER BY \`deliver_after\` + LIMIT 1 + FOR UPDATE SKIP LOCKED`, + ); + if (rows.length === 0) { + await conn.rollback(); + return undefined; + } + const { id, message } = rows[0]; + await conn.query( + `DELETE FROM \`${this.#tableName}\` WHERE \`id\` = ?`, + [id], + ); + await conn.commit(); + return message; + } catch (e) { + if (conn != null) await conn.rollback(); + throw e; + } finally { + conn?.release(); + } + } + + /** + * Returns up to {@link ORDERING_KEY_CANDIDATE_LIMIT} distinct ordering keys + * that have at least one ready message, ordered by their earliest + * `deliver_after`. Fetching a batch in one query avoids the growing + * `NOT IN (…)` list that results from repeatedly calling a single-result + * version after each failed lock attempt. + */ + async #findOrderingKeyCandidates(): Promise { + const [rows] = await this.#pool.query( + `SELECT \`ordering_key\` + FROM \`${this.#tableName}\` + WHERE \`deliver_after\` <= NOW(6) AND \`ordering_key\` IS NOT NULL + GROUP BY \`ordering_key\` + ORDER BY MIN(\`deliver_after\`) + LIMIT ?`, + [ORDERING_KEY_CANDIDATE_LIMIT], + ); + return rows.map((r) => r.ordering_key as string); + } + + /** + * Dequeues the oldest ready message for the given ordering key using + * the supplied (dedicated) connection. The caller MUST hold the advisory + * lock for `orderingKey` before calling this method. + * Returns `undefined` when no ready message exists for the ordering key. + */ + async #dequeueOrderedMessage( + conn: PoolConnection, + orderingKey: string, + ): Promise { + await conn.beginTransaction(); + try { + const [rows] = await conn.query( + `SELECT \`id\`, \`message\` + FROM \`${this.#tableName}\` + WHERE \`deliver_after\` <= NOW(6) AND \`ordering_key\` = ? + ORDER BY \`deliver_after\` + LIMIT 1`, + [orderingKey], + ); + if (rows.length === 0) { + await conn.rollback(); + return undefined; + } + const { id, message } = rows[0]; + await conn.query( + `DELETE FROM \`${this.#tableName}\` WHERE \`id\` = ?`, + [id], + ); + await conn.commit(); + return message; + } catch (e) { + await conn.rollback(); + throw e; + } + } + + /** + * Initializes the message queue table if it does not already exist. + * Concurrent calls are coalesced — only one initialization runs at a time. + * + * @since 2.1.0 + */ + initialize(): Promise { + if (this.#initialized) return Promise.resolve(); + return (this.#initPromise ??= this.#doInitialize()); + } + + async #doInitialize(): Promise { + logger.debug("Initializing the message queue table {tableName}...", { + tableName: this.#tableName, + }); + for (let attempt = 1; attempt <= INITIALIZE_MAX_ATTEMPTS; attempt++) { + try { + await this.#pool.query( + `CREATE TABLE IF NOT EXISTS \`${this.#tableName}\` ( + \`id\` CHAR(36) NOT NULL, + \`message\` JSON NOT NULL, + \`deliver_after\` DATETIME(6) NOT NULL DEFAULT NOW(6), + \`ordering_key\` TEXT NULL DEFAULT NULL, + PRIMARY KEY (\`id\`) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin`, + ); + try { + await this.#pool.query( + `CREATE INDEX \`idx_${this.#tableName}_deliver_after\` + ON \`${this.#tableName}\` (\`deliver_after\`)`, + ); + } catch (e) { + // Ignore duplicate index (ER_DUP_KEYNAME) from concurrent init + if ((e as { code?: string }).code !== "ER_DUP_KEYNAME") throw e; + } + try { + // Composite index to speed up #findOrderingKeyCandidates(): + // scans for the oldest ready message per ordering key. + // ordering_key uses a 766-character prefix because TEXT columns + // require a prefix length for indexing, and InnoDB's 3072-byte key + // limit in utf8mb4 (4 bytes/char) leaves room for 766 chars once the + // DATETIME(6) companion column (8 bytes) is included: + // 766 * 4 + 8 = 3072 bytes exactly. + await this.#pool.query( + `CREATE INDEX \`idx_${this.#tableName}_ok_da\` + ON \`${this.#tableName}\` (\`ordering_key\`(766), \`deliver_after\`)`, + ); + } catch (e) { + // Ignore duplicate index (ER_DUP_KEYNAME) from concurrent init + if ((e as { code?: string }).code !== "ER_DUP_KEYNAME") throw e; + } + break; + } catch (error) { + if (attempt >= INITIALIZE_MAX_ATTEMPTS) { + logger.error( + "Failed to initialize the message queue table: {error}", + { error }, + ); + throw error; + } + const backoffMs = INITIALIZE_BACKOFF_MS * 2 ** (attempt - 1); + logger.debug( + "Initialization race for table {tableName}; " + + "retrying in {backoffMs}ms " + + "(attempt {attempt}/{maxAttempts}).", + { + tableName: this.#tableName, + backoffMs, + attempt, + maxAttempts: INITIALIZE_MAX_ATTEMPTS, + error, + }, + ); + await sleep(backoffMs); + } + } + this.#initialized = true; + logger.debug("Initialized the message queue table {tableName}.", { + tableName: this.#tableName, + }); + } + + /** + * Drops the message queue table if it exists. Resets the initialized flag + * so that {@link MysqlMessageQueue.initialize} can recreate the table on + * the next call. + * + * @since 2.1.0 + */ + async drop(): Promise { + // Wait for any in-flight initialization to complete before dropping the + // table. Without this, drop() could finish before #doInitialize() sets + // #initialized = true, leaving the instance in a state where it believes + // the table exists even though it has already been dropped. + if (this.#initPromise != null) { + try { + await this.#initPromise; + } catch { + // Ignore initialization errors — we are dropping the table anyway. + } + } + await this.#pool.query( + `DROP TABLE IF EXISTS \`${this.#tableName}\``, + ); + this.#initialized = false; + this.#initPromise = undefined; + } +} diff --git a/packages/mysql/tsdown.config.ts b/packages/mysql/tsdown.config.ts index 825a0b114..8152c110c 100644 --- a/packages/mysql/tsdown.config.ts +++ b/packages/mysql/tsdown.config.ts @@ -1,7 +1,7 @@ import { defineConfig } from "tsdown"; export default defineConfig({ - entry: ["src/mod.ts", "src/kv.ts"], + entry: ["src/mod.ts", "src/kv.ts", "src/mq.ts"], dts: { compilerOptions: { isolatedDeclarations: true, declaration: true } }, unbundle: true, format: ["esm", "cjs"],