diff --git a/README.md b/README.md index d251dd3..29abd6b 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ A lib for rabbit and sqs queues Requires `amqplib` to be installed separately. If you only need RabbitMQ support you can avoid also needing to install `@aws-sdk` packages by importing from `"@loke/queue-kit/rabbit"`. -Handling a work queue: +### Handling a work queue ```ts import { RabbitHelper } from "@loke/queue-kit"; // or "@loke/queue-kit/rabbit" @@ -26,13 +26,23 @@ async function main() { const aborter = new AbortController(); - const doneP = await rabbitHelper.handleQueue({ - queueName: "work-queue", - handler: async (msg) => { - await doWork(msg.body); - }, - signal: aborter.signal, - }); + const doneP = (async () => { + // We loop here in case the rabbit channel closes and we need to restart + // the queue handler. + while (!aborter.signal.aborted) { + try { + await rabbitHelper.handleQueue({ + queueName: "work-queue", + handler: async (msg) => { + await doWork(msg.body); + }, + signal: aborter.signal, + }); + } catch (err) { + logger.error("Error with spend tracker queue", err); + } + } + })(); await stopSignal(); @@ -44,7 +54,11 @@ async function main() { main(); ``` -**Breaking change in 2.x:** `assertWorkQueue` now requires a `retryDelay` option. This is the delay between retries when a message fails to be processed. To achieve this a dead letter queue is created and attached to the work queue (via the default direct exchange). Because old queues can't be changed via assertQueue, a new will need to be created. +**Breaking change in 2.x:** `assertWorkQueue` now requires a `retryDelay` +option. This is the delay between retries when a message fails to be processed. +To achieve this a dead letter queue is created and attached to the work queue +(via the default direct exchange). Because old queues can't be changed via +assertQueue, a new one will need to be created. ```ts await rabbitHelper.assertWorkQueue("new-queue", { retryDelay: 1000 }); @@ -71,7 +85,67 @@ const doneP = await Promise.all([ ]); ``` -Publishing events: +### Handling a subscription queue + +```ts +import { RabbitHelper } from "@loke/queue-kit"; // or "@loke/queue-kit/rabbit" +import amqp from "amqplib"; // must be installed separately + +async function main() { + const amqpConnection = await amqp.connect("amqp://localhost"); + + const rabbitHelper = new RabbitHelper({ + amqpConnection, + logger: console, + }); + + await rabbitHelper.assertExchange(); + + const aborter = new AbortController(); + + const doneP = (async () => { + // We loop here in case the rabbit channel closes and we need to restart + // the queue handler. + // + // Because this is a subscription queue, we need to create a new queue each + // time we start the handler. This is because the queue will be deleted when + // the channel closes. If the resource the queue is updating could have + // become stale while the channel was closed, we need to handle that too. + // + // In this example we update a cache with the latest value from the queue. + // If we loose our our connection to the queue, we need to clear the cache + // and start again. + while (!aborter.signal.aborted) { + const { queue } = await rabbitHelper.createSubscriptionQueue(); + await rabbitHelper.bindQueue(queue, "thing.*"); + + await clearCache(); + + try { + await rabbitHelper.handleQueue({ + queueName: queue, + handler: async (msg) => { + await updateCache(msg.body); + }, + signal: aborter.signal, + }); + } catch (err) { + logger.error("Error with spend tracker queue", err); + } + } + })(); + + await stopSignal(); + + aborter.abort(); + + await doneP; +} + +main(); +``` + +### Publishing events ```ts import { RabbitHelper } from "@loke/queue-kit"; // or "@loke/queue-kit/rabbit" diff --git a/package-lock.json b/package-lock.json index 5443af6..b4c343b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -17,7 +17,7 @@ "@types/node": "^20.16.3", "@typescript-eslint/eslint-plugin": "^5.19.0", "@typescript-eslint/parser": "^5.19.0", - "amqplib": "^0.10.4", + "amqplib": "^0.10.7", "ava": "^6.1.3", "eslint": "^8.13.0", "eslint-config-prettier": "^8.5.0", @@ -31,20 +31,6 @@ "node": ">=18.0.0" } }, - "node_modules/@acuminous/bitsyntax": { - "version": "0.1.2", - "resolved": "https://registry.npmjs.org/@acuminous/bitsyntax/-/bitsyntax-0.1.2.tgz", - "integrity": "sha512-29lUK80d1muEQqiUsSo+3A0yP6CdspgC95EnKBMi22Xlwt79i/En4Vr67+cXhU+cZjbti3TgGGC5wy1stIywVQ==", - "dev": true, - "dependencies": { - "buffer-more-ints": "~1.0.0", - "debug": "^4.3.4", - "safe-buffer": "~5.1.2" - }, - "engines": { - "node": ">=0.8" - } - }, "node_modules/@aws-crypto/sha256-browser": { "version": "5.2.0", "resolved": "https://registry.npmjs.org/@aws-crypto/sha256-browser/-/sha256-browser-5.2.0.tgz", @@ -2190,14 +2176,13 @@ } }, "node_modules/amqplib": { - "version": "0.10.4", - "resolved": "https://registry.npmjs.org/amqplib/-/amqplib-0.10.4.tgz", - "integrity": "sha512-DMZ4eCEjAVdX1II2TfIUpJhfKAuoCeDIo/YyETbfAqehHTXxxs7WOOd+N1Xxr4cKhx12y23zk8/os98FxlZHrw==", + "version": "0.10.7", + "resolved": "https://registry.npmjs.org/amqplib/-/amqplib-0.10.7.tgz", + "integrity": "sha512-7xPSYKSX2kj/bT6iHZ3MlctzxdCW1Ds9xyN0EmuRi2DZxHztwwoG1YkZrgmLyuPNjfxlRiMdWJPQscmoa3Vgdg==", "dev": true, + "license": "MIT", "dependencies": { - "@acuminous/bitsyntax": "^0.1.2", "buffer-more-ints": "~1.0.0", - "readable-stream": "1.x >=1.1.9", "url-parse": "~1.5.10" }, "engines": { @@ -2531,7 +2516,8 @@ "version": "1.0.0", "resolved": "https://registry.npmjs.org/buffer-more-ints/-/buffer-more-ints-1.0.0.tgz", "integrity": "sha512-EMetuGFz5SLsT0QTnXzINh4Ksr+oo4i+UGTXEshiGCQWnsgSs7ZhJ8fzlwQ+OzEMs0MpDAMr1hxnblp5a4vcHg==", - "dev": true + "dev": true, + "license": "MIT" }, "node_modules/callsites": { "version": "4.2.0", @@ -2770,12 +2756,6 @@ "node": "^12.20.0 || ^14.13.1 || >=16.0.0" } }, - "node_modules/core-util-is": { - "version": "1.0.3", - "resolved": "https://registry.npmjs.org/core-util-is/-/core-util-is-1.0.3.tgz", - "integrity": "sha512-ZQBvi1DcpJ4GDqanjucZ2Hj3wEO5pZDS89BWbkcrvdxksJorwUDDZamX9ldFkp9aw2lmBDLgkObEA4DWNJ9FYQ==", - "dev": true - }, "node_modules/create-require": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/create-require/-/create-require-1.1.1.tgz", @@ -3770,12 +3750,6 @@ "url": "https://github.com/sponsors/sindresorhus" } }, - "node_modules/isarray": { - "version": "0.0.1", - "resolved": "https://registry.npmjs.org/isarray/-/isarray-0.0.1.tgz", - "integrity": "sha1-ihis/Kmo9Bd+Cav8YDiTmwXR7t8=", - "dev": true - }, "node_modules/isexe": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/isexe/-/isexe-2.0.0.tgz", @@ -4365,18 +4339,6 @@ } ] }, - "node_modules/readable-stream": { - "version": "1.1.14", - "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-1.1.14.tgz", - "integrity": "sha1-fPTFTvZI44EwhMY23SB54WbAgdk=", - "dev": true, - "dependencies": { - "core-util-is": "~1.0.0", - "inherits": "~2.0.1", - "isarray": "0.0.1", - "string_decoder": "~0.10.x" - } - }, "node_modules/regexpp": { "version": "3.2.0", "resolved": "https://registry.npmjs.org/regexpp/-/regexpp-3.2.0.tgz", @@ -4483,12 +4445,6 @@ "queue-microtask": "^1.2.2" } }, - "node_modules/safe-buffer": { - "version": "5.1.2", - "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz", - "integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g==", - "dev": true - }, "node_modules/semver": { "version": "7.3.6", "resolved": "https://registry.npmjs.org/semver/-/semver-7.3.6.tgz", @@ -4616,12 +4572,6 @@ "node": ">=8" } }, - "node_modules/string_decoder": { - "version": "0.10.31", - "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-0.10.31.tgz", - "integrity": "sha1-YuIDvEF2bGwoyfyEMB2rHFMQ+pQ=", - "dev": true - }, "node_modules/string-width": { "version": "7.2.0", "resolved": "https://registry.npmjs.org/string-width/-/string-width-7.2.0.tgz", diff --git a/package.json b/package.json index 67730a8..20a0984 100644 --- a/package.json +++ b/package.json @@ -51,7 +51,7 @@ "@types/node": "^20.16.3", "@typescript-eslint/eslint-plugin": "^5.19.0", "@typescript-eslint/parser": "^5.19.0", - "amqplib": "^0.10.4", + "amqplib": "^0.10.7", "ava": "^6.1.3", "eslint": "^8.13.0", "eslint-config-prettier": "^8.5.0", diff --git a/src/common.ts b/src/common.ts index 40629c5..14362c7 100644 --- a/src/common.ts +++ b/src/common.ts @@ -3,12 +3,3 @@ export interface Logger { } export type MessageHandler = (message: T) => Promise; - -export interface AbortSignal { - aborted: boolean; - addEventListener: ( - event: "abort", - handler: () => void, - opts: { once: boolean } - ) => void; -} diff --git a/src/index.ts b/src/index.ts index 383e180..788fd9b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,4 +1,4 @@ export { RabbitData, RabbitHelper } from "./rabbit"; export { SQSData, SQSHelper } from "./sqs"; -export { Logger, MessageHandler, AbortSignal } from "./common"; +export { Logger, MessageHandler } from "./common"; export { registerMetrics } from "./metrics"; diff --git a/src/rabbit.ts b/src/rabbit.ts index a56ddab..a771a7c 100644 --- a/src/rabbit.ts +++ b/src/rabbit.ts @@ -3,7 +3,7 @@ import assert from "assert"; import util from "util"; import { ulid } from "ulid"; -import { AbortSignal, Logger, MessageHandler } from "./common"; +import { Logger, MessageHandler } from "./common"; import { messagesReceivedCounter, messagesFailedCounter, @@ -124,11 +124,23 @@ export class RabbitHelper { const inProgress = new Set>(); const ch = await this.amqpConn.createChannel(); + let chClosed = false; + const consumerAbort = new AbortController(); + const onClose = () => { + chClosed = true; + consumerAbort.abort(); + }; + ch.once("close", onClose); + try { await ch.prefetch(args.maxConcurrent || 20); const { consumerTag } = await ch.consume(args.queueName, async (msg) => { - if (!msg) return; + // msg is null when the consumer is cancelled + if (!msg) { + consumerAbort.abort(); + return; + } messagesReceivedCounter.inc({ queue: args.queueName }); const end = messageHandlerDuration.startTimer({ @@ -182,19 +194,27 @@ export class RabbitHelper { inProgress.add(task); }); - if (!args.signal.aborted) { - await new Promise((resolve) => { - args.signal.addEventListener("abort", () => resolve(), { + const raceSignal = AbortSignal.any([args.signal, consumerAbort.signal]); + + if (!raceSignal.aborted) { + await new Promise((resolve) => { + raceSignal.addEventListener("abort", resolve, { once: true, }); }); } - await ch.cancel(consumerTag); + if (!chClosed) { + await ch.cancel(consumerTag); - await Promise.all(inProgress); + await Promise.all(inProgress); + } } finally { - await ch.close(); + ch.removeListener("close", onClose); + + if (!chClosed) { + await ch.close(); + } } }