diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/AtomicRoundRobin.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/AtomicRoundRobin.java new file mode 100644 index 000000000000..4a2eac8f3161 --- /dev/null +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/AtomicRoundRobin.java @@ -0,0 +1,46 @@ +/* + * Copyright 2017 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub.spi.v1; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import java.util.concurrent.atomic.AtomicInteger; + +/** Provides a simplistic round robin, guarding for overflow. */ +class AtomicRoundRobin { + private final int max; + private final AtomicInteger current; + + AtomicRoundRobin(int max) { + Preconditions.checkArgument(max > 0); + this.max = max; + current = new AtomicInteger(0); + } + + int next() { + int next = current.getAndIncrement() % max; + if (next < 0) { + next += max; + } + return next; + } + + @VisibleForTesting + void set(int i) { + current.set(i); + } +} diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java index fc7137b3909b..ad1d7af7c5bc 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java @@ -52,7 +52,6 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.joda.time.Duration; @@ -139,7 +138,7 @@ public static long getApiMaxBundleBytes() { private final FlowController flowController; private final Channel[] channels; - private final AtomicInteger channelIndex; + private final AtomicRoundRobin channelIndex; private final CallCredentials credentials; private final ScheduledExecutorService executor; @@ -171,7 +170,7 @@ private Publisher(Builder builder) throws IOException { .setNameFormat("cloud-pubsub-publisher-thread-%d") .build()); channels = new Channel[numCores]; - channelIndex = new AtomicInteger(0); + channelIndex = new AtomicRoundRobin(channels.length); for (int i = 0; i < numCores; i++) { channels[i] = builder.channelBuilder.isPresent() @@ -334,7 +333,8 @@ private void publishOutstandingBundle(final OutstandingBundle outstandingBundle) for (OutstandingPublish outstandingPublish : outstandingBundle.outstandingPublishes) { publishRequest.addMessages(outstandingPublish.message); } - int currentChannel = channelIndex.getAndIncrement() % channels.length; + + int currentChannel = channelIndex.next(); long rpcTimeoutMs = Math.round( diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/AtomicRoundRobinTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/AtomicRoundRobinTest.java new file mode 100644 index 000000000000..3bc4411eaa7b --- /dev/null +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/AtomicRoundRobinTest.java @@ -0,0 +1,41 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub.spi.v1; + +import org.junit.Assert; +import org.junit.Test; + +public class AtomicRoundRobinTest { + @Test + public void testNext() { + AtomicRoundRobin roundRobin = new AtomicRoundRobin(3); + Assert.assertEquals(0, roundRobin.next()); + Assert.assertEquals(1, roundRobin.next()); + Assert.assertEquals(2, roundRobin.next()); + Assert.assertEquals(0, roundRobin.next()); + Assert.assertEquals(1, roundRobin.next()); + Assert.assertEquals(2, roundRobin.next()); + } + + @Test + public void testOverflow() { + AtomicRoundRobin roundRobin = new AtomicRoundRobin(3); + roundRobin.set(Integer.MAX_VALUE); + Assert.assertTrue(roundRobin.next() >= 0); + Assert.assertTrue(roundRobin.next() >= 0); + } +}