From b90214b0fa571b60ef86f32eda43d179f3db9219 Mon Sep 17 00:00:00 2001 From: Megan Potter Date: Fri, 3 Mar 2023 17:13:29 -0500 Subject: [PATCH] fix: don't do multiple drains per publish() in message queues unless requested --- .gitignore | 1 + src/publisher/index.ts | 4 +- src/publisher/message-queues.ts | 53 ++++++++++++++++++++++-- test/publisher/index.ts | 6 +++ test/publisher/message-queues.ts | 71 ++++++++++++++++++++++++++++++++ 5 files changed, 129 insertions(+), 6 deletions(-) diff --git a/.gitignore b/.gitignore index d4f03a0df..a8d155cd1 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,7 @@ /build/ system-test/secrets.js system-test/*key.json +samples/**/build *.lock .DS_Store package-lock.json diff --git a/src/publisher/index.ts b/src/publisher/index.ts index 1a545784e..dfe340da8 100644 --- a/src/publisher/index.ts +++ b/src/publisher/index.ts @@ -119,7 +119,7 @@ export class Publisher { const flushResolver = () => { resolve(); - // flush() maybe called more than once, so remove these + // flush() may be called more than once, so remove these // event listeners after we've completed flush(). q.removeListener('drain', flushResolver); }; @@ -129,7 +129,7 @@ export class Publisher { ); const allPublishes = Promise.all( - toDrain.map(q => promisify(q.publish).bind(q)()) + toDrain.map(q => promisify(q.publishDrain).bind(q)()) ); allPublishes diff --git a/src/publisher/message-queues.ts b/src/publisher/message-queues.ts index 4d367b8c4..16e9e39a9 100644 --- a/src/publisher/message-queues.ts +++ b/src/publisher/message-queues.ts @@ -66,12 +66,23 @@ export abstract class MessageQueue extends EventEmitter { * @param {PublishCallback} callback The publish callback. */ abstract add(message: PubsubMessage, callback: PublishCallback): void; + /** - * Method to initiate publishing. + * Method to initiate publishing. Full drain behaviour depends on whether the + * queues are ordered or not. * * @abstract */ abstract publish(): void; + + /** + * Method to finalize publishing. Does as many publishes as are needed + * to finish emptying the queues, and fires a drain event afterward. + * + * @abstract + */ + abstract publishDrain(): void; + /** * Accepts a batch of messages and publishes them to the API. * @@ -156,12 +167,33 @@ export class Queue extends MessageQueue { this.pending = setTimeout(() => this.publish(), maxMilliseconds!); } } + /** * Cancels any pending publishes and calls _publish immediately. * + * _Does_ attempt to further drain after one batch is sent. + * * @emits Queue#drain when all messages are sent. */ + publishDrain(callback?: PublishDone): void { + this._publishInternal(true, callback); + } + + /** + * Cancels any pending publishes and calls _publish immediately. + * + * Does _not_ attempt to further drain after one batch is sent. + */ publish(callback?: PublishDone): void { + this._publishInternal(false, callback); + } + + /** + * Cancels any pending publishes and calls _publish immediately. + * + * @emits Queue#drain when all messages are sent. + */ + _publishInternal(fullyDrain: boolean, callback?: PublishDone): void { const definedCallback = callback || (() => {}); const {messages, callbacks} = this.batch; @@ -176,8 +208,12 @@ export class Queue extends MessageQueue { if (err) { definedCallback(err); } else if (this.batch.messages.length) { - // Make another go-around, we're trying to drain the queues fully. - this.publish(callback); + // We only do the indefinite go-arounds when we're trying to do a + // final drain for flush(). In all other cases, we want to leave + // subsequent batches alone so that they can time out as needed. + if (fullyDrain) { + this._publishInternal(true, callback); + } } else { this.emit('drain'); definedCallback(null); @@ -279,7 +315,7 @@ export class OrderedQueue extends MessageQueue { * * @returns {MessageBatch} */ - createBatch() { + createBatch(): MessageBatch { return new MessageBatch(this.batchOptions); } /** @@ -333,6 +369,15 @@ export class OrderedQueue extends MessageQueue { }); } + /** + * For ordered queues, this does exactly the same thing as `publish()`. + * + * @fires OrderedQueue#drain + */ + publishDrain(callback?: PublishDone): void { + this.publish(callback); + } + /** * Tells the queue it is ok to continue publishing messages. */ diff --git a/test/publisher/index.ts b/test/publisher/index.ts index 151d1b96e..82ba5d618 100644 --- a/test/publisher/index.ts +++ b/test/publisher/index.ts @@ -63,6 +63,9 @@ class FakeQueue extends EventEmitter { publish(callback: (err: Error | null) => void) { this._publish([], [], callback); } + publishDrain(callback: (err: Error | null) => void) { + this.publish(callback); + } _publish( // eslint-disable-next-line @typescript-eslint/no-unused-vars messages: p.PubsubMessage[], @@ -85,6 +88,9 @@ class FakeOrderedQueue extends FakeQueue { publish(callback: (err: Error | null) => void) { this._publish([], [], callback); } + publishDrain(callback: (err: Error | null) => void) { + this.publish(callback); + } _publish( // eslint-disable-next-line @typescript-eslint/no-unused-vars messages: p.PubsubMessage[], diff --git a/test/publisher/message-queues.ts b/test/publisher/message-queues.ts index 7ea076477..eda38f1fa 100644 --- a/test/publisher/message-queues.ts +++ b/test/publisher/message-queues.ts @@ -53,11 +53,13 @@ class FakeMessageBatch { created: number; messages: p.PubsubMessage[]; options: b.BatchPublishOptions; + bytes: number; constructor(options = {} as b.BatchPublishOptions) { this.callbacks = []; this.created = Date.now(); this.messages = []; this.options = options; + this.bytes = 0; } // eslint-disable-next-line @typescript-eslint/no-unused-vars add(message: p.PubsubMessage, callback: p.PublishCallback): void {} @@ -332,6 +334,75 @@ describe('Message Queues', () => { assert.strictEqual(messages, batch.messages); assert.strictEqual(callbacks, batch.callbacks); }); + + describe('publish chaining', () => { + let fakeMessages: p.PubsubMessage[]; + let spies: p.PublishCallback[]; + beforeEach(() => { + fakeMessages = [{}, {}] as p.PubsubMessage[]; + spies = [sandbox.spy(), sandbox.spy()] as p.PublishCallback[]; + }); + + it('should begin another publish(drain) if there are pending batches', () => { + const stub = sandbox.stub(queue, '_publish'); + let once = false; + stub.callsFake((m, c, done) => { + if (!once) { + // Drop in a second batch before calling the callback. + const secondBatch = new FakeMessageBatch(); + secondBatch.messages = fakeMessages; + secondBatch.callbacks = spies; + queue.batch = secondBatch; + } + once = true; + + done!(null); + }); + + queue.batch = new FakeMessageBatch(); + queue.batch.messages = fakeMessages; + queue.batch.callbacks = spies; + queue.publishDrain(); + + assert.strictEqual(stub.callCount, 2); + }); + + it('should not begin another publish(non-drain) if there are pending batches', () => { + const stub = sandbox.stub(queue, '_publish'); + let once = false; + stub.callsFake((m, c, done) => { + if (!once) { + // Drop in a second batch before calling the callback. + const secondBatch = new FakeMessageBatch(); + secondBatch.messages = fakeMessages; + secondBatch.callbacks = spies; + queue.batch = secondBatch; + } + once = true; + + done!(null); + }); + + queue.batch = new FakeMessageBatch(); + queue.batch.messages = fakeMessages; + queue.batch.callbacks = spies; + queue.publish(); + + assert.strictEqual(stub.callCount, 1); + }); + + it('should emit "drain" if there is nothing left to publish', () => { + const spy = sandbox.spy(); + sandbox + .stub(queue, '_publish') + .callsFake((m, c, done) => done!(null)); + + queue.on('drain', spy); + queue.publish(); + + assert.strictEqual(spy.callCount, 1); + }); + }); }); });