Skip to content

Commit 5c094fa

Browse files
committed
bug fix
don't process the same job twice record next expiration after the loop
1 parent 1db5ed0 commit 5c094fa

File tree

1 file changed

+28
-7
lines changed

1 file changed

+28
-7
lines changed

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,14 @@ void extendExpiration(Instant now) {
108108
public int compareTo(ExtensionJob other) {
109109
return expiration.compareTo(other.expiration);
110110
}
111+
112+
public String toString() {
113+
ArrayList<String> ackIds = new ArrayList<>();
114+
for (AckHandler ah : ackHandlers) {
115+
ackIds.add(ah.ackId);
116+
}
117+
return String.format("ExtensionJob {expiration: %s, nextExtensionSeconds: %d, ackIds: %s}", expiration, nextExtensionSeconds, ackIds);
118+
}
111119
}
112120

113121
/** Stores the data needed to asynchronously modify acknowledgement deadlines. */
@@ -128,6 +136,10 @@ static class PendingModifyAckDeadline {
128136
public void addAckId(String ackId) {
129137
ackIds.add(ackId);
130138
}
139+
140+
public String toString() {
141+
return String.format("extend %d sec: %s", deadlineExtensionSeconds, ackIds);
142+
}
131143
}
132144

133145
/**
@@ -255,10 +267,6 @@ public void processReceivedMessages(List<com.google.pubsub.v1.ReceivedMessage> r
255267
ackHandlers.add(new AckHandler(pubsubMessage.getAckId(), messageSize));
256268
}
257269
Instant expiration = now.plus(messageDeadlineSeconds * 1000);
258-
synchronized (outstandingAckHandlers) {
259-
outstandingAckHandlers.add(
260-
new ExtensionJob(expiration, INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, ackHandlers));
261-
}
262270
logger.debug("Received {} messages at {}", responseMessages.size(), now);
263271
setupNextAckDeadlineExtensionAlarm(expiration);
264272

@@ -275,6 +283,12 @@ public void run() {
275283
}
276284
});
277285
}
286+
287+
synchronized (outstandingAckHandlers) {
288+
outstandingAckHandlers.add(
289+
new ExtensionJob(expiration, INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, ackHandlers));
290+
}
291+
278292
try {
279293
flowController.reserve(receivedMessagesCount, totalByteCount);
280294
} catch (FlowController.FlowControlException unexpectedException) {
@@ -339,10 +353,11 @@ public void run() {
339353
Instant nextScheduleExpiration = null;
340354
List<PendingModifyAckDeadline> modifyAckDeadlinesToSend = new ArrayList<>();
341355

356+
// Holding area for jobs we'll put back into the queue
357+
// so we don't process the same job twice.
358+
List<ExtensionJob> renewJobs = new ArrayList<>();
359+
342360
synchronized (outstandingAckHandlers) {
343-
if (!outstandingAckHandlers.isEmpty()) {
344-
nextScheduleExpiration = outstandingAckHandlers.peek().expiration;
345-
}
346361
while (!outstandingAckHandlers.isEmpty()
347362
&& outstandingAckHandlers.peek().expiration.compareTo(cutOverTime) <= 0) {
348363
ExtensionJob job = outstandingAckHandlers.poll();
@@ -371,8 +386,14 @@ public void run() {
371386
pendingModAckDeadline.addAckId(ackHandler.ackId);
372387
}
373388
modifyAckDeadlinesToSend.add(pendingModAckDeadline);
389+
renewJobs.add(job);
390+
}
391+
for (ExtensionJob job : renewJobs) {
374392
outstandingAckHandlers.add(job);
375393
}
394+
if (!outstandingAckHandlers.isEmpty()) {
395+
nextScheduleExpiration = outstandingAckHandlers.peek().expiration;
396+
}
376397
}
377398

378399
processOutstandingAckOperations(modifyAckDeadlinesToSend);

0 commit comments

Comments
 (0)