Skip to content

Commit 4426806

Browse files
committed
split into its own class, adding tests
1 parent 582e3bf commit 4426806

File tree

3 files changed

+90
-7
lines changed

3 files changed

+90
-7
lines changed
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2017 Google Inc. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsub.spi.v1;
18+
19+
import com.google.common.annotations.VisibleForTesting;
20+
import com.google.common.base.Preconditions;
21+
import java.util.concurrent.atomic.AtomicInteger;
22+
23+
/** Provides a simplistic round robin, guarding for overflow. */
24+
class AtomicRoundRobin {
25+
private final int max;
26+
private final AtomicInteger current;
27+
28+
AtomicRoundRobin(int max) {
29+
Preconditions.checkArgument(max > 0);
30+
this.max = max;
31+
current = new AtomicInteger(0);
32+
}
33+
34+
int next() {
35+
int next = current.getAndIncrement() % max;
36+
if (next < 0) {
37+
next += max;
38+
}
39+
return next;
40+
}
41+
42+
@VisibleForTesting
43+
void set(int i) {
44+
current.set(i);
45+
}
46+
}

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@
5252
import java.util.concurrent.ThreadLocalRandom;
5353
import java.util.concurrent.TimeUnit;
5454
import java.util.concurrent.atomic.AtomicBoolean;
55-
import java.util.concurrent.atomic.AtomicInteger;
5655
import java.util.concurrent.locks.Lock;
5756
import java.util.concurrent.locks.ReentrantLock;
5857
import org.joda.time.Duration;
@@ -139,7 +138,7 @@ public static long getApiMaxBundleBytes() {
139138

140139
private final FlowController flowController;
141140
private final Channel[] channels;
142-
private final AtomicInteger channelIndex;
141+
private final AtomicRoundRobin channelIndex;
143142
private final CallCredentials credentials;
144143

145144
private final ScheduledExecutorService executor;
@@ -171,7 +170,7 @@ private Publisher(Builder builder) throws IOException {
171170
.setNameFormat("cloud-pubsub-publisher-thread-%d")
172171
.build());
173172
channels = new Channel[numCores];
174-
channelIndex = new AtomicInteger(0);
173+
channelIndex = new AtomicRoundRobin(channels.length);
175174
for (int i = 0; i < numCores; i++) {
176175
channels[i] =
177176
builder.channelBuilder.isPresent()
@@ -335,10 +334,7 @@ private void publishOutstandingBundle(final OutstandingBundle outstandingBundle)
335334
publishRequest.addMessages(outstandingPublish.message);
336335
}
337336

338-
int currentChannel = channelIndex.getAndIncrement() % channels.length;
339-
if (currentChannel < 0) {
340-
currentChannel += channels.length;
341-
}
337+
int currentChannel = channelIndex.next();
342338

343339
long rpcTimeoutMs =
344340
Math.round(
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright 2016 Google Inc. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsub.spi.v1;
18+
19+
import org.junit.Assert;
20+
import org.junit.Test;
21+
22+
public class AtomicRoundRobinTest {
23+
@Test
24+
public void testNext() {
25+
AtomicRoundRobin roundRobin = new AtomicRoundRobin(3);
26+
Assert.assertEquals(0, roundRobin.next());
27+
Assert.assertEquals(1, roundRobin.next());
28+
Assert.assertEquals(2, roundRobin.next());
29+
Assert.assertEquals(0, roundRobin.next());
30+
Assert.assertEquals(1, roundRobin.next());
31+
Assert.assertEquals(2, roundRobin.next());
32+
}
33+
34+
@Test
35+
public void testOverflow() {
36+
AtomicRoundRobin roundRobin = new AtomicRoundRobin(3);
37+
roundRobin.set(Integer.MAX_VALUE);
38+
Assert.assertTrue(roundRobin.next() >= 0);
39+
Assert.assertTrue(roundRobin.next() >= 0);
40+
}
41+
}

0 commit comments

Comments
 (0)