Skip to content
This repository was archived by the owner on Mar 17, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
/build/
system-test/secrets.js
system-test/*key.json
samples/**/build
*.lock
.DS_Store
package-lock.json
Expand Down
4 changes: 2 additions & 2 deletions src/publisher/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ export class Publisher {
const flushResolver = () => {
resolve();

// flush() maybe called more than once, so remove these
// flush() may be called more than once, so remove these
// event listeners after we've completed flush().
q.removeListener('drain', flushResolver);
};
Expand All @@ -129,7 +129,7 @@ export class Publisher {
);

const allPublishes = Promise.all(
toDrain.map(q => promisify(q.publish).bind(q)())
toDrain.map(q => promisify(q.publishDrain).bind(q)())
);

allPublishes
Expand Down
53 changes: 49 additions & 4 deletions src/publisher/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,23 @@ export abstract class MessageQueue extends EventEmitter {
* @param {PublishCallback} callback The publish callback.
*/
abstract add(message: PubsubMessage, callback: PublishCallback): void;

/**
* Method to initiate publishing.
* Method to initiate publishing. Full drain behaviour depends on whether the
* queues are ordered or not.
*
* @abstract
*/
abstract publish(): void;

/**
* Method to finalize publishing. Does as many publishes as are needed
* to finish emptying the queues, and fires a drain event afterward.
*
* @abstract
*/
abstract publishDrain(): void;

/**
* Accepts a batch of messages and publishes them to the API.
*
Expand Down Expand Up @@ -156,12 +167,33 @@ export class Queue extends MessageQueue {
this.pending = setTimeout(() => this.publish(), maxMilliseconds!);
}
}

/**
* Cancels any pending publishes and calls _publish immediately.
*
* _Does_ attempt to further drain after one batch is sent.
*
* @emits Queue#drain when all messages are sent.
*/
publishDrain(callback?: PublishDone): void {
this._publishInternal(true, callback);
}

/**
* Cancels any pending publishes and calls _publish immediately.
*
* Does _not_ attempt to further drain after one batch is sent.
*/
publish(callback?: PublishDone): void {
this._publishInternal(false, callback);
}

/**
* Cancels any pending publishes and calls _publish immediately.
*
* @emits Queue#drain when all messages are sent.
*/
_publishInternal(fullyDrain: boolean, callback?: PublishDone): void {
const definedCallback = callback || (() => {});
const {messages, callbacks} = this.batch;

Expand All @@ -176,8 +208,12 @@ export class Queue extends MessageQueue {
if (err) {
definedCallback(err);
} else if (this.batch.messages.length) {
// Make another go-around, we're trying to drain the queues fully.
this.publish(callback);
// We only do the indefinite go-arounds when we're trying to do a
// final drain for flush(). In all other cases, we want to leave
// subsequent batches alone so that they can time out as needed.
if (fullyDrain) {
this._publishInternal(true, callback);
}
} else {
this.emit('drain');
definedCallback(null);
Expand Down Expand Up @@ -279,7 +315,7 @@ export class OrderedQueue extends MessageQueue {
*
* @returns {MessageBatch}
*/
createBatch() {
createBatch(): MessageBatch {
return new MessageBatch(this.batchOptions);
}
/**
Expand Down Expand Up @@ -333,6 +369,15 @@ export class OrderedQueue extends MessageQueue {
});
}

/**
* For ordered queues, this does exactly the same thing as `publish()`.
*
* @fires OrderedQueue#drain
*/
publishDrain(callback?: PublishDone): void {
this.publish(callback);
}

/**
* Tells the queue it is ok to continue publishing messages.
*/
Expand Down
6 changes: 6 additions & 0 deletions test/publisher/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ class FakeQueue extends EventEmitter {
publish(callback: (err: Error | null) => void) {
this._publish([], [], callback);
}
publishDrain(callback: (err: Error | null) => void) {
this.publish(callback);
}
_publish(
// eslint-disable-next-line @typescript-eslint/no-unused-vars
messages: p.PubsubMessage[],
Expand All @@ -85,6 +88,9 @@ class FakeOrderedQueue extends FakeQueue {
publish(callback: (err: Error | null) => void) {
this._publish([], [], callback);
}
publishDrain(callback: (err: Error | null) => void) {
this.publish(callback);
}
_publish(
// eslint-disable-next-line @typescript-eslint/no-unused-vars
messages: p.PubsubMessage[],
Expand Down
71 changes: 71 additions & 0 deletions test/publisher/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,13 @@ class FakeMessageBatch {
created: number;
messages: p.PubsubMessage[];
options: b.BatchPublishOptions;
bytes: number;
constructor(options = {} as b.BatchPublishOptions) {
this.callbacks = [];
this.created = Date.now();
this.messages = [];
this.options = options;
this.bytes = 0;
}
// eslint-disable-next-line @typescript-eslint/no-unused-vars
add(message: p.PubsubMessage, callback: p.PublishCallback): void {}
Expand Down Expand Up @@ -332,6 +334,75 @@ describe('Message Queues', () => {
assert.strictEqual(messages, batch.messages);
assert.strictEqual(callbacks, batch.callbacks);
});

describe('publish chaining', () => {
let fakeMessages: p.PubsubMessage[];
let spies: p.PublishCallback[];
beforeEach(() => {
fakeMessages = [{}, {}] as p.PubsubMessage[];
spies = [sandbox.spy(), sandbox.spy()] as p.PublishCallback[];
});

it('should begin another publish(drain) if there are pending batches', () => {
const stub = sandbox.stub(queue, '_publish');
let once = false;
stub.callsFake((m, c, done) => {
if (!once) {
// Drop in a second batch before calling the callback.
const secondBatch = new FakeMessageBatch();
secondBatch.messages = fakeMessages;
secondBatch.callbacks = spies;
queue.batch = secondBatch;
}
once = true;

done!(null);
});

queue.batch = new FakeMessageBatch();
queue.batch.messages = fakeMessages;
queue.batch.callbacks = spies;
queue.publishDrain();

assert.strictEqual(stub.callCount, 2);
});

it('should not begin another publish(non-drain) if there are pending batches', () => {
const stub = sandbox.stub(queue, '_publish');
let once = false;
stub.callsFake((m, c, done) => {
if (!once) {
// Drop in a second batch before calling the callback.
const secondBatch = new FakeMessageBatch();
secondBatch.messages = fakeMessages;
secondBatch.callbacks = spies;
queue.batch = secondBatch;
}
once = true;

done!(null);
});

queue.batch = new FakeMessageBatch();
queue.batch.messages = fakeMessages;
queue.batch.callbacks = spies;
queue.publish();

assert.strictEqual(stub.callCount, 1);
});

it('should emit "drain" if there is nothing left to publish', () => {
const spy = sandbox.spy();
sandbox
.stub(queue, '_publish')
.callsFake((m, c, done) => done!(null));

queue.on('drain', spy);
queue.publish();

assert.strictEqual(spy.callCount, 1);
});
});
});
});

Expand Down