Skip to content

Commit b125280

Browse files
committed
use ExecutorProvider
Use ExecutorProvider instead of asking of an Executor directly. This makes Publisher and Subscriber consistent with toolkit-generated code. This commit leaves Channels and Credentials alone for now, since it is already getting big.
1 parent 1c80923 commit b125280

File tree

4 files changed

+88
-47
lines changed

4 files changed

+88
-47
lines changed

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import com.google.api.gax.bundling.FlowController;
2020
import com.google.api.gax.core.RetrySettings;
2121
import com.google.api.gax.grpc.BundlingSettings;
22+
import com.google.api.gax.grpc.ExecutorProvider;
23+
import com.google.api.gax.grpc.InstantiatingExecutorProvider;
2224
import com.google.auth.Credentials;
2325
import com.google.auth.oauth2.GoogleCredentials;
2426
import com.google.common.base.Optional;
@@ -28,7 +30,6 @@
2830
import com.google.common.util.concurrent.Futures;
2931
import com.google.common.util.concurrent.ListenableFuture;
3032
import com.google.common.util.concurrent.SettableFuture;
31-
import com.google.common.util.concurrent.ThreadFactoryBuilder;
3233
import com.google.pubsub.v1.PublishRequest;
3334
import com.google.pubsub.v1.PublishResponse;
3435
import com.google.pubsub.v1.PublisherGrpc;
@@ -43,10 +44,10 @@
4344
import io.grpc.netty.NegotiationType;
4445
import io.grpc.netty.NettyChannelBuilder;
4546
import java.io.IOException;
47+
import java.util.ArrayList;
4648
import java.util.Iterator;
4749
import java.util.LinkedList;
4850
import java.util.List;
49-
import java.util.concurrent.Executors;
5051
import java.util.concurrent.ScheduledExecutorService;
5152
import java.util.concurrent.ScheduledFuture;
5253
import java.util.concurrent.ThreadLocalRandom;
@@ -118,8 +119,6 @@ public static long getApiMaxBundleBytes() {
118119
return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)
119120
}
120121

121-
private static final int DEFAULT_MIN_THREAD_POOL_SIZE = 5;
122-
123122
private static final Logger logger = LoggerFactory.getLogger(Publisher.class);
124123

125124
private final String topic;
@@ -143,6 +142,7 @@ public static long getApiMaxBundleBytes() {
143142

144143
private final ScheduledExecutorService executor;
145144
private final AtomicBoolean shutdown;
145+
private final List<AutoCloseable> closeables = new ArrayList<>();
146146
private final MessagesWaiter messagesWaiter;
147147
private ScheduledFuture<?> currentAlarmFuture;
148148

@@ -160,15 +160,16 @@ private Publisher(Builder builder) throws IOException {
160160
messagesBundleLock = new ReentrantLock();
161161
activeAlarm = new AtomicBoolean(false);
162162
int numCores = Math.max(1, Runtime.getRuntime().availableProcessors());
163-
executor =
164-
builder.executor.isPresent()
165-
? builder.executor.get()
166-
: Executors.newScheduledThreadPool(
167-
numCores * DEFAULT_MIN_THREAD_POOL_SIZE,
168-
new ThreadFactoryBuilder()
169-
.setDaemon(true)
170-
.setNameFormat("cloud-pubsub-publisher-thread-%d")
171-
.build());
163+
executor = builder.executorProvider.getExecutor();
164+
if (builder.executorProvider.shouldAutoClose()) {
165+
closeables.add(
166+
new AutoCloseable() {
167+
@Override
168+
public void close() throws IOException {
169+
executor.shutdown();
170+
}
171+
});
172+
}
172173
channels = new Channel[numCores];
173174
channelIndex = new AtomicRoundRobin(channels.length);
174175
for (int i = 0; i < numCores; i++) {
@@ -480,7 +481,7 @@ public PublisherStats getStats() {
480481
* should be invoked prior to deleting the {@link Publisher} object in order to ensure that no
481482
* pending messages are lost.
482483
*/
483-
public void shutdown() {
484+
public void shutdown() throws Exception {
484485
if (shutdown.getAndSet(true)) {
485486
throw new IllegalStateException("Cannot shut down a publisher already shut-down.");
486487
}
@@ -489,6 +490,9 @@ public void shutdown() {
489490
}
490491
publishAllOutstanding();
491492
messagesWaiter.waitNoMessages();
493+
for (AutoCloseable closeable : closeables) {
494+
closeable.close();
495+
}
492496
}
493497

494498
private boolean hasBundlingBytes() {
@@ -550,6 +554,12 @@ public static final class Builder {
550554
.setMaxRpcTimeout(DEFAULT_RPC_TIMEOUT)
551555
.build();
552556

557+
private static final int THREADS_PER_CPU = 5;
558+
static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
559+
InstantiatingExecutorProvider.newBuilder()
560+
.setExecutorThreadCount(THREADS_PER_CPU * Runtime.getRuntime().availableProcessors())
561+
.build();
562+
553563
String topic;
554564

555565
// Bundling options
@@ -566,7 +576,7 @@ public static final class Builder {
566576
Optional<ManagedChannelBuilder<? extends ManagedChannelBuilder<?>>> channelBuilder =
567577
Optional.absent();
568578

569-
Optional<ScheduledExecutorService> executor = Optional.absent();
579+
ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER;
570580

571581
/** Constructs a new {@link Builder} using the given topic. */
572582
public static Builder newBuilder(TopicName topic) {
@@ -656,8 +666,8 @@ public Builder setRetrySettings(RetrySettings retrySettings) {
656666
}
657667

658668
/** Gives the ability to set a custom executor to be used by the library. */
659-
public Builder setExecutor(ScheduledExecutorService executor) {
660-
this.executor = Optional.of(Preconditions.checkNotNull(executor));
669+
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
670+
this.executorProvider = Preconditions.checkNotNull(executorProvider);
661671
return this;
662672
}
663673

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package com.google.cloud.pubsub.spi.v1;
1818

1919
import com.google.api.gax.bundling.FlowController;
20+
import com.google.api.gax.grpc.ExecutorProvider;
21+
import com.google.api.gax.grpc.InstantiatingExecutorProvider;
2022
import com.google.api.stats.Distribution;
2123
import com.google.auth.Credentials;
2224
import com.google.auth.oauth2.GoogleCredentials;
@@ -28,7 +30,6 @@
2830
import com.google.common.primitives.Ints;
2931
import com.google.common.util.concurrent.AbstractService;
3032
import com.google.common.util.concurrent.Service;
31-
import com.google.common.util.concurrent.ThreadFactoryBuilder;
3233
import com.google.pubsub.v1.SubscriptionName;
3334
import io.grpc.ManagedChannelBuilder;
3435
import io.grpc.Status;
@@ -40,7 +41,6 @@
4041
import java.util.ArrayList;
4142
import java.util.List;
4243
import java.util.concurrent.CountDownLatch;
43-
import java.util.concurrent.Executors;
4444
import java.util.concurrent.ScheduledExecutorService;
4545
import java.util.concurrent.ScheduledFuture;
4646
import java.util.concurrent.TimeUnit;
@@ -131,6 +131,7 @@ public class Subscriber extends AbstractService {
131131
private final List<StreamingSubscriberConnection> streamingSubscriberConnections;
132132
private final List<PollingSubscriberConnection> pollingSubscriberConnections;
133133
private final Clock clock;
134+
private final List<AutoCloseable> closeables = new ArrayList<>();
134135
private ScheduledFuture<?> ackDeadlineUpdater;
135136
private int streamAckDeadlineSeconds;
136137

@@ -147,16 +148,16 @@ private Subscriber(Builder builder) throws IOException {
147148

148149
flowController = new FlowController(builder.flowControlSettings, false);
149150

150-
numChannels = Math.max(1, Runtime.getRuntime().availableProcessors()) * CHANNELS_PER_CORE;
151-
executor =
152-
builder.executor.isPresent()
153-
? builder.executor.get()
154-
: Executors.newScheduledThreadPool(
155-
numChannels * THREADS_PER_CHANNEL,
156-
new ThreadFactoryBuilder()
157-
.setDaemon(true)
158-
.setNameFormat("cloud-pubsub-subscriber-thread-%d")
159-
.build());
151+
executor = builder.executorProvider.getExecutor();
152+
if (builder.executorProvider.shouldAutoClose()) {
153+
closeables.add(
154+
new AutoCloseable() {
155+
@Override
156+
public void close() throws IOException {
157+
executor.shutdown();
158+
}
159+
});
160+
}
160161

161162
channelBuilder =
162163
builder.channelBuilder.isPresent()
@@ -176,6 +177,7 @@ private Subscriber(Builder builder) throws IOException {
176177
: GoogleCredentials.getApplicationDefault()
177178
.createScoped(SubscriberSettings.getDefaultServiceScopes());
178179

180+
numChannels = Math.max(1, Runtime.getRuntime().availableProcessors()) * CHANNELS_PER_CORE;
179181
streamingSubscriberConnections = new ArrayList<StreamingSubscriberConnection>(numChannels);
180182
pollingSubscriberConnections = new ArrayList<PollingSubscriberConnection>(numChannels);
181183
}
@@ -191,7 +193,14 @@ protected void doStart() {
191193
protected void doStop() {
192194
stopAllStreamingConnections();
193195
stopAllPollingConnections();
194-
notifyStopped();
196+
try {
197+
for (AutoCloseable closeable : closeables) {
198+
closeable.close();
199+
}
200+
notifyStopped();
201+
} catch (Exception e) {
202+
notifyFailed(e);
203+
}
195204
}
196205

197206
private void startStreamingConnections() {
@@ -379,6 +388,14 @@ public static final class Builder {
379388
private static final Duration MIN_ACK_EXPIRATION_PADDING = Duration.millis(100);
380389
private static final Duration DEFAULT_ACK_EXPIRATION_PADDING = Duration.millis(500);
381390

391+
static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
392+
InstantiatingExecutorProvider.newBuilder()
393+
.setExecutorThreadCount(
394+
THREADS_PER_CHANNEL
395+
* CHANNELS_PER_CORE
396+
* Runtime.getRuntime().availableProcessors())
397+
.build();
398+
382399
String subscription;
383400
Optional<Credentials> credentials = Optional.absent();
384401
MessageReceiver receiver;
@@ -387,7 +404,7 @@ public static final class Builder {
387404

388405
FlowController.Settings flowControlSettings = FlowController.Settings.DEFAULT;
389406

390-
Optional<ScheduledExecutorService> executor = Optional.absent();
407+
ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER;
391408
Optional<ManagedChannelBuilder<? extends ManagedChannelBuilder<?>>> channelBuilder =
392409
Optional.absent();
393410
Optional<Clock> clock = Optional.absent();
@@ -459,8 +476,8 @@ public Builder setAckExpirationPadding(Duration ackExpirationPadding) {
459476
}
460477

461478
/** Gives the ability to set a custom executor. */
462-
public Builder setExecutor(ScheduledExecutorService executor) {
463-
this.executor = Optional.of(executor);
479+
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
480+
this.executorProvider = Preconditions.checkNotNull(executorProvider);
464481
return this;
465482
}
466483

0 commit comments

Comments
 (0)