Skip to content

Commit eea1a58

Browse files
davidtorresgarrettjonesgoogle
authored andcommitted
Change AckReplyConsumer to expose individual methods for replying (#1899)
* Change AckReplyConsumer to expose individual methods for replying to a message outcome as opposed to use an enum.
1 parent f9e7ecc commit eea1a58

File tree

11 files changed

+54
-62
lines changed

11 files changed

+54
-62
lines changed

google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/PubSubExample.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -502,7 +502,7 @@ public void run(Tuple<SubscriptionName, Long> params) throws Exception {
502502
@Override
503503
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
504504
messageCount.incrementAndGet();
505-
consumer.accept(AckReply.ACK);
505+
consumer.ack();
506506
}
507507
};
508508
SubscriptionName subscriptionName = params.x();

google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java renamed to google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndConsumeMessages.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package com.google.cloud.examples.pubsub.snippets;
1818

19-
import com.google.cloud.pubsub.spi.v1.AckReply;
2019
import com.google.cloud.pubsub.spi.v1.AckReplyConsumer;
2120
import com.google.cloud.pubsub.spi.v1.MessageReceiver;
2221
import com.google.cloud.pubsub.spi.v1.Subscriber;
@@ -31,7 +30,7 @@
3130
* A snippet for Google Cloud Pub/Sub showing how to create a Pub/Sub pull subscription and
3231
* asynchronously pull messages from it.
3332
*/
34-
public class CreateSubscriptionAndPullMessages {
33+
public class CreateSubscriptionAndConsumeMessages {
3534

3635
public static void main(String... args) throws Exception {
3736
// [START async_pull_subscription]
@@ -47,7 +46,7 @@ public static void main(String... args) throws Exception {
4746
@Override
4847
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
4948
System.out.println("got message: " + message.getData().toStringUtf8());
50-
consumer.accept(AckReply.ACK);
49+
consumer.ack();
5150
}
5251
};
5352
Subscriber subscriber = null;

google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/MessageReceiverSnippets.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222

2323
package com.google.cloud.examples.pubsub.snippets;
2424

25-
import com.google.cloud.pubsub.spi.v1.AckReply;
2625
import com.google.cloud.pubsub.spi.v1.AckReplyConsumer;
2726
import com.google.cloud.pubsub.spi.v1.MessageReceiver;
2827
import com.google.pubsub.v1.PubsubMessage;
@@ -51,9 +50,9 @@ public MessageReceiver messageReceiver() {
5150
MessageReceiver receiver = new MessageReceiver() {
5251
public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) {
5352
if (blockingQueue.offer(message)) {
54-
consumer.accept(AckReply.ACK);
53+
consumer.ack();
5554
} else {
56-
consumer.accept(AckReply.NACK);
55+
consumer.nack();
5756
}
5857
}
5958
};

google-cloud-pubsub/README.md

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,6 @@ With Pub/Sub you can pull messages from a subscription. Add the following import
158158
file:
159159

160160
```java
161-
import com.google.cloud.pubsub.spi.v1.AckReply;
162161
import com.google.cloud.pubsub.spi.v1.AckReplyConsumer;
163162
import com.google.cloud.pubsub.spi.v1.MessageReceiver;
164163
import com.google.cloud.pubsub.spi.v1.Subscriber;
@@ -175,7 +174,7 @@ MessageReceiver receiver =
175174
@Override
176175
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
177176
System.out.println("got message: " + message.getData().toStringUtf8());
178-
consumer.accept(AckReply.ACK, null);
177+
consumer.ack();
179178
}
180179
};
181180
Subscriber subscriber = null;
@@ -204,7 +203,7 @@ try {
204203
In
205204
[CreateTopicAndPublishMessages.java](../google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateTopicAndPublishMessages.java)
206205
and
207-
[CreateSubscriptionAndPullMessages.java](../google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java)
206+
[CreateSubscriptionAndConsumeMessages.java](../google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndConsumeMessages.java)
208207
we put together all the code shown above into two programs. The programs assume that you are
209208
running on Compute Engine, App Engine Flexible or from your own desktop.
210209

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/AckReply.java

Lines changed: 0 additions & 31 deletions
This file was deleted.

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/AckReplyConsumer.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,17 @@
1818

1919
/**
2020
* Accepts a reply, sending it to the service.
21-
*
22-
* <p>Both the interface and its method is named after the Java 8's {@code Consumer} interface
23-
* to make migration to Java 8 and adopting its patterns easier.
2421
*/
2522
public interface AckReplyConsumer {
26-
void accept(AckReply ackReply);
23+
/**
24+
* Acknowledges that the message has been successfully processed. The service will not send the
25+
* message again.
26+
*/
27+
void ack();
28+
29+
/**
30+
* Signals that the message has not been successfully processed. The service should resend the
31+
* message.
32+
*/
33+
void nack();
2734
}

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,12 @@ public String toString() {
154154
}
155155
}
156156

157+
/** Internal representation of a reply to a Pubsub message, to be sent back to the service. */
158+
public enum AckReply {
159+
ACK,
160+
NACK
161+
}
162+
157163
/**
158164
* Handles callbacks for acking/nacking messages from the {@link
159165
* com.google.cloud.pubsub.spi.v1.MessageReceiver}.
@@ -295,8 +301,13 @@ public void processReceivedMessages(List<com.google.pubsub.v1.ReceivedMessage> r
295301
final AckReplyConsumer consumer =
296302
new AckReplyConsumer() {
297303
@Override
298-
public void accept(AckReply reply) {
299-
response.set(reply);
304+
public void ack() {
305+
response.set(AckReply.ACK);
306+
}
307+
308+
@Override
309+
public void nack() {
310+
response.set(AckReply.NACK);
300311
}
301312
};
302313
Futures.addCallback(response, ackHandler);

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageReceiver.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@
2222
public interface MessageReceiver {
2323
/**
2424
* Called when a message is received by the subscriber. The implementation must arrange for {@link
25-
* AckReplyConsumer#accept} to be called after processing the {@code message}.
25+
* AckReplyConsumer#ack()} or {@link
26+
* AckReplyConsumer#nack()} to be called after processing the {@code message}.
2627
*
2728
* <p>This {@code MessageReceiver} passes all messages to a {@code BlockingQueue}.
2829
* This method can be called concurrently from multiple threads,
@@ -34,9 +35,9 @@ public interface MessageReceiver {
3435
* MessageReceiver receiver = new MessageReceiver() {
3536
* public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) {
3637
* if (blockingQueue.offer(message)) {
37-
* consumer.accept(AckReply.ACK, null);
38+
* consumer.ack();
3839
* } else {
39-
* consumer.accept(AckReply.NACK, null);
40+
* consumer.nack();
4041
* }
4142
* }
4243
* };

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,9 @@
5555
*
5656
* <p>A {@link Subscriber} allows you to provide an implementation of a {@link MessageReceiver
5757
* receiver} to which messages are going to be delivered as soon as they are received by the
58-
* subscriber. The delivered messages then can be {@link AckReply#ACK acked} or {@link AckReply#NACK
59-
* nacked} at will as they get processed by the receiver. Nacking a messages implies a later
60-
* redelivery of such message.
58+
* subscriber. The delivered messages then can be {@link AckReplyConsumer#ack() acked} or {@link
59+
* AckReplyConsumer#nack() nacked} at will as they get processed by the receiver. Nacking a messages
60+
* implies a later redelivery of such message.
6161
*
6262
* <p>The subscriber handles the ack management, by automatically extending the ack deadline while
6363
* the message is being processed, to then issue the ack or nack of such message when the processing

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import com.google.api.gax.core.SettableApiFuture;
2323
import com.google.cloud.ServiceOptions;
24-
import com.google.cloud.pubsub.spi.v1.AckReply;
2524
import com.google.cloud.pubsub.spi.v1.AckReplyConsumer;
2625
import com.google.cloud.pubsub.spi.v1.MessageReceiver;
2726
import com.google.cloud.pubsub.spi.v1.Publisher;
@@ -116,9 +115,9 @@ public void testPublishSubscribe() throws Exception {
116115
public void receiveMessage(
117116
final PubsubMessage message, final AckReplyConsumer consumer) {
118117
if (received.set(message)) {
119-
consumer.accept(AckReply.ACK);
118+
consumer.ack();
120119
} else {
121-
consumer.accept(AckReply.NACK);
120+
consumer.nack();
122121
}
123122
}
124123
})

0 commit comments

Comments
 (0)