Skip to content

Commit 4afc000

Browse files
committed
BOOKKEEPER-69: ServerRedirectLoopException when a machine (hosts bookie server & hub server) reboot, which is caused by race condition of topic manager (Sijie, ivank via ivank)
git-svn-id: https://svn.apache.org/repos/asf/zookeeper/bookkeeper/trunk@1203576 13f79535-47bb-0310-9956-ffa450edef68
1 parent 4049d4c commit 4afc000

File tree

4 files changed

+245
-12
lines changed

4 files changed

+245
-12
lines changed

CHANGES.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ BUGFIXES:
8686

8787
BOOKKEEPER-100: Some hedwig tests have build errors (dferro via ivank)
8888

89+
BOOKKEEPER-69: ServerRedirectLoopException when a machine (hosts bookie server & hub server) reboot, which is caused by race condition of topic manager (Sijie, ivank via ivank)
90+
8991
hedwig-client/
9092

9193
BOOKKEEPER-52: Message sequence confuse due to the subscribeMsgQueue@SubscribeResponseHandler (xulei via ivank)

hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,10 +136,19 @@ public void operationFinished(Object ctx, Void resultOfOperation) {
136136
}
137137

138138
@Override
139-
public void operationFailed(Object ctx, PubSubException exception) {
139+
public void operationFailed(final Object ctx, final PubSubException exception) {
140140
// TODO: optimization: we can release this as soon as we experience the first error.
141-
realReleaseTopic(topic, CallbackUtils.curry(originalCallback, addr), originalContext);
142-
originalCallback.operationFailed(ctx, exception);
141+
Callback<Void> cb = new Callback<Void>() {
142+
public void operationFinished(Object _ctx, Void _resultOfOperation) {
143+
originalCallback.operationFailed(ctx, exception);
144+
}
145+
public void operationFailed(Object _ctx, PubSubException _exception) {
146+
logger.error("Exception releasing topic", _exception);
147+
originalCallback.operationFailed(ctx, exception);
148+
}
149+
};
150+
151+
realReleaseTopic(topic, cb, originalContext);
143152
}
144153
};
145154

hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.io.File;
2121
import java.util.LinkedList;
2222
import java.util.List;
23+
import java.util.Random;
2324

2425
import org.apache.bookkeeper.conf.ClientConfiguration;
2526
import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -48,8 +49,8 @@ public class BookKeeperTestBase extends ZooKeeperTestBase {
4849

4950
// BookKeeper Server variables
5051
private List<BookieServer> bookiesList;
51-
private List<ServerConfiguration> bookieConfsList;
5252
private int initialPort = 5000;
53+
private int nextPort = initialPort;
5354

5455
// String constants used for creating the bookie server files.
5556
private static final String PREFIX = "bookie";
@@ -100,16 +101,9 @@ public void setUp() throws Exception {
100101

101102
// Create Bookie Servers
102103
bookiesList = new LinkedList<BookieServer>();
103-
bookieConfsList = new LinkedList<ServerConfiguration>();
104104

105105
for (int i = 0; i < numBookies; i++) {
106-
File tmpDir = FileUtils.createTempDirectory(PREFIX + i, SUFFIX);
107-
ServerConfiguration conf = newServerConfiguration(
108-
initialPort + i, hostPort, tmpDir, new File[] { tmpDir });
109-
bookieConfsList.add(conf);
110-
BookieServer bs = new BookieServer(conf);
111-
bs.start();
112-
bookiesList.add(bs);
106+
startUpNewBookieServer();
113107
}
114108

115109
// Create the BookKeeper client
@@ -135,6 +129,28 @@ public void tearDown() throws Exception {
135129
bk.close();
136130
super.tearDown();
137131
}
132+
133+
public void tearDownOneBookieServer() throws Exception {
134+
Random r = new Random();
135+
int bi = r.nextInt(bookiesList.size());
136+
BookieServer bs = bookiesList.get(bi);
137+
try {
138+
bs.shutdown();
139+
} catch (InterruptedException e) {
140+
LOG.error("Error tearing down", e);
141+
}
142+
bookiesList.remove(bi);
143+
}
144+
145+
public void startUpNewBookieServer() throws Exception {
146+
File tmpDir = FileUtils.createTempDirectory(
147+
PREFIX + (nextPort - initialPort), SUFFIX);
148+
ServerConfiguration conf = newServerConfiguration(
149+
nextPort++, hostPort, tmpDir, new File[] { tmpDir });
150+
BookieServer bs = new BookieServer(conf);
151+
bs.start();
152+
bookiesList.add(bs);
153+
}
138154

139155
protected ServerConfiguration newServerConfiguration(int port, String zkServers, File journalDir, File[] ledgerDirs) {
140156
ServerConfiguration conf = new ServerConfiguration(baseConf);
Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hedwig.server.topics;
19+
20+
import java.util.concurrent.LinkedBlockingQueue;
21+
import java.util.concurrent.SynchronousQueue;
22+
import java.util.concurrent.atomic.AtomicBoolean;
23+
import java.util.concurrent.atomic.AtomicInteger;
24+
25+
import org.apache.hedwig.client.conf.ClientConfiguration;
26+
import org.apache.hedwig.client.HedwigClient;
27+
import org.apache.hedwig.client.api.Publisher;
28+
import org.apache.hedwig.client.api.Subscriber;
29+
import org.apache.hedwig.exceptions.PubSubException;
30+
import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
31+
import org.apache.hedwig.server.HedwigHubTestBase;
32+
import org.apache.hedwig.util.Callback;
33+
import org.apache.hedwig.util.ConcurrencyUtils;
34+
import org.junit.Test;
35+
36+
import com.google.protobuf.ByteString;
37+
38+
public class TestConcurrentTopicAcquisition extends HedwigHubTestBase {
39+
40+
// Client variables
41+
protected HedwigClient client;
42+
protected Publisher publisher;
43+
protected Subscriber subscriber;
44+
45+
final LinkedBlockingQueue<ByteString> subscribers =
46+
new LinkedBlockingQueue<ByteString>();
47+
final ByteString topic = ByteString.copyFromUtf8("concurrent-topic");
48+
final int numSubscribers = 300;
49+
final AtomicInteger numDone = new AtomicInteger(0);
50+
51+
// SynchronousQueues to verify async calls
52+
private final SynchronousQueue<Boolean> queue = new SynchronousQueue<Boolean>();
53+
54+
class SubCallback implements Callback<Void> {
55+
56+
ByteString subId;
57+
58+
public SubCallback(ByteString subId) {
59+
this.subId = subId;
60+
}
61+
62+
@Override
63+
public void operationFinished(Object ctx,
64+
Void resultOfOperation) {
65+
if (logger.isDebugEnabled()) {
66+
logger.debug("subscriber " + subId.toStringUtf8() + " succeed.");
67+
}
68+
int done = numDone.incrementAndGet();
69+
if (done == numSubscribers) {
70+
ConcurrencyUtils.put(queue, false);
71+
}
72+
}
73+
74+
@Override
75+
public void operationFailed(Object ctx,
76+
PubSubException exception) {
77+
if (logger.isDebugEnabled()) {
78+
logger.debug("subscriber " + subId.toStringUtf8() + " failed : ", exception);
79+
}
80+
ConcurrencyUtils.put(subscribers, subId);
81+
// ConcurrencyUtils.put(queue, false);
82+
}
83+
}
84+
85+
@Override
86+
public void setUp() throws Exception {
87+
super.setUp();
88+
client = new HedwigClient(new ClientConfiguration());
89+
90+
publisher = client.getPublisher();
91+
subscriber = client.getSubscriber();
92+
}
93+
94+
@Override
95+
public void tearDown() throws Exception {
96+
// sub.interrupt();
97+
// sub.join();
98+
99+
client.close();
100+
super.tearDown();
101+
}
102+
103+
@Test
104+
public void testTopicAcquistion() throws Exception {
105+
logger.info("Start concurrent topic acquistion test.");
106+
107+
// let one bookie down to cause not enough bookie exception
108+
logger.info("Tear down one bookie server.");
109+
bktb.tearDownOneBookieServer();
110+
111+
// In current implementation, the first several subscriptions will succeed to put topic in topic manager set,
112+
// because the tear down bookie server's zk node need time to disappear
113+
// some subscriptions will create ledger successfully, then other subscriptions will fail.
114+
// the race condition will be: topic manager own topic but persistence manager doesn't
115+
116+
// 300 subscribers subscribe to a same topic
117+
final AtomicBoolean inRedirectLoop = new AtomicBoolean(false);
118+
numDone.set(0);
119+
for (int i=0; i<numSubscribers; i++) {
120+
ByteString subId = ByteString.copyFromUtf8("sub-" + i);
121+
if (logger.isDebugEnabled()) {
122+
logger.debug("subscriber " + subId.toStringUtf8() + " subscribes topic " + topic.toStringUtf8());
123+
}
124+
subscriber.asyncSubscribe(topic, subId, CreateOrAttach.CREATE_OR_ATTACH,
125+
new Callback<Void>() {
126+
127+
private void tick() {
128+
if (numDone.incrementAndGet() == numSubscribers) {
129+
ConcurrencyUtils.put(queue, true);
130+
}
131+
}
132+
133+
@Override
134+
public void operationFinished(Object ctx,
135+
Void resultOfOperation) {
136+
tick();
137+
}
138+
139+
@Override
140+
public void operationFailed(Object ctx,
141+
PubSubException exception) {
142+
if (exception instanceof PubSubException.ServiceDownException) {
143+
String msg = exception.getMessage();
144+
if (msg.indexOf("ServerRedirectLoopException") > 0) {
145+
inRedirectLoop.set(true);
146+
}
147+
if (logger.isDebugEnabled()) {
148+
logger.debug("Operation failed : ", exception);
149+
}
150+
}
151+
tick();
152+
}
153+
154+
},
155+
null);
156+
}
157+
158+
queue.take();
159+
160+
// TODO: remove comment after we fix the issue
161+
// Assert.assertEquals(false, inRedirectLoop.get());
162+
163+
// start a thread to send subscriptions
164+
numDone.set(0);
165+
Thread sub = new Thread(new Runnable() {
166+
167+
@Override
168+
public void run() {
169+
logger.info("sub thread started");
170+
try {
171+
// 100 subscribers subscribe to a same topic
172+
for (int i=0; i<numSubscribers; i++) {
173+
ByteString subscriberId = ByteString.copyFromUtf8("sub-" + i);
174+
subscribers.put(subscriberId);
175+
}
176+
177+
ByteString subId;
178+
while (true) {
179+
subId = subscribers.take();
180+
181+
if (logger.isDebugEnabled()) {
182+
logger.debug("subscriber " + subId.toStringUtf8() + " subscribes topic " + topic.toStringUtf8());
183+
}
184+
subscriber.asyncSubscribe(topic, subId, CreateOrAttach.CREATE_OR_ATTACH,
185+
new SubCallback(subId), null);
186+
}
187+
// subscriber.asyncSubscribe(topic, subscriberId, mode, callback, context)
188+
} catch (InterruptedException ie) {
189+
// break
190+
logger.warn("Interrupted : ", ie);
191+
}
192+
}
193+
194+
});
195+
sub.start();
196+
Thread.sleep(2000);
197+
198+
// start a new bookie server
199+
logger.info("start new bookie server");
200+
bktb.startUpNewBookieServer();
201+
202+
// hope that all the subscriptions will be OK
203+
queue.take();
204+
}
205+
206+
}

0 commit comments

Comments
 (0)