Skip to content

Commit 02c617e

Browse files
committed
pubsub: acquire FlowController before releasing
1 parent 6de41d5 commit 02c617e

File tree

1 file changed

+7
-8
lines changed

1 file changed

+7
-8
lines changed

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -260,8 +260,7 @@ public int getMessageDeadlineSeconds() {
260260
}
261261

262262
public void processReceivedMessages(List<com.google.pubsub.v1.ReceivedMessage> responseMessages) {
263-
int receivedMessagesCount = responseMessages.size();
264-
if (receivedMessagesCount == 0) {
263+
if (responseMessages.size() == 0) {
265264
return;
266265
}
267266
Instant now = new Instant(clock.millisTime());
@@ -276,7 +275,13 @@ public void processReceivedMessages(List<com.google.pubsub.v1.ReceivedMessage> r
276275
logger.log(
277276
Level.FINER, "Received {0} messages at {1}", new Object[] {responseMessages.size(), now});
278277

278+
try {
279+
flowController.reserve(responseMessages.size(), totalByteCount);
280+
} catch (FlowController.FlowControlException unexpectedException) {
281+
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
282+
}
279283
messagesWaiter.incrementPendingMessages(responseMessages.size());
284+
280285
Iterator<AckHandler> acksIterator = ackHandlers.iterator();
281286
for (ReceivedMessage userMessage : responseMessages) {
282287
final PubsubMessage message = userMessage.getMessage();
@@ -308,12 +313,6 @@ public void run() {
308313
new ExtensionJob(expiration, INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, ackHandlers));
309314
}
310315
setupNextAckDeadlineExtensionAlarm(expiration);
311-
312-
try {
313-
flowController.reserve(receivedMessagesCount, totalByteCount);
314-
} catch (FlowController.FlowControlException unexpectedException) {
315-
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
316-
}
317316
}
318317

319318
private void setupPendingAcksAlarm() {

0 commit comments

Comments
 (0)