From 582e3bfc5506e243f6456894b47c39020186a755 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Thu, 19 Jan 2017 15:49:54 +1100 Subject: [PATCH 1/2] make sure channel round-robin cannot fail on overflow The AtomicInteger might overflow. Previously, we'll just die since we'd try to index into an array with negative number. This commit makes sure we have a valid index to use. Note that this is still not a "perfect" round robin. If the number of channels does not divide 2^32, we'll slightly prefer lower indices. This is probably OK since the bias is small, and the alternative requires a much more expensive lock. Fixes #1543. --- .../main/java/com/google/cloud/pubsub/spi/v1/Publisher.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java index fc7137b3909b..5da14e082ddc 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java @@ -334,7 +334,11 @@ private void publishOutstandingBundle(final OutstandingBundle outstandingBundle) for (OutstandingPublish outstandingPublish : outstandingBundle.outstandingPublishes) { publishRequest.addMessages(outstandingPublish.message); } + int currentChannel = channelIndex.getAndIncrement() % channels.length; + if (currentChannel < 0) { + currentChannel += channels.length; + } long rpcTimeoutMs = Math.round( From 442680659b8937d1adae8c431dad33cf3476aa21 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Fri, 20 Jan 2017 10:37:54 +1100 Subject: [PATCH 2/2] split into its own class, adding tests --- .../cloud/pubsub/spi/v1/AtomicRoundRobin.java | 46 +++++++++++++++++++ .../google/cloud/pubsub/spi/v1/Publisher.java | 10 ++-- .../pubsub/spi/v1/AtomicRoundRobinTest.java | 41 +++++++++++++++++ 3 files changed, 90 insertions(+), 7 deletions(-) create mode 100644 google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/AtomicRoundRobin.java create mode 100644 google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/AtomicRoundRobinTest.java diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/AtomicRoundRobin.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/AtomicRoundRobin.java new file mode 100644 index 000000000000..4a2eac8f3161 --- /dev/null +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/AtomicRoundRobin.java @@ -0,0 +1,46 @@ +/* + * Copyright 2017 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub.spi.v1; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import java.util.concurrent.atomic.AtomicInteger; + +/** Provides a simplistic round robin, guarding for overflow. */ +class AtomicRoundRobin { + private final int max; + private final AtomicInteger current; + + AtomicRoundRobin(int max) { + Preconditions.checkArgument(max > 0); + this.max = max; + current = new AtomicInteger(0); + } + + int next() { + int next = current.getAndIncrement() % max; + if (next < 0) { + next += max; + } + return next; + } + + @VisibleForTesting + void set(int i) { + current.set(i); + } +} diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java index 5da14e082ddc..ad1d7af7c5bc 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java @@ -52,7 +52,6 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.joda.time.Duration; @@ -139,7 +138,7 @@ public static long getApiMaxBundleBytes() { private final FlowController flowController; private final Channel[] channels; - private final AtomicInteger channelIndex; + private final AtomicRoundRobin channelIndex; private final CallCredentials credentials; private final ScheduledExecutorService executor; @@ -171,7 +170,7 @@ private Publisher(Builder builder) throws IOException { .setNameFormat("cloud-pubsub-publisher-thread-%d") .build()); channels = new Channel[numCores]; - channelIndex = new AtomicInteger(0); + channelIndex = new AtomicRoundRobin(channels.length); for (int i = 0; i < numCores; i++) { channels[i] = builder.channelBuilder.isPresent() @@ -335,10 +334,7 @@ private void publishOutstandingBundle(final OutstandingBundle outstandingBundle) publishRequest.addMessages(outstandingPublish.message); } - int currentChannel = channelIndex.getAndIncrement() % channels.length; - if (currentChannel < 0) { - currentChannel += channels.length; - } + int currentChannel = channelIndex.next(); long rpcTimeoutMs = Math.round( diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/AtomicRoundRobinTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/AtomicRoundRobinTest.java new file mode 100644 index 000000000000..3bc4411eaa7b --- /dev/null +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/AtomicRoundRobinTest.java @@ -0,0 +1,41 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub.spi.v1; + +import org.junit.Assert; +import org.junit.Test; + +public class AtomicRoundRobinTest { + @Test + public void testNext() { + AtomicRoundRobin roundRobin = new AtomicRoundRobin(3); + Assert.assertEquals(0, roundRobin.next()); + Assert.assertEquals(1, roundRobin.next()); + Assert.assertEquals(2, roundRobin.next()); + Assert.assertEquals(0, roundRobin.next()); + Assert.assertEquals(1, roundRobin.next()); + Assert.assertEquals(2, roundRobin.next()); + } + + @Test + public void testOverflow() { + AtomicRoundRobin roundRobin = new AtomicRoundRobin(3); + roundRobin.set(Integer.MAX_VALUE); + Assert.assertTrue(roundRobin.next() >= 0); + Assert.assertTrue(roundRobin.next() >= 0); + } +}