From 1db5ed0684d7992478dc33ac380610bfe75b9ba8 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Thu, 19 Jan 2017 23:15:21 +1100 Subject: [PATCH 1/3] fix improper use of iteration The previous implementation modifies a HashMap while iterating through its key set, causeing unpredictable iteration behavior. This might be the reason our tests intermittently deadlock. The new implementation uses a PriorityQueue. The time complexity is O(M * log N), where M is the number of expirations before the cut over time and N is the total number of expirations. I am not sure how this compares to O(N) intended in the previous implementation. If required, O(N) is also possible using an ArrayList. Unfortunately, a new failure has emerged. Instead of deadlocking, testModifyAckDeadline intermittently fails. Maybe - I have fixed the old bug and created a new one, - I have fixed the old bug that was masking another one, - The deadlock wasn't caused by the iteration. Now the tests just fail before they could deadlock, or some combination thereof. The incorrect iteration should be fixed regardless. --- .../pubsub/spi/v1/MessageDispatcher.java | 125 ++++++++---------- .../pubsub/testing/LocalPubSubHelper.java | 2 - 2 files changed, 54 insertions(+), 73 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java index 83fca0296fdf..232b20687762 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java @@ -28,12 +28,11 @@ import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.ReceivedMessage; import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; +import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.List; -import java.util.Map; +import java.util.PriorityQueue; import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -68,8 +67,7 @@ class MessageDispatcher { private final FlowController flowController; private final MessagesWaiter messagesWaiter; - // Map of outstanding messages (value) ordered by expiration time (key) in ascending order. - private final Map> outstandingAckHandlers; + private final PriorityQueue outstandingAckHandlers; private final Set pendingAcks; private final Set pendingNacks; @@ -82,39 +80,32 @@ class MessageDispatcher { // To keep track of number of seconds the receiver takes to process messages. private final Distribution ackLatencyDistribution; - private static class ExpirationInfo implements Comparable { - private final Clock clock; + // ExtensionJob represents a group of {@code AckHandler}s that shares the same expiration. + // + // It is Comparable so that it may be put in a PriorityQueue. + // For efficiency, it is also mutable, so great care should be taken to make sure + // it is not modified while inside the queue. + // The hashcode and equals methods are explicitly not implemented to discourage + // the use of this class as keys in maps or similar containers. + private static class ExtensionJob implements Comparable { Instant expiration; int nextExtensionSeconds; + ArrayList ackHandlers; - ExpirationInfo(Clock clock, Instant expiration, int initialAckDeadlineExtension) { - this.clock = clock; + ExtensionJob( + Instant expiration, int initialAckDeadlineExtension, ArrayList ackHandlers) { this.expiration = expiration; nextExtensionSeconds = initialAckDeadlineExtension; + this.ackHandlers = ackHandlers; } - void extendExpiration() { - expiration = new Instant(clock.millis()).plus(Duration.standardSeconds(nextExtensionSeconds)); + void extendExpiration(Instant now) { + expiration = now.plus(Duration.standardSeconds(nextExtensionSeconds)); nextExtensionSeconds = Math.min(2 * nextExtensionSeconds, MAX_ACK_DEADLINE_EXTENSION_SECS); } @Override - public int hashCode() { - return expiration.hashCode(); - } - - @Override - public boolean equals(Object obj) { - if (!(obj instanceof ExpirationInfo)) { - return false; - } - - ExpirationInfo other = (ExpirationInfo) obj; - return expiration.equals(other.expiration); - } - - @Override - public int compareTo(ExpirationInfo other) { + public int compareTo(ExtensionJob other) { return expiration.compareTo(other.expiration); } } @@ -217,7 +208,7 @@ void sendAckOperations( this.receiver = receiver; this.ackProcessor = ackProcessor; this.flowController = flowController; - outstandingAckHandlers = new HashMap<>(); + outstandingAckHandlers = new PriorityQueue<>(); pendingAcks = new HashSet<>(); pendingNacks = new HashSet<>(); // 601 buckets of 1s resolution from 0s to MAX_ACK_DEADLINE_SECONDS @@ -257,17 +248,16 @@ public void processReceivedMessages(List r } Instant now = new Instant(clock.millis()); int totalByteCount = 0; - final List ackHandlers = new ArrayList<>(responseMessages.size()); + final ArrayList ackHandlers = new ArrayList<>(responseMessages.size()); for (ReceivedMessage pubsubMessage : responseMessages) { int messageSize = pubsubMessage.getMessage().getSerializedSize(); totalByteCount += messageSize; ackHandlers.add(new AckHandler(pubsubMessage.getAckId(), messageSize)); } - ExpirationInfo expiration = - new ExpirationInfo( - clock, now.plus(messageDeadlineSeconds * 1000), INITIAL_ACK_DEADLINE_EXTENSION_SECONDS); + Instant expiration = now.plus(messageDeadlineSeconds * 1000); synchronized (outstandingAckHandlers) { - addOutstadingAckHandlers(expiration, ackHandlers); + outstandingAckHandlers.add( + new ExtensionJob(expiration, INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, ackHandlers)); } logger.debug("Received {} messages at {}", responseMessages.size(), now); setupNextAckDeadlineExtensionAlarm(expiration); @@ -292,14 +282,6 @@ public void run() { } } - private void addOutstadingAckHandlers( - ExpirationInfo expiration, final List ackHandlers) { - if (!outstandingAckHandlers.containsKey(expiration)) { - outstandingAckHandlers.put(expiration, new ArrayList(ackHandlers.size())); - } - outstandingAckHandlers.get(expiration).addAll(ackHandlers); - } - private void setupPendingAcksAlarm() { alarmsLock.lock(); try { @@ -354,41 +336,42 @@ public void run() { now, cutOverTime, ackExpirationPadding); - ExpirationInfo nextScheduleExpiration = null; + Instant nextScheduleExpiration = null; List modifyAckDeadlinesToSend = new ArrayList<>(); synchronized (outstandingAckHandlers) { - for (ExpirationInfo messageExpiration : outstandingAckHandlers.keySet()) { - if (messageExpiration.expiration.compareTo(cutOverTime) <= 0) { - Collection expiringAcks = outstandingAckHandlers.get(messageExpiration); - outstandingAckHandlers.remove(messageExpiration); - List renewedAckHandlers = new ArrayList<>(expiringAcks.size()); - messageExpiration.extendExpiration(); - int extensionSeconds = - Ints.saturatedCast( - new Interval(now, messageExpiration.expiration) - .toDuration() - .getStandardSeconds()); - PendingModifyAckDeadline pendingModAckDeadline = - new PendingModifyAckDeadline(extensionSeconds); - for (AckHandler ackHandler : expiringAcks) { - if (ackHandler.acked.get()) { - continue; - } - pendingModAckDeadline.addAckId(ackHandler.ackId); - renewedAckHandlers.add(ackHandler); - } - modifyAckDeadlinesToSend.add(pendingModAckDeadline); - if (!renewedAckHandlers.isEmpty()) { - addOutstadingAckHandlers(messageExpiration, renewedAckHandlers); + if (!outstandingAckHandlers.isEmpty()) { + nextScheduleExpiration = outstandingAckHandlers.peek().expiration; + } + while (!outstandingAckHandlers.isEmpty() + && outstandingAckHandlers.peek().expiration.compareTo(cutOverTime) <= 0) { + ExtensionJob job = outstandingAckHandlers.poll(); + + // If a message has already been acked, remove it, nothing to do. + for (int i = 0; i < job.ackHandlers.size(); ) { + if (job.ackHandlers.get(i).acked.get()) { + Collections.swap(job.ackHandlers, i, job.ackHandlers.size() - 1); + job.ackHandlers.remove(job.ackHandlers.size() - 1); } else { - outstandingAckHandlers.remove(messageExpiration); + i++; } } - if (nextScheduleExpiration == null - || nextScheduleExpiration.expiration.isAfter(messageExpiration.expiration)) { - nextScheduleExpiration = messageExpiration; + + if (job.ackHandlers.isEmpty()) { + continue; + } + + job.extendExpiration(now); + int extensionSeconds = + Ints.saturatedCast( + new Interval(now, job.expiration).toDuration().getStandardSeconds()); + PendingModifyAckDeadline pendingModAckDeadline = + new PendingModifyAckDeadline(extensionSeconds); + for (AckHandler ackHandler : job.ackHandlers) { + pendingModAckDeadline.addAckId(ackHandler.ackId); } + modifyAckDeadlinesToSend.add(pendingModAckDeadline); + outstandingAckHandlers.add(job); } } @@ -404,8 +387,8 @@ public void run() { } } - private void setupNextAckDeadlineExtensionAlarm(ExpirationInfo messageExpiration) { - Instant possibleNextAlarmTime = messageExpiration.expiration.minus(ackExpirationPadding); + private void setupNextAckDeadlineExtensionAlarm(Instant expiration) { + Instant possibleNextAlarmTime = expiration.minus(ackExpirationPadding); alarmsLock.lock(); try { if (nextAckDeadlineExtensionAlarmTime.isAfter(possibleNextAlarmTime)) { diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/testing/LocalPubSubHelper.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/testing/LocalPubSubHelper.java index 347bee595af2..b99eb2ab2885 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/testing/LocalPubSubHelper.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/testing/LocalPubSubHelper.java @@ -135,9 +135,7 @@ public void reset() throws IOException { */ @Override public void stop(Duration timeout) throws IOException, InterruptedException, TimeoutException { - System.err.println("sending"); sendPostRequest("/shutdown"); - System.err.println("sent"); waitForProcess(timeout); } } From 5c094fa78e7e83a96ed9ea40516598c2fe298cf3 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Fri, 20 Jan 2017 15:00:48 +1100 Subject: [PATCH 2/3] bug fix don't process the same job twice record next expiration after the loop --- .../pubsub/spi/v1/MessageDispatcher.java | 35 +++++++++++++++---- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java index 232b20687762..cb96fee25cdd 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java @@ -108,6 +108,14 @@ void extendExpiration(Instant now) { public int compareTo(ExtensionJob other) { return expiration.compareTo(other.expiration); } + + public String toString() { + ArrayList ackIds = new ArrayList<>(); + for (AckHandler ah : ackHandlers) { + ackIds.add(ah.ackId); + } + return String.format("ExtensionJob {expiration: %s, nextExtensionSeconds: %d, ackIds: %s}", expiration, nextExtensionSeconds, ackIds); + } } /** Stores the data needed to asynchronously modify acknowledgement deadlines. */ @@ -128,6 +136,10 @@ static class PendingModifyAckDeadline { public void addAckId(String ackId) { ackIds.add(ackId); } + + public String toString() { + return String.format("extend %d sec: %s", deadlineExtensionSeconds, ackIds); + } } /** @@ -255,10 +267,6 @@ public void processReceivedMessages(List r ackHandlers.add(new AckHandler(pubsubMessage.getAckId(), messageSize)); } Instant expiration = now.plus(messageDeadlineSeconds * 1000); - synchronized (outstandingAckHandlers) { - outstandingAckHandlers.add( - new ExtensionJob(expiration, INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, ackHandlers)); - } logger.debug("Received {} messages at {}", responseMessages.size(), now); setupNextAckDeadlineExtensionAlarm(expiration); @@ -275,6 +283,12 @@ public void run() { } }); } + + synchronized (outstandingAckHandlers) { + outstandingAckHandlers.add( + new ExtensionJob(expiration, INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, ackHandlers)); + } + try { flowController.reserve(receivedMessagesCount, totalByteCount); } catch (FlowController.FlowControlException unexpectedException) { @@ -339,10 +353,11 @@ public void run() { Instant nextScheduleExpiration = null; List modifyAckDeadlinesToSend = new ArrayList<>(); + // Holding area for jobs we'll put back into the queue + // so we don't process the same job twice. + List renewJobs = new ArrayList<>(); + synchronized (outstandingAckHandlers) { - if (!outstandingAckHandlers.isEmpty()) { - nextScheduleExpiration = outstandingAckHandlers.peek().expiration; - } while (!outstandingAckHandlers.isEmpty() && outstandingAckHandlers.peek().expiration.compareTo(cutOverTime) <= 0) { ExtensionJob job = outstandingAckHandlers.poll(); @@ -371,8 +386,14 @@ public void run() { pendingModAckDeadline.addAckId(ackHandler.ackId); } modifyAckDeadlinesToSend.add(pendingModAckDeadline); + renewJobs.add(job); + } + for (ExtensionJob job : renewJobs) { outstandingAckHandlers.add(job); } + if (!outstandingAckHandlers.isEmpty()) { + nextScheduleExpiration = outstandingAckHandlers.peek().expiration; + } } processOutstandingAckOperations(modifyAckDeadlinesToSend); From 0aea0fc5fd02c9f8065913b5fa8ec1d39c0aae0a Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Sun, 22 Jan 2017 19:38:03 +1100 Subject: [PATCH 3/3] PR comments --- .../cloud/pubsub/spi/v1/MessageDispatcher.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java index cb96fee25cdd..799770bad761 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java @@ -114,7 +114,9 @@ public String toString() { for (AckHandler ah : ackHandlers) { ackIds.add(ah.ackId); } - return String.format("ExtensionJob {expiration: %s, nextExtensionSeconds: %d, ackIds: %s}", expiration, nextExtensionSeconds, ackIds); + return String.format( + "ExtensionJob {expiration: %s, nextExtensionSeconds: %d, ackIds: %s}", + expiration, nextExtensionSeconds, ackIds); } } @@ -138,7 +140,9 @@ public void addAckId(String ackId) { } public String toString() { - return String.format("extend %d sec: %s", deadlineExtensionSeconds, ackIds); + return String.format( + "PendingModifyAckDeadline{extension: %d sec, ackIds: %s}", + deadlineExtensionSeconds, ackIds); } } @@ -284,6 +288,11 @@ public void run() { }); } + // There is a race condition. setupNextAckDeadlineExtensionAlarm might set + // an alarm that fires before this block can run. + // The fix is to move setup below this block, but doing so aggravates another + // race condition. + // TODO(pongad): Fix both races. synchronized (outstandingAckHandlers) { outstandingAckHandlers.add( new ExtensionJob(expiration, INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, ackHandlers));