Skip to content

Commit 1bcbc47

Browse files
committed
BOOKKEEPER-79: randomly startDelivery/stopDelivery will core dump in c++ hedwig client (Sijie Guo via ivank)
git-svn-id: https://svn.apache.org/repos/asf/zookeeper/bookkeeper/trunk@1204437 13f79535-47bb-0310-9956-ffa450edef68
1 parent aed845b commit 1bcbc47

File tree

6 files changed

+215
-47
lines changed

6 files changed

+215
-47
lines changed

CHANGES.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ BUGFIXES:
106106

107107
BOOKKEEPER-87: TestHedwigHub exhausts direct buffer memory with netty 3.2.4.Final (ivank via fpj)
108108

109+
BOOKKEEPER-79: randomly startDelivery/stopDelivery will core dump in c++ hedwig client (Sijie Guo via ivank)
110+
109111
IMPROVEMENTS:
110112

111113
BOOKKEEPER-28: Create useful startup scripts for bookkeeper and hedwig (ivank)

hedwig-client/src/main/cpp/lib/channel.cpp

Lines changed: 79 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ DuplexChannel::DuplexChannel(EventDispatcher& dispatcher, const HostAddress& add
5252
const Configuration& cfg, const ChannelHandlerPtr& handler)
5353
: dispatcher(dispatcher), address(addr), handler(handler),
5454
socket(dispatcher.getService()), instream(&in_buf), copy_buf(NULL), copy_buf_length(0),
55-
state(UNINITIALISED), receiving(false), sending(false)
55+
state(UNINITIALISED), receiving(false), reading(false), sending(false)
5656
{
5757
LOG4CXX_DEBUG(logger, "Creating DuplexChannel(" << this << ")");
5858
}
@@ -140,6 +140,21 @@ void DuplexChannel::connect() {
140140
h = channel->handler;
141141
}
142142
}
143+
144+
// if the channel has stopped receiving, we must not call #messageReceived;
145+
// store this response in the outstanding_response variable and stop reading;
146+
// when we startReceiving again, we can process this last response.
147+
{
148+
boost::lock_guard<boost::mutex> lock(channel->receiving_lock);
149+
if (!channel->isReceiving()) {
150+
// queue the response
151+
channel->outstanding_response = response;
152+
channel->reading = false;
153+
return;
154+
}
155+
}
156+
157+
// channel is still in receiving status
143158
if (h.get()) {
144159
h->messageReceived(channel, response);
145160
}
@@ -188,10 +203,6 @@ void DuplexChannel::connect() {
188203
}
189204

190205
/*static*/ void DuplexChannel::readSize(DuplexChannelPtr channel) {
191-
if (!channel->isReceiving()) {
192-
return;
193-
}
194-
195206
int toread = sizeof(uint32_t) - channel->in_buf.size();
196207
LOG4CXX_DEBUG(logger, " size of incoming message " << sizeof(uint32_t)
197208
<< ", currently in buffer " << channel->in_buf.size()
@@ -212,14 +223,56 @@ void DuplexChannel::connect() {
212223

213224
void DuplexChannel::startReceiving() {
214225
LOG4CXX_DEBUG(logger, "DuplexChannel::startReceiving channel(" << this << ") currently receiving = " << receiving);
215-
216-
boost::lock_guard<boost::mutex> lock(receiving_lock);
217-
if (receiving) {
218-
return;
219-
}
220-
receiving = true;
221-
222-
DuplexChannel::readSize(shared_from_this());
226+
227+
PubSubResponsePtr response;
228+
bool inReadingState;
229+
{
230+
boost::lock_guard<boost::mutex> lock(receiving_lock);
231+
// already receiving, so just return
232+
if (receiving) {
233+
return;
234+
}
235+
receiving = true;
236+
237+
// if a response was queued while receiving was stopped,
238+
// we need to process it, but we must process it outside of receiving_lock,
239+
// otherwise we enter dead lock
240+
// subscriber#startDelivery(subscriber#queue_lock) =>
241+
// channel#startReceiving(channel#receiving_lock) =>
242+
// subscriber#messageReceived(subscriber#queue_lock)
243+
if (outstanding_response.get()) {
244+
response = outstanding_response;
245+
outstanding_response = PubSubResponsePtr();
246+
}
247+
248+
// if the channel is already in reading state, waiting for data from the remote server,
249+
// we don't need to insert another readSize op
250+
inReadingState = reading;
251+
if (!reading) {
252+
reading = true;
253+
}
254+
}
255+
256+
// consume message buffered in receiving queue
257+
// there is at most one message buffered when we
258+
// stopReceiving between #readSize and #readMsgBody
259+
if (response.get()) {
260+
ChannelHandlerPtr h;
261+
{
262+
boost::shared_lock<boost::shared_mutex> lock(this->destruction_lock);
263+
if (this->handler.get()) {
264+
h = this->handler;
265+
}
266+
}
267+
if (h.get()) {
268+
h->messageReceived(shared_from_this(), response);
269+
}
270+
}
271+
272+
// if the channel was not already in reading state, issue a #readSize
273+
if (!inReadingState) {
274+
DuplexChannel::readSize(shared_from_this());
275+
}
223276
}
224277

225278
bool DuplexChannel::isReceiving() {
@@ -320,9 +373,19 @@ void DuplexChannel::kill() {
320373
if (connected) {
321374
setState(DEAD);
322375

323-
socket.cancel();
324-
socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both);
325-
socket.close();
376+
boost::system::error_code ec;
377+
socket.cancel(ec);
378+
if (ec) {
379+
LOG4CXX_WARN(logger, "Channel " << this << " canceling io error : " << ec.message().c_str());
380+
}
381+
socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
382+
if (ec) {
383+
LOG4CXX_WARN(logger, "Channel " << this << " shutdown error : " << ec.message().c_str());
384+
}
385+
socket.close(ec);
386+
if (ec) {
387+
LOG4CXX_WARN(logger, "Channel " << this << " close error : " << ec.message().c_str());
388+
}
326389
}
327390
handler = ChannelHandlerPtr(); // clear the handler in case it ever referenced the channel*/
328391
}

hedwig-client/src/main/cpp/lib/channel.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,8 @@ namespace Hedwig {
139139
boost::shared_mutex state_lock;
140140

141141
bool receiving;
142+
bool reading;
143+
PubSubResponsePtr outstanding_response;
142144
boost::mutex receiving_lock;
143145

144146
bool sending;

hedwig-client/src/main/cpp/lib/subscriberimpl.cpp

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -235,24 +235,31 @@ void SubscriberClientChannelHandler::channelDisconnected(const DuplexChannelPtr&
235235
}
236236

237237
void SubscriberClientChannelHandler::startDelivery(const MessageHandlerCallbackPtr& handler) {
238-
boost::lock_guard<boost::shared_mutex> lock(queue_lock);
238+
{
239+
boost::lock_guard<boost::shared_mutex> lock(queue_lock);
239240

240-
this->handler = handler;
241+
this->handler = handler;
241242

242-
if (!(this->handler.get())) {
243-
// no message handler callback
244-
LOG4CXX_WARN(logger, "Handler " << this << " try to start an empty message handler");
245-
return;
246-
}
243+
if (!(this->handler.get())) {
244+
// no message handler callback
245+
LOG4CXX_WARN(logger, "Handler " << this << " try to start an empty message handler");
246+
return;
247+
}
247248

248-
while (!queue.empty()) {
249-
PubSubResponsePtr m = queue.front();
250-
queue.pop_front();
249+
while (!queue.empty()) {
250+
PubSubResponsePtr m = queue.front();
251+
queue.pop_front();
251252

252-
OperationCallbackPtr callback(new SubscriberConsumeCallback(client, shared_from_this(), origData, m));
253+
OperationCallbackPtr callback(new SubscriberConsumeCallback(client, shared_from_this(), origData, m));
253254

254-
this->handler->consume(origData->getTopic(), origData->getSubscriberId(), m->message(), callback);
255+
this->handler->consume(origData->getTopic(), origData->getSubscriberId(), m->message(), callback);
256+
}
255257
}
258+
259+
// put channel#startReceiving out of lock of subscriber#queue_lock
260+
// otherwise we enter dead lock
261+
// subscriber#startDelivery(subscriber#queue_lock) =>
262+
// channel#startReceiving(channel#receiving_lock) =>
256263
channel->startReceiving();
257264
}
258265

hedwig-client/src/main/cpp/scripts/server-control.sh

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,25 @@ start_hw_server () {
9595
COUNT=$2
9696
PORT=$((4080+$COUNT))
9797

98+
export HEDWIG_LOG_CONF=/tmp/hw-log4j-$COUNT.properties
99+
cat > $HEDWIG_LOG_CONF <<EOF
100+
log4j.rootLogger=INFO, ROLLINGFILE
101+
#
102+
# Add ROLLINGFILE to rootLogger to get log file output
103+
# Log DEBUG level and above messages to a log file
104+
log4j.appender.ROLLINGFILE=org.apache.log4j.DailyRollingFileAppender
105+
log4j.appender.ROLLINGFILE.Threshold=DEBUG
106+
log4j.appender.ROLLINGFILE.File=/tmp/hedwig-server-$COUNT.log
107+
log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
108+
log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
109+
# Max log file size of 10MB
110+
log4j.appender.ROLLINGFILE.MaxFileSize=10MB
111+
# uncomment the next line to limit number of backup files
112+
#log4j.appender.ROLLINGFILE.MaxBackupIndex=10
113+
log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
114+
log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n
115+
EOF
116+
98117
export HEDWIG_SERVER_CONF=/tmp/hw-server-$COUNT.conf
99118
cat > $HEDWIG_SERVER_CONF <<EOF
100119
zk_host=localhost:2181

hedwig-client/src/main/cpp/test/pubsubtest.cpp

Lines changed: 94 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ class PubSubTestSuite : public CppUnit::TestFixture {
4242
private:
4343
CPPUNIT_TEST_SUITE( PubSubTestSuite );
4444
CPPUNIT_TEST(testPubSubOrderChecking);
45+
CPPUNIT_TEST(testRandomDelivery);
4546
CPPUNIT_TEST(testPubSubContinuousOverClose);
4647
// CPPUNIT_TEST(testPubSubContinuousOverServerDown);
4748
CPPUNIT_TEST(testMultiTopic);
@@ -118,13 +119,17 @@ class PubSubTestSuite : public CppUnit::TestFixture {
118119
int newMsgId = atoi(msg.body().c_str());
119120
// checking msgId
120121
LOG4CXX_DEBUG(logger, "received message " << newMsgId);
121-
if (isInOrder) {
122-
if (newMsgId != startMsgId + 1) {
123-
LOG4CXX_ERROR(logger, "received out-of-order message : expected " << (startMsgId + 1) << ", actual " << newMsgId);
124-
isInOrder = false;
125-
} else {
126-
startMsgId = newMsgId;
122+
if (startMsgId >= 0) { // need to check ordering if start msg id is larger than 0
123+
if (isInOrder) {
124+
if (newMsgId != startMsgId + 1) {
125+
LOG4CXX_ERROR(logger, "received out-of-order message : expected " << (startMsgId + 1) << ", actual " << newMsgId);
126+
isInOrder = false;
127+
} else {
128+
startMsgId = newMsgId;
129+
}
127130
}
131+
} else { // we set first msg id as startMsgId when startMsgId is -1
132+
startMsgId = newMsgId;
128133
}
129134
callback->operationComplete();
130135
sleep(sleepTimeInConsume);
@@ -152,31 +157,101 @@ class PubSubTestSuite : public CppUnit::TestFixture {
152157
int sleepTimeInConsume;
153158
};
154159

155-
class PubForOrderChecking {
160+
// Publishes integers until finished
161+
class IntegerPublisher {
156162
public:
157-
PubForOrderChecking(std::string &topic, int startMsgId, int numMsgs, int sleepTime, Hedwig::Publisher &pub)
158-
: topic(topic), startMsgId(startMsgId), numMsgs(numMsgs), sleepTime(sleepTime), pub(pub) {
163+
IntegerPublisher(std::string &topic, int startMsgId, int numMsgs, int sleepTime, Hedwig::Publisher &pub, long runTime)
164+
: topic(topic), startMsgId(startMsgId), numMsgs(numMsgs), sleepTime(sleepTime), pub(pub), running(true), runTime(runTime) {
159165
}
160166

161167
void operator()() {
162-
for (int i=0; i<numMsgs; i++) {
163-
int msg = startMsgId + i;
164-
std::stringstream ss;
165-
ss << msg;
166-
pub.publish(topic, ss.str());
167-
sleep(sleepTime);
168-
}
168+
int i = 1;
169+
long beginTime = curTime();
170+
long elapsedTime = 0;
171+
172+
while (running) {
173+
try {
174+
int msg = startMsgId + i;
175+
std::stringstream ss;
176+
ss << msg;
177+
pub.publish(topic, ss.str());
178+
sleep(sleepTime);
179+
if (numMsgs > 0 && i >= numMsgs) {
180+
running = false;
181+
} else {
182+
if (i % 100 == 0 &&
183+
(elapsedTime = (curTime() - beginTime)) >= runTime) {
184+
LOG4CXX_DEBUG(logger, "Elapsed time : " << elapsedTime);
185+
running = false;
186+
}
187+
}
188+
++i;
189+
} catch (std::exception &e) {
190+
LOG4CXX_WARN(logger, "Exception when publishing messages : " << e.what());
191+
}
192+
}
169193
}
170194

195+
long curTime() {
196+
struct timeval tv;
197+
long mtime;
198+
gettimeofday(&tv, NULL);
199+
mtime = tv.tv_sec * 1000 + tv.tv_usec / 1000.0 + 0.5;
200+
return mtime;
201+
}
171202

172203
private:
173204
std::string topic;
174205
int startMsgId;
175206
int numMsgs;
176207
int sleepTime;
177208
Hedwig::Publisher& pub;
209+
bool running;
210+
long runTime;
178211
};
179212

213+
// test startDelivery / stopDelivery randomly
214+
void testRandomDelivery() {
215+
std::string topic = "randomDeliveryTopic";
216+
std::string subscriber = "mysub-randomDelivery";
217+
218+
int nLoops = 300;
219+
int sleepTimePerLoop = 1;
220+
int syncTimeout = 10000;
221+
222+
Hedwig::Configuration* conf = new TestServerConfiguration(syncTimeout);
223+
std::auto_ptr<Hedwig::Configuration> confptr(conf);
224+
225+
Hedwig::Client* client = new Hedwig::Client(*conf);
226+
std::auto_ptr<Hedwig::Client> clientptr(client);
227+
228+
Hedwig::Subscriber& sub = client->getSubscriber();
229+
Hedwig::Publisher& pub = client->getPublisher();
230+
231+
// subscribe topic
232+
sub.subscribe(topic, subscriber, Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
233+
234+
// start thread to publish message
235+
IntegerPublisher intPublisher = IntegerPublisher(topic, 0, 0, 0, pub, nLoops * sleepTimePerLoop * 1000);
236+
boost::thread pubThread(intPublisher);
237+
238+
// start random delivery
239+
MyOrderCheckingMessageHandlerCallback* cb =
240+
new MyOrderCheckingMessageHandlerCallback(topic, subscriber, 0, 0);
241+
Hedwig::MessageHandlerCallbackPtr handler(cb);
242+
243+
for (int i = 0; i < nLoops; i++) {
244+
LOG4CXX_DEBUG(logger, "Randomly Delivery : " << i);
245+
sub.startDelivery(topic, subscriber, handler);
246+
// sleep for sleepTimePerLoop before stopping delivery
247+
sleep(sleepTimePerLoop);
248+
sub.stopDelivery(topic, subscriber);
249+
CPPUNIT_ASSERT(cb->inOrder());
250+
}
251+
252+
pubThread.join();
253+
}
254+
180255
// check message ordering
181256
void testPubSubOrderChecking() {
182257
std::string topic = "orderCheckingTopic";
@@ -204,17 +279,17 @@ class PubSubTestSuite : public CppUnit::TestFixture {
204279

205280
// we don't start delivery first, so the message will be queued
206281
// publish ${numMessages} messages, so the messages will be queued
207-
for (int i=0; i<numMessages; i++) {
282+
for (int i=1; i<=numMessages; i++) {
208283
std::stringstream ss;
209284
ss << i;
210285
pub.publish(topic, ss.str());
211286
}
212287

213-
MyOrderCheckingMessageHandlerCallback* cb = new MyOrderCheckingMessageHandlerCallback(topic, sid, -1, sleepTimeInConsume);
288+
MyOrderCheckingMessageHandlerCallback* cb = new MyOrderCheckingMessageHandlerCallback(topic, sid, 0, sleepTimeInConsume);
214289
Hedwig::MessageHandlerCallbackPtr handler(cb);
215290

216291
// create a thread to publish another ${numMessages} messages
217-
boost::thread pubThread(PubForOrderChecking(topic, numMessages, numMessages, sleepTimeInConsume, pub));
292+
boost::thread pubThread(IntegerPublisher(topic, numMessages, numMessages, sleepTimeInConsume, pub, 0));
218293

219294
// starting delivery will consume the queued messages
220295
// new messages will be received and the queued messages should be consumed

0 commit comments

Comments
 (0)