Skip to content

Commit 9daeb60

Browse files
committed
BOOKKEEPER-117: Support multi threads in hedwig cpp client to leverage multi-core hardware (Sijie Guo via ivank)
git-svn-id: https://svn.apache.org/repos/asf/zookeeper/bookkeeper/trunk@1207289 13f79535-47bb-0310-9956-ffa450edef68
1 parent e110067 commit 9daeb60

File tree

12 files changed

+160
-36
lines changed

12 files changed

+160
-36
lines changed

CHANGES.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,8 @@ BUGFIXES:
110110

111111
BOOKKEEPER-118: Hedwig client doesn't kill and remove old subscription channel after redirection. (Sijie Guo via ivank)
112112

113+
BOOKKEEPER-117: Support multi threads in hedwig cpp client to leverage multi-core hardware (Sijie Guo via ivank)
114+
113115
IMPROVEMENTS:
114116

115117
BOOKKEEPER-28: Create useful startup scripts for bookkeeper and hedwig (ivank)

hedwig-client/src/main/cpp/inc/hedwig/client.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ namespace Hedwig {
4646
static const std::string RECONNECT_SUBSCRIBE_RETRY_WAIT_TIME;
4747
static const std::string SYNC_REQUEST_TIMEOUT;
4848
static const std::string SUBSCRIBER_AUTOCONSUME;
49+
static const std::string NUM_DISPATCH_THREADS;
4950

5051
public:
5152
Configuration() {};

hedwig-client/src/main/cpp/lib/channel.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@ using namespace Hedwig;
5050

5151
DuplexChannel::DuplexChannel(EventDispatcher& dispatcher, const HostAddress& addr,
5252
const Configuration& cfg, const ChannelHandlerPtr& handler)
53-
: dispatcher(dispatcher), address(addr), handler(handler),
54-
socket(dispatcher.getService()), instream(&in_buf), copy_buf(NULL), copy_buf_length(0),
53+
: dispatcher(dispatcher), address(addr), handler(handler), service(dispatcher.getService()),
54+
socket(service), instream(&in_buf), copy_buf(NULL), copy_buf_length(0),
5555
state(UNINITIALISED), receiving(false), reading(false), sending(false)
5656
{
5757
LOG4CXX_DEBUG(logger, "Creating DuplexChannel(" << this << ")");

hedwig-client/src/main/cpp/lib/channel.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
#include <boost/shared_ptr.hpp>
3939
#include <boost/enable_shared_from_this.hpp>
4040

41+
#include <boost/asio.hpp>
4142
#include <boost/asio/ip/tcp.hpp>
4243
#include <boost/thread/mutex.hpp>
4344
#include <boost/thread/shared_mutex.hpp>
@@ -110,6 +111,10 @@ namespace Hedwig {
110111
void channelDisconnected(const std::exception& e);
111112
virtual void kill();
112113

114+
inline boost::asio::io_service & getService() {
115+
return service;
116+
}
117+
113118
virtual ~DuplexChannel();
114119
private:
115120
enum State { UNINITIALISED, CONNECTING, CONNECTED, DEAD };
@@ -121,6 +126,7 @@ namespace Hedwig {
121126
HostAddress address;
122127
ChannelHandlerPtr handler;
123128

129+
boost::asio::io_service &service;
124130
boost::asio::ip::tcp::socket socket;
125131
boost::asio::streambuf in_buf;
126132
std::istream instream;

hedwig-client/src/main/cpp/lib/client.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ const std::string Configuration::MAX_MESSAGE_QUEUE_SIZE = "hedwig.cpp.max_msgque
3737
const std::string Configuration::RECONNECT_SUBSCRIBE_RETRY_WAIT_TIME = "hedwig.cpp.reconnect_subscribe_retry_wait_time";
3838
const std::string Configuration::SYNC_REQUEST_TIMEOUT = "hedwig.cpp.sync_request_timeout";
3939
const std::string Configuration::SUBSCRIBER_AUTOCONSUME = "hedwig.cpp.subscriber_autoconsume";
40+
const std::string Configuration::NUM_DISPATCH_THREADS = "hedwig.cpp.num_dispatch_threads";
4041

4142
Client::Client(const Configuration& conf) {
4243
LOG4CXX_DEBUG(logger, "Client::Client (" << this << ")");

hedwig-client/src/main/cpp/lib/clientimpl.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig."__FILE__));
3030
using namespace Hedwig;
3131

3232
const std::string DEFAULT_SERVER_DEFAULT_VAL = "";
33+
const int DEFAULT_NUM_DISPATCH_THREADS = 1;
3334

3435
void SyncOperationCallback::wait() {
3536
boost::unique_lock<boost::mutex> lock(mut);
@@ -182,15 +183,15 @@ ClientImplPtr ClientImpl::Create(const Configuration& conf) {
182183
ClientImplPtr impl(new ClientImpl(conf));
183184
LOG4CXX_DEBUG(logger, "Creating Clientimpl " << impl);
184185

185-
impl->dispatcher.start();
186+
impl->dispatcher->start();
186187

187188
return impl;
188189
}
189190

190191
void ClientImpl::Destroy() {
191192
LOG4CXX_DEBUG(logger, "destroying Clientimpl " << this);
192193

193-
dispatcher.stop();
194+
dispatcher->stop();
194195
{
195196
boost::lock_guard<boost::shared_mutex> lock(allchannels_lock);
196197

@@ -217,6 +218,7 @@ ClientImpl::ClientImpl(const Configuration& conf)
217218
: conf(conf), publisher(NULL), subscriber(NULL), counterobj(), shuttingDownFlag(false)
218219
{
219220
defaultHost = HostAddress::fromString(conf.get(Configuration::DEFAULT_SERVER, DEFAULT_SERVER_DEFAULT_VAL));
221+
dispatcher = EventDispatcherPtr(new EventDispatcher(conf.getInt(Configuration::NUM_DISPATCH_THREADS, DEFAULT_NUM_DISPATCH_THREADS)));
220222
}
221223

222224
Subscriber& ClientImpl::getSubscriber() {
@@ -312,7 +314,7 @@ DuplexChannelPtr ClientImpl::createChannel(const std::string& topic, const Chann
312314
setHostForTopic(topic, addr);
313315
}
314316

315-
DuplexChannelPtr channel(new DuplexChannel(dispatcher, addr, conf, handler));
317+
DuplexChannelPtr channel(new DuplexChannel(*dispatcher, addr, conf, handler));
316318

317319
boost::lock_guard<boost::shared_mutex> lock(allchannels_lock);
318320
if (shuttingDownFlag) {
@@ -392,5 +394,5 @@ const Configuration& ClientImpl::getConfiguration() {
392394
}
393395

394396
boost::asio::io_service& ClientImpl::getService() {
395-
return dispatcher.getService();
397+
return dispatcher->getService();
396398
}

hedwig-client/src/main/cpp/lib/clientimpl.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,8 @@ namespace Hedwig {
134134

135135
ClientTxnCounter counterobj;
136136

137-
EventDispatcher dispatcher;
137+
typedef boost::shared_ptr<EventDispatcher> EventDispatcherPtr;
138+
EventDispatcherPtr dispatcher;
138139

139140
typedef std::tr1::unordered_multimap<HostAddress, std::string, HostAddressHash > Host2TopicsMap;
140141
Host2TopicsMap host2topics;

hedwig-client/src/main/cpp/lib/eventdispatcher.cpp

Lines changed: 46 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -27,49 +27,76 @@ static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig."__FILE__));
2727

2828
using namespace Hedwig;
2929

30-
EventDispatcher::EventDispatcher() : service(), dummy_work(NULL), t(NULL) {
30+
EventDispatcher::EventDispatcher(int numThreads)
31+
: num_threads(numThreads), running(false), next_io_service(0) {
32+
if (0 == num_threads) {
33+
throw std::runtime_error("number of threads in dispatcher is zero");
34+
}
35+
for (size_t i = 0; i < num_threads; i++) {
36+
io_service_ptr service(new boost::asio::io_service);
37+
services.push_back(service);
38+
}
3139
}
3240

33-
void EventDispatcher::run_forever() {
34-
LOG4CXX_DEBUG(logger, "Starting event dispatcher");
41+
void EventDispatcher::run_forever(io_service_ptr service, size_t idx) {
42+
LOG4CXX_DEBUG(logger, "Starting event dispatcher " << idx);
3543

3644
while (true) {
3745
try {
38-
service.run();
46+
service->run();
3947
break;
4048
} catch (std::exception &e) {
41-
LOG4CXX_ERROR(logger, "Exception in dispatch handler. " << e.what());
49+
LOG4CXX_ERROR(logger, "Exception in dispatch handler " << idx << " : " << e.what());
4250
}
4351
}
44-
LOG4CXX_DEBUG(logger, "Event dispatcher done");
52+
LOG4CXX_DEBUG(logger, "Event dispatcher " << idx << " done");
4553
}
4654

4755
void EventDispatcher::start() {
48-
if (t) {
56+
if (running) {
4957
return;
5058
}
51-
dummy_work = new boost::asio::io_service::work(service);
52-
t = new boost::thread(boost::bind(&EventDispatcher::run_forever, this));
59+
for (size_t i = 0; i < num_threads; i++) {
60+
io_service_ptr service = services[i];
61+
work_ptr work(new boost::asio::io_service::work(*service));
62+
works.push_back(work);
63+
// new thread
64+
thread_ptr t(new boost::thread(boost::bind(&EventDispatcher::run_forever, this, service, i)));
65+
threads.push_back(t);
66+
}
67+
running = true;
5368
}
5469

5570
void EventDispatcher::stop() {
56-
if (!t) {
71+
if (!running) {
5772
return;
5873
}
59-
delete dummy_work;
60-
dummy_work = NULL;
61-
62-
service.stop();
63-
64-
t->join();
65-
delete t;
66-
t = NULL;
74+
75+
works.clear();
76+
77+
for (size_t i = 0; i < num_threads; i++) {
78+
services[i]->stop();
79+
}
80+
81+
for (size_t i = 0; i < num_threads; i++) {
82+
threads[i]->join();
83+
}
84+
threads.clear();
85+
86+
running = false;
6787
}
6888

6989
EventDispatcher::~EventDispatcher() {
70-
delete dummy_work;
90+
services.clear();
7191
}
7292

7393
boost::asio::io_service& EventDispatcher::getService() {
94+
size_t next = 0;
95+
{
96+
boost::lock_guard<boost::mutex> lock(next_lock);
97+
next = next_io_service;
98+
next_io_service = (next_io_service + 1) % num_threads;
99+
}
100+
boost::asio::io_service& service = *services[next];
74101
return service;
75102
}

hedwig-client/src/main/cpp/lib/eventdispatcher.h

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,21 @@
1818
#ifndef EVENTDISPATCHER_H
1919
#define EVENTDISPATCHER_H
2020

21+
#include <vector>
22+
2123
#include <boost/asio.hpp>
2224
#include <boost/thread.hpp>
25+
#include <boost/thread/mutex.hpp>
26+
#include <boost/shared_ptr.hpp>
2327

2428
namespace Hedwig {
29+
typedef boost::shared_ptr<boost::asio::io_service> io_service_ptr;
30+
typedef boost::shared_ptr<boost::asio::io_service::work> work_ptr;
31+
typedef boost::shared_ptr<boost::thread> thread_ptr;
32+
2533
class EventDispatcher {
2634
public:
27-
EventDispatcher();
35+
EventDispatcher(int numThreads = 1);
2836
~EventDispatcher();
2937

3038
void start();
@@ -33,11 +41,21 @@ namespace Hedwig {
3341
boost::asio::io_service& getService();
3442

3543
private:
36-
void run_forever();
44+
void run_forever(io_service_ptr service, size_t idx);
3745

38-
boost::asio::io_service service;
39-
boost::asio::io_service::work* dummy_work;
40-
boost::thread* t;
46+
// number of threads
47+
size_t num_threads;
48+
// running flag
49+
bool running;
50+
// pool of io_services.
51+
std::vector<io_service_ptr> services;
52+
// pool of works
53+
std::vector<work_ptr> works;
54+
// threads
55+
std::vector<thread_ptr> threads;
56+
// next io_service used for a connection
57+
boost::mutex next_lock;
58+
std::size_t next_io_service;
4159
};
4260
}
4361

hedwig-client/src/main/cpp/lib/subscriberimpl.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ void SubscriberConsumeCallback::operationFailed(const std::exception& exception)
127127
LOG4CXX_ERROR(logger, "Error passing message to client transaction: " << data->getTxnId() << " error: " << exception.what()
128128
<< " retrying in " << retrywait << " Microseconds");
129129

130-
boost::asio::deadline_timer t(client->getService(), boost::posix_time::milliseconds(retrywait));
130+
boost::asio::deadline_timer t(handler->getChannel()->getService(), boost::posix_time::milliseconds(retrywait));
131131

132132
t.async_wait(boost::bind(&SubscriberConsumeCallback::timerComplete, handler, m, boost::asio::placeholders::error));
133133
}
@@ -207,7 +207,7 @@ void SubscriberClientChannelHandler::channelDisconnected(const DuplexChannelPtr&
207207
int retrywait = client->getConfiguration().getInt(Configuration::RECONNECT_SUBSCRIBE_RETRY_WAIT_TIME,
208208
DEFAULT_RECONNECT_SUBSCRIBE_RETRY_WAIT_TIME);
209209

210-
boost::asio::deadline_timer t(client->getService(), boost::posix_time::milliseconds(retrywait));
210+
boost::asio::deadline_timer t(channel->getService(), boost::posix_time::milliseconds(retrywait));
211211
t.async_wait(boost::bind(&SubscriberClientChannelHandler::reconnectTimerComplete, shared_from_this(),
212212
channel, e, boost::asio::placeholders::error));
213213
return;

0 commit comments

Comments
 (0)