File tree Expand file tree Collapse file tree 5 files changed +18
-20
lines changed
google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets
main/java/com/google/cloud/pubsub/spi/v1
test/java/com/google/cloud/pubsub/spi/v1 Expand file tree Collapse file tree 5 files changed +18
-20
lines changed Original file line number Diff line number Diff line change 1919import com .google .cloud .pubsub .spi .v1 .MessageReceiver ;
2020import com .google .cloud .pubsub .spi .v1 .Subscriber ;
2121import com .google .cloud .pubsub .spi .v1 .SubscriberClient ;
22- import com .google .common .util .concurrent .Futures ;
23- import com .google .common .util .concurrent .ListenableFuture ;
2422import com .google .common .util .concurrent .MoreExecutors ;
23+ import com .google .common .util .concurrent .SettableFuture ;
2524import com .google .pubsub .v1 .PubsubMessage ;
2625import com .google .pubsub .v1 .PushConfig ;
2726import com .google .pubsub .v1 .SubscriptionName ;
@@ -44,9 +43,10 @@ public static void main(String... args) throws Exception {
4443 MessageReceiver receiver =
4544 new MessageReceiver () {
4645 @ Override
47- public ListenableFuture <MessageReceiver .AckReply > receiveMessage (PubsubMessage message ) {
46+ public void receiveMessage (
47+ PubsubMessage message , SettableFuture <MessageReceiver .AckReply > response ) {
4848 System .out .println ("got message: " + message .getData ().toStringUtf8 ());
49- return Futures . immediateFuture (MessageReceiver .AckReply .ACK );
49+ response . set (MessageReceiver .AckReply .ACK );
5050 }
5151 };
5252 Subscriber subscriber = null ;
Original file line number Diff line number Diff line change 2525import com .google .common .primitives .Ints ;
2626import com .google .common .util .concurrent .FutureCallback ;
2727import com .google .common .util .concurrent .Futures ;
28+ import com .google .common .util .concurrent .SettableFuture ;
2829import com .google .pubsub .v1 .PubsubMessage ;
2930import com .google .pubsub .v1 .ReceivedMessage ;
3031import java .util .ArrayList ;
@@ -278,11 +279,13 @@ public void processReceivedMessages(List<com.google.pubsub.v1.ReceivedMessage> r
278279 for (ReceivedMessage userMessage : responseMessages ) {
279280 final PubsubMessage message = userMessage .getMessage ();
280281 final AckHandler ackHandler = acksIterator .next ();
282+ final SettableFuture <AckReply > response = SettableFuture .create ();
283+ Futures .addCallback (response , ackHandler );
281284 executor .submit (
282285 new Runnable () {
283286 @ Override
284287 public void run () {
285- Futures . addCallback ( receiver .receiveMessage (message ), ackHandler );
288+ receiver .receiveMessage (message , response );
286289 }
287290 });
288291 }
Original file line number Diff line number Diff line change 1616
1717package com .google .cloud .pubsub .spi .v1 ;
1818
19- import com .google .common .util .concurrent .ListenableFuture ;
19+ import com .google .common .util .concurrent .SettableFuture ;
2020import com .google .pubsub .v1 .PubsubMessage ;
2121
2222/** This interface can be implemented by users of {@link Subscriber} to receive messages. */
@@ -34,11 +34,10 @@ enum AckReply {
3434 */
3535 NACK
3636 }
37-
37+
3838 /**
39- * Called when a message is received by the subscriber.
40- *
41- * @return A future that signals when a message has been processed.
39+ * Called when a message is received by the subscriber. The implementation must arrange for {@code
40+ * reponse} to be set after processing the {@code message}.
4241 */
43- ListenableFuture < AckReply > receiveMessage (PubsubMessage message );
42+ void receiveMessage (PubsubMessage message , SettableFuture < AckReply > response );
4443}
Original file line number Diff line number Diff line change 8181 * <pre><code>
8282 * MessageReceiver receiver = new MessageReceiver() {
8383 * @Override
84- * public ListenableFuture <AckReply> receiveMessage(PubsubMessage message ) {
84+ * public void receiveMessage(PubsubMessage message, SettableFuture <AckReply> response ) {
8585 * // ... process message ...
86- * return Futures.immediateFuture (AckReply.ACK);
86+ * return response.set (AckReply.ACK);
8787 * }
8888 * }
8989 *
Original file line number Diff line number Diff line change 2828import com .google .common .base .Optional ;
2929import com .google .common .base .Preconditions ;
3030import com .google .common .collect .ImmutableList ;
31- import com .google .common .util .concurrent .ListenableFuture ;
3231import com .google .common .util .concurrent .SettableFuture ;
3332import com .google .pubsub .v1 .PubsubMessage ;
3433import com .google .pubsub .v1 .PullResponse ;
@@ -115,23 +114,20 @@ void waitForExpectedMessages() throws InterruptedException {
115114 }
116115
117116 @ Override
118- public ListenableFuture < AckReply > receiveMessage (PubsubMessage message ) {
117+ public void receiveMessage (PubsubMessage message , SettableFuture < AckReply > response ) {
119118 if (messageCountLatch .isPresent ()) {
120119 messageCountLatch .get ().countDown ();
121120 }
122- SettableFuture <AckReply > reply = SettableFuture .create ();
123121
124122 if (explicitAckReplies ) {
125123 try {
126- outstandingMessageReplies .put (reply );
124+ outstandingMessageReplies .put (response );
127125 } catch (InterruptedException e ) {
128126 throw new IllegalStateException (e );
129127 }
130128 } else {
131- replyTo (reply );
129+ replyTo (response );
132130 }
133-
134- return reply ;
135131 }
136132
137133 public void replyNextOutstandingMessage () {
You can’t perform that action at this time.
0 commit comments