From 269664b7c6312c3946127d357ae5d94f7ec77b8d Mon Sep 17 00:00:00 2001 From: Kim Joar Bekkelund Date: Tue, 28 Mar 2023 21:28:00 +0200 Subject: [PATCH] fix: flush is broken when ordering enabled and multiple batches present --- src/publisher/message-queues.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/publisher/message-queues.ts b/src/publisher/message-queues.ts index 16e9e39a9..8b006123f 100644 --- a/src/publisher/message-queues.ts +++ b/src/publisher/message-queues.ts @@ -303,12 +303,12 @@ export class OrderedQueue extends MessageQueue { /** * Starts a timeout to publish any pending messages. */ - beginNextPublish(): void { + beginNextPublish(callback?: PublishDone): void { const maxMilliseconds = this.batchOptions.maxMilliseconds!; const timeWaiting = Date.now() - this.currentBatch.created; const delay = Math.max(0, maxMilliseconds - timeWaiting); - this.pending = setTimeout(() => this.publish(), delay); + this.pending = setTimeout(() => this.publish(callback), delay); } /** * Creates a new {@link MessageBatch} instance. @@ -361,7 +361,7 @@ export class OrderedQueue extends MessageQueue { this.handlePublishFailure(err); definedCallback(err); } else if (this.batches.length) { - this.beginNextPublish(); + this.beginNextPublish(callback); } else { this.emit('drain'); definedCallback(null);