Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 84 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ A lib for rabbit and sqs queues

Requires `amqplib` to be installed separately. If you only need RabbitMQ support you can avoid also needing to install `@aws-sdk` packages by importing from `"@loke/queue-kit/rabbit"`.

Handling a work queue:
### Handling a work queue

```ts
import { RabbitHelper } from "@loke/queue-kit"; // or "@loke/queue-kit/rabbit"
Expand All @@ -26,13 +26,23 @@ async function main() {

const aborter = new AbortController();

const doneP = await rabbitHelper.handleQueue({
queueName: "work-queue",
handler: async (msg) => {
await doWork(msg.body);
},
signal: aborter.signal,
});
const doneP = (async () => {
// We loop here in case the rabbit channel closes and we need to restart
// the queue handler.
while (!aborter.signal.aborted) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to include a sleep interval or a retry limit in the sample code here?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's fine without

try {
await rabbitHelper.handleQueue({
queueName: "work-queue",
handler: async (msg) => {
await doWork(msg.body);
},
signal: aborter.signal,
});
} catch (err) {
logger.error("Error with spend tracker queue", err);
}
}
})();

await stopSignal();

Expand All @@ -44,7 +54,11 @@ async function main() {
main();
```

**Breaking change in 2.x:** `assertWorkQueue` now requires a `retryDelay` option. This is the delay between retries when a message fails to be processed. To achieve this a dead letter queue is created and attached to the work queue (via the default direct exchange). Because old queues can't be changed via assertQueue, a new will need to be created.
**Breaking change in 2.x:** `assertWorkQueue` now requires a `retryDelay`
option. This is the delay between retries when a message fails to be processed.
To achieve this a dead letter queue is created and attached to the work queue
(via the default direct exchange). Because old queues can't be changed via
assertQueue, a new one will need to be created.

```ts
await rabbitHelper.assertWorkQueue("new-queue", { retryDelay: 1000 });
Expand All @@ -71,7 +85,67 @@ const doneP = await Promise.all([
]);
```

Publishing events:
### Handling a subscription queue

```ts
import { RabbitHelper } from "@loke/queue-kit"; // or "@loke/queue-kit/rabbit"
import amqp from "amqplib"; // must be installed separately

async function main() {
const amqpConnection = await amqp.connect("amqp://localhost");

const rabbitHelper = new RabbitHelper({
amqpConnection,
logger: console,
});

await rabbitHelper.assertExchange();

const aborter = new AbortController();

const doneP = (async () => {
// We loop here in case the rabbit channel closes and we need to restart
// the queue handler.
//
// Because this is a subscription queue, we need to create a new queue each
// time we start the handler. This is because the queue will be deleted when
// the channel closes. If the resource the queue is updating could have
// become stale while the channel was closed, we need to handle that too.
//
// In this example we update a cache with the latest value from the queue.
// If we loose our our connection to the queue, we need to clear the cache
// and start again.
while (!aborter.signal.aborted) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar question around retry limits / sleeps in the loop as the other comment

const { queue } = await rabbitHelper.createSubscriptionQueue();
await rabbitHelper.bindQueue(queue, "thing.*");

await clearCache();

try {
await rabbitHelper.handleQueue({
queueName: queue,
handler: async (msg) => {
await updateCache(msg.body);
},
signal: aborter.signal,
});
} catch (err) {
logger.error("Error with spend tracker queue", err);
}
}
})();

await stopSignal();

aborter.abort();

await doneP;
}

main();
```

### Publishing events

```ts
import { RabbitHelper } from "@loke/queue-kit"; // or "@loke/queue-kit/rabbit"
Expand Down
64 changes: 7 additions & 57 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
"@types/node": "^20.16.3",
"@typescript-eslint/eslint-plugin": "^5.19.0",
"@typescript-eslint/parser": "^5.19.0",
"amqplib": "^0.10.4",
"amqplib": "^0.10.7",
"ava": "^6.1.3",
"eslint": "^8.13.0",
"eslint-config-prettier": "^8.5.0",
Expand Down
9 changes: 0 additions & 9 deletions src/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,3 @@ export interface Logger {
}

export type MessageHandler<T> = (message: T) => Promise<void>;

export interface AbortSignal {
aborted: boolean;
addEventListener: (
event: "abort",
handler: () => void,
opts: { once: boolean }
) => void;
}
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export { RabbitData, RabbitHelper } from "./rabbit";
export { SQSData, SQSHelper } from "./sqs";
export { Logger, MessageHandler, AbortSignal } from "./common";
export { Logger, MessageHandler } from "./common";
export { registerMetrics } from "./metrics";
36 changes: 28 additions & 8 deletions src/rabbit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import assert from "assert";
import util from "util";
import { ulid } from "ulid";

import { AbortSignal, Logger, MessageHandler } from "./common";
import { Logger, MessageHandler } from "./common";
import {
messagesReceivedCounter,
messagesFailedCounter,
Expand Down Expand Up @@ -124,11 +124,23 @@ export class RabbitHelper {
const inProgress = new Set<Promise<void>>();
const ch = await this.amqpConn.createChannel();

let chClosed = false;
const consumerAbort = new AbortController();
const onClose = () => {
chClosed = true;
consumerAbort.abort();
};
ch.once("close", onClose);

try {
await ch.prefetch(args.maxConcurrent || 20);

const { consumerTag } = await ch.consume(args.queueName, async (msg) => {
if (!msg) return;
// msg is null when the consumer is cancelled
if (!msg) {
consumerAbort.abort();
return;
}

messagesReceivedCounter.inc({ queue: args.queueName });
const end = messageHandlerDuration.startTimer({
Expand Down Expand Up @@ -182,19 +194,27 @@ export class RabbitHelper {
inProgress.add(task);
});

if (!args.signal.aborted) {
await new Promise<void>((resolve) => {
args.signal.addEventListener("abort", () => resolve(), {
const raceSignal = AbortSignal.any([args.signal, consumerAbort.signal]);

if (!raceSignal.aborted) {
await new Promise((resolve) => {
raceSignal.addEventListener("abort", resolve, {
once: true,
});
});
}

await ch.cancel(consumerTag);
if (!chClosed) {
await ch.cancel(consumerTag);

await Promise.all(inProgress);
await Promise.all(inProgress);
}
} finally {
await ch.close();
ch.removeListener("close", onClose);

if (!chClosed) {
await ch.close();
}
}
}

Expand Down