2828import com .google .pubsub .v1 .PubsubMessage ;
2929import com .google .pubsub .v1 .ReceivedMessage ;
3030import java .util .ArrayList ;
31- import java .util .Collection ;
32- import java .util .HashMap ;
31+ import java .util .Collections ;
3332import java .util .HashSet ;
3433import java .util .Iterator ;
3534import java .util .List ;
36- import java .util .Map ;
35+ import java .util .PriorityQueue ;
3736import java .util .Set ;
3837import java .util .concurrent .ScheduledExecutorService ;
3938import java .util .concurrent .ScheduledFuture ;
@@ -68,8 +67,7 @@ class MessageDispatcher {
6867 private final FlowController flowController ;
6968 private final MessagesWaiter messagesWaiter ;
7069
71- // Map of outstanding messages (value) ordered by expiration time (key) in ascending order.
72- private final Map <ExpirationInfo , List <AckHandler >> outstandingAckHandlers ;
70+ private final PriorityQueue <ExtensionJob > outstandingAckHandlers ;
7371 private final Set <String > pendingAcks ;
7472 private final Set <String > pendingNacks ;
7573
@@ -82,40 +80,43 @@ class MessageDispatcher {
8280 // To keep track of number of seconds the receiver takes to process messages.
8381 private final Distribution ackLatencyDistribution ;
8482
85- private static class ExpirationInfo implements Comparable <ExpirationInfo > {
86- private final Clock clock ;
83+ // ExtensionJob represents a group of {@code AckHandler}s that shares the same expiration.
84+ //
85+ // It is Comparable so that it may be put in a PriorityQueue.
86+ // For efficiency, it is also mutable, so great care should be taken to make sure
87+ // it is not modified while inside the queue.
88+ // The hashcode and equals methods are explicitly not implemented to discourage
89+ // the use of this class as keys in maps or similar containers.
90+ private static class ExtensionJob implements Comparable <ExtensionJob > {
8791 Instant expiration ;
8892 int nextExtensionSeconds ;
93+ ArrayList <AckHandler > ackHandlers ;
8994
90- ExpirationInfo ( Clock clock , Instant expiration , int initialAckDeadlineExtension ) {
91- this . clock = clock ;
95+ ExtensionJob (
96+ Instant expiration , int initialAckDeadlineExtension , ArrayList < AckHandler > ackHandlers ) {
9297 this .expiration = expiration ;
9398 nextExtensionSeconds = initialAckDeadlineExtension ;
99+ this .ackHandlers = ackHandlers ;
94100 }
95101
96- void extendExpiration () {
97- expiration = new Instant ( clock . millis ()) .plus (Duration .standardSeconds (nextExtensionSeconds ));
102+ void extendExpiration (Instant now ) {
103+ expiration = now .plus (Duration .standardSeconds (nextExtensionSeconds ));
98104 nextExtensionSeconds = Math .min (2 * nextExtensionSeconds , MAX_ACK_DEADLINE_EXTENSION_SECS );
99105 }
100106
101107 @ Override
102- public int hashCode ( ) {
103- return expiration .hashCode ( );
108+ public int compareTo ( ExtensionJob other ) {
109+ return expiration .compareTo ( other . expiration );
104110 }
105111
106- @ Override
107- public boolean equals ( Object obj ) {
108- if (!( obj instanceof ExpirationInfo ) ) {
109- return false ;
112+ public String toString () {
113+ ArrayList < String > ackIds = new ArrayList <>();
114+ for ( AckHandler ah : ackHandlers ) {
115+ ackIds . add ( ah . ackId ) ;
110116 }
111-
112- ExpirationInfo other = (ExpirationInfo ) obj ;
113- return expiration .equals (other .expiration );
114- }
115-
116- @ Override
117- public int compareTo (ExpirationInfo other ) {
118- return expiration .compareTo (other .expiration );
117+ return String .format (
118+ "ExtensionJob {expiration: %s, nextExtensionSeconds: %d, ackIds: %s}" ,
119+ expiration , nextExtensionSeconds , ackIds );
119120 }
120121 }
121122
@@ -137,6 +138,12 @@ static class PendingModifyAckDeadline {
137138 public void addAckId (String ackId ) {
138139 ackIds .add (ackId );
139140 }
141+
142+ public String toString () {
143+ return String .format (
144+ "PendingModifyAckDeadline{extension: %d sec, ackIds: %s}" ,
145+ deadlineExtensionSeconds , ackIds );
146+ }
140147 }
141148
142149 /**
@@ -217,7 +224,7 @@ void sendAckOperations(
217224 this .receiver = receiver ;
218225 this .ackProcessor = ackProcessor ;
219226 this .flowController = flowController ;
220- outstandingAckHandlers = new HashMap <>();
227+ outstandingAckHandlers = new PriorityQueue <>();
221228 pendingAcks = new HashSet <>();
222229 pendingNacks = new HashSet <>();
223230 // 601 buckets of 1s resolution from 0s to MAX_ACK_DEADLINE_SECONDS
@@ -257,18 +264,13 @@ public void processReceivedMessages(List<com.google.pubsub.v1.ReceivedMessage> r
257264 }
258265 Instant now = new Instant (clock .millis ());
259266 int totalByteCount = 0 ;
260- final List <AckHandler > ackHandlers = new ArrayList <>(responseMessages .size ());
267+ final ArrayList <AckHandler > ackHandlers = new ArrayList <>(responseMessages .size ());
261268 for (ReceivedMessage pubsubMessage : responseMessages ) {
262269 int messageSize = pubsubMessage .getMessage ().getSerializedSize ();
263270 totalByteCount += messageSize ;
264271 ackHandlers .add (new AckHandler (pubsubMessage .getAckId (), messageSize ));
265272 }
266- ExpirationInfo expiration =
267- new ExpirationInfo (
268- clock , now .plus (messageDeadlineSeconds * 1000 ), INITIAL_ACK_DEADLINE_EXTENSION_SECONDS );
269- synchronized (outstandingAckHandlers ) {
270- addOutstadingAckHandlers (expiration , ackHandlers );
271- }
273+ Instant expiration = now .plus (messageDeadlineSeconds * 1000 );
272274 logger .debug ("Received {} messages at {}" , responseMessages .size (), now );
273275 setupNextAckDeadlineExtensionAlarm (expiration );
274276
@@ -285,21 +287,24 @@ public void run() {
285287 }
286288 });
287289 }
290+
291+ // There is a race condition. setupNextAckDeadlineExtensionAlarm might set
292+ // an alarm that fires before this block can run.
293+ // The fix is to move setup below this block, but doing so aggravates another
294+ // race condition.
295+ // TODO(pongad): Fix both races.
296+ synchronized (outstandingAckHandlers ) {
297+ outstandingAckHandlers .add (
298+ new ExtensionJob (expiration , INITIAL_ACK_DEADLINE_EXTENSION_SECONDS , ackHandlers ));
299+ }
300+
288301 try {
289302 flowController .reserve (receivedMessagesCount , totalByteCount );
290303 } catch (FlowController .FlowControlException unexpectedException ) {
291304 throw new IllegalStateException ("Flow control unexpected exception" , unexpectedException );
292305 }
293306 }
294307
295- private void addOutstadingAckHandlers (
296- ExpirationInfo expiration , final List <AckHandler > ackHandlers ) {
297- if (!outstandingAckHandlers .containsKey (expiration )) {
298- outstandingAckHandlers .put (expiration , new ArrayList <AckHandler >(ackHandlers .size ()));
299- }
300- outstandingAckHandlers .get (expiration ).addAll (ackHandlers );
301- }
302-
303308 private void setupPendingAcksAlarm () {
304309 alarmsLock .lock ();
305310 try {
@@ -354,41 +359,49 @@ public void run() {
354359 now ,
355360 cutOverTime ,
356361 ackExpirationPadding );
357- ExpirationInfo nextScheduleExpiration = null ;
362+ Instant nextScheduleExpiration = null ;
358363 List <PendingModifyAckDeadline > modifyAckDeadlinesToSend = new ArrayList <>();
359364
365+ // Holding area for jobs we'll put back into the queue
366+ // so we don't process the same job twice.
367+ List <ExtensionJob > renewJobs = new ArrayList <>();
368+
360369 synchronized (outstandingAckHandlers ) {
361- for (ExpirationInfo messageExpiration : outstandingAckHandlers .keySet ()) {
362- if (messageExpiration .expiration .compareTo (cutOverTime ) <= 0 ) {
363- Collection <AckHandler > expiringAcks = outstandingAckHandlers .get (messageExpiration );
364- outstandingAckHandlers .remove (messageExpiration );
365- List <AckHandler > renewedAckHandlers = new ArrayList <>(expiringAcks .size ());
366- messageExpiration .extendExpiration ();
367- int extensionSeconds =
368- Ints .saturatedCast (
369- new Interval (now , messageExpiration .expiration )
370- .toDuration ()
371- .getStandardSeconds ());
372- PendingModifyAckDeadline pendingModAckDeadline =
373- new PendingModifyAckDeadline (extensionSeconds );
374- for (AckHandler ackHandler : expiringAcks ) {
375- if (ackHandler .acked .get ()) {
376- continue ;
377- }
378- pendingModAckDeadline .addAckId (ackHandler .ackId );
379- renewedAckHandlers .add (ackHandler );
380- }
381- modifyAckDeadlinesToSend .add (pendingModAckDeadline );
382- if (!renewedAckHandlers .isEmpty ()) {
383- addOutstadingAckHandlers (messageExpiration , renewedAckHandlers );
370+ while (!outstandingAckHandlers .isEmpty ()
371+ && outstandingAckHandlers .peek ().expiration .compareTo (cutOverTime ) <= 0 ) {
372+ ExtensionJob job = outstandingAckHandlers .poll ();
373+
374+ // If a message has already been acked, remove it, nothing to do.
375+ for (int i = 0 ; i < job .ackHandlers .size (); ) {
376+ if (job .ackHandlers .get (i ).acked .get ()) {
377+ Collections .swap (job .ackHandlers , i , job .ackHandlers .size () - 1 );
378+ job .ackHandlers .remove (job .ackHandlers .size () - 1 );
384379 } else {
385- outstandingAckHandlers . remove ( messageExpiration ) ;
380+ i ++ ;
386381 }
387382 }
388- if (nextScheduleExpiration == null
389- || nextScheduleExpiration .expiration .isAfter (messageExpiration .expiration )) {
390- nextScheduleExpiration = messageExpiration ;
383+
384+ if (job .ackHandlers .isEmpty ()) {
385+ continue ;
386+ }
387+
388+ job .extendExpiration (now );
389+ int extensionSeconds =
390+ Ints .saturatedCast (
391+ new Interval (now , job .expiration ).toDuration ().getStandardSeconds ());
392+ PendingModifyAckDeadline pendingModAckDeadline =
393+ new PendingModifyAckDeadline (extensionSeconds );
394+ for (AckHandler ackHandler : job .ackHandlers ) {
395+ pendingModAckDeadline .addAckId (ackHandler .ackId );
391396 }
397+ modifyAckDeadlinesToSend .add (pendingModAckDeadline );
398+ renewJobs .add (job );
399+ }
400+ for (ExtensionJob job : renewJobs ) {
401+ outstandingAckHandlers .add (job );
402+ }
403+ if (!outstandingAckHandlers .isEmpty ()) {
404+ nextScheduleExpiration = outstandingAckHandlers .peek ().expiration ;
392405 }
393406 }
394407
@@ -404,8 +417,8 @@ public void run() {
404417 }
405418 }
406419
407- private void setupNextAckDeadlineExtensionAlarm (ExpirationInfo messageExpiration ) {
408- Instant possibleNextAlarmTime = messageExpiration . expiration .minus (ackExpirationPadding );
420+ private void setupNextAckDeadlineExtensionAlarm (Instant expiration ) {
421+ Instant possibleNextAlarmTime = expiration .minus (ackExpirationPadding );
409422 alarmsLock .lock ();
410423 try {
411424 if (nextAckDeadlineExtensionAlarmTime .isAfter (possibleNextAlarmTime )) {
0 commit comments