Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/brpc/rtmp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1087,7 +1087,7 @@ class RtmpSocketCreator : public SocketCreator {
: _connect_options(connect_options) {
}

int CreateSocket(const SocketOptions& opt, SocketId* id) {
int CreateSocket(const SocketOptions& opt, SocketId* id) override {
SocketOptions sock_opt = opt;
sock_opt.app_connect = std::make_shared<RtmpConnect>();
sock_opt.initial_parsing_context = new policy::RtmpContext(&_connect_options, NULL);
Expand Down
20 changes: 12 additions & 8 deletions src/brpc/selective_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ friend class SubDone;
ChannelBalancer::~ChannelBalancer() {
for (ChannelToIdMap::iterator
it = _chan_map.begin(); it != _chan_map.end(); ++it) {
SocketUniquePtr ptr(it->second); // Dereference
it->second->ReleaseAdditionalReference();
it->second->ReleaseHCRelatedReference();
}
_chan_map.clear();
}
Expand Down Expand Up @@ -196,15 +196,21 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel,
return -1;
}
SocketUniquePtr ptr;
CHECK_EQ(0, Socket::Address(sock_id, &ptr));
int rc = Socket::AddressFailedAsWell(sock_id, &ptr);
if (rc < 0 || (rc > 0 && !ptr->HCEnabled())) {
LOG(FATAL) << "Fail to address SocketId=" << sock_id;
return -1;
}
if (!AddServer(ServerId(sock_id))) {
LOG(ERROR) << "Duplicated sub_channel=" << sub_channel;
// sub_chan will be deleted when the socket is recycled.
ptr->SetFailed();
// Cancel health checking.
ptr->ReleaseHCRelatedReference();
return -1;
}
ptr->SetHCRelatedRefHeld(); // set held status
_chan_map[sub_channel]= ptr.release(); // Add reference.
// The health-check-related reference has been held on created.
_chan_map[sub_channel]= ptr.get();
if (handle) {
*handle = sock_id;
}
Expand All @@ -223,13 +229,11 @@ void ChannelBalancer::RemoveAndDestroyChannel(SelectiveChannel::ChannelHandle ha
BAIDU_SCOPED_LOCK(_mutex);
CHECK_EQ(1UL, _chan_map.erase(sub->chan));
}
{
ptr->SetHCRelatedRefReleased(); // set released status to cancel health checking
SocketUniquePtr ptr2(ptr.get()); // Dereference.
}
if (rc == 0) {
ptr->ReleaseAdditionalReference();
}
// Cancel health checking.
ptr->ReleaseHCRelatedReference();
}
}

Expand Down
65 changes: 51 additions & 14 deletions src/brpc/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,10 @@ int Socket::ResetFileDescriptor(int fd) {
if (!ValidFileDescriptor(fd)) {
return 0;
}
if (_remote_side == butil::EndPoint()) {
// OK to fail, non-socket fd does not support this.
butil::get_remote_side(fd, &_remote_side);
}
// OK to fail, non-socket fd does not support this.
if (butil::get_local_side(fd, &_local_side) != 0) {
_local_side = butil::EndPoint();
Expand Down Expand Up @@ -781,6 +785,19 @@ int Socket::OnCreated(const SocketOptions& options) {
_keepalive_options = options.keepalive_options;
CHECK(NULL == _write_head.load(butil::memory_order_relaxed));
_is_write_shutdown = false;
int fd = options.fd;
if (!ValidFileDescriptor(fd) && options.connect_on_create) {
// Connect on create.
fd = DoConnect(options.connect_abstime, NULL, NULL);
if (fd < 0) {
PLOG(ERROR) << "Fail to connect to " << options.remote_side;
int error_code = errno != 0 ? errno : EHOSTDOWN;
SetFailed(error_code, "Fail to connect to %s: %s",
butil::endpoint2str(options.remote_side).c_str(),
berror(error_code));
return -1;
}
}
// Must be the last one! Internal fields of this Socket may be accessed
// just after calling ResetFileDescriptor.
if (ResetFileDescriptor(options.fd) != 0) {
Expand All @@ -790,6 +807,7 @@ int Socket::OnCreated(const SocketOptions& options) {
berror(saved_errno));
return -1;
}
HoldHCRelatedRef();
guard.dismiss();

return 0;
Expand Down Expand Up @@ -940,6 +958,20 @@ std::string Socket::OnDescription() const {
return result;
}

void Socket::HoldHCRelatedRef() {
if (_health_check_interval_s > 0) {
_is_hc_related_ref_held = true;
AddReference();
}
}

void Socket::ReleaseHCRelatedReference() {
if (_health_check_interval_s > 0) {
_is_hc_related_ref_held = false;
Dereference();
}
}

int Socket::WaitAndReset(int32_t expected_nref) {
const uint32_t id_ver = VersionOfVRefId(id());
uint64_t vref;
Expand Down Expand Up @@ -1350,16 +1382,27 @@ int Socket::CheckConnected(int sockfd) {
return -1;
}

butil::EndPoint local_point;
CHECK_EQ(0, butil::get_local_side(sockfd, &local_point));
LOG_IF(INFO, FLAGS_log_connected)
<< "Connected to " << remote_side()
<< " via fd=" << (int)sockfd << " SocketId=" << id()
<< " local_side=" << local_point;
if (FLAGS_log_connected) {
butil::EndPoint local_point;
CHECK_EQ(0, butil::get_local_side(sockfd, &local_point));
LOG(INFO) << "Connected to " << remote_side()
<< " via fd=" << (int)sockfd << " SocketId=" << id()
<< " local_side=" << local_point;
}

// Doing SSL handshake after TCP connected
return SSLHandshake(sockfd, false);
}

int Socket::DoConnect(const timespec* abstime,
int (*on_connect)(int, int, void*), void* data) {
if (_conn) {
return _conn->Connect(this, abstime, on_connect, data);
} else {
return Connect(abstime, on_connect, data);
}
}

int Socket::ConnectIfNot(const timespec* abstime, WriteRequest* req) {
if (_fd.load(butil::memory_order_consume) >= 0) {
return 0;
Expand All @@ -1370,14 +1413,8 @@ int Socket::ConnectIfNot(const timespec* abstime, WriteRequest* req) {
SocketUniquePtr s;
ReAddress(&s);
req->set_socket(s.get());
if (_conn) {
if (_conn->Connect(this, abstime, KeepWriteIfConnected, req) < 0) {
return -1;
}
} else {
if (Connect(abstime, KeepWriteIfConnected, req) < 0) {
return -1;
}
if (DoConnect(abstime, KeepWriteIfConnected, req) < 0) {
return -1;
}
s.release();
return 1;
Expand Down
27 changes: 20 additions & 7 deletions src/brpc/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,14 @@ struct SocketOptions {
// user->BeforeRecycle() before recycling.
int fd;
butil::EndPoint remote_side;
// If `connect_on_create' is true and `fd' is less than 0,
// a client connection will be established to remote_side()
// regarding deadline `connect_abstime' when Socket is being created.
// Default: false, means that a connection will be established
// on first write.
bool connect_on_create;
// Default: NULL, means no timeout.
const timespec* connect_abstime;
SocketUser* user;
// When *edge-triggered* events happen on the file descriptor, callback
// `on_edge_triggered_events' will be called. Inside the callback, user
Expand Down Expand Up @@ -409,16 +417,15 @@ friend void DereferenceSocket(Socket*);

// True if health checking is enabled.
bool HCEnabled() const {
// This fence makes sure that we see change of
// `_is_hc_related_ref_held' before changing `_versioned_ref.
butil::atomic_thread_fence(butil::memory_order_acquire);
return _health_check_interval_s > 0 && _is_hc_related_ref_held;
}

// When someone holds a health-checking-related reference,
// this function need to be called to make health checking run normally.
void SetHCRelatedRefHeld() { _is_hc_related_ref_held = true; }
// When someone releases the health-checking-related reference,
// this function need to be called to cancel health checking.
void SetHCRelatedRefReleased() { _is_hc_related_ref_held = false; }
bool IsHCRelatedRefHeld() const { return _is_hc_related_ref_held; }
// Release the health-checking-related
// reference which is held on created.
void ReleaseHCRelatedReference();

// After health checking is complete, set _hc_started to false.
void AfterHCCompleted() { _hc_started.store(false, butil::memory_order_relaxed); }
Expand Down Expand Up @@ -665,6 +672,9 @@ friend void DereferenceSocket(Socket*);

std::string OnDescription() const;

// Hold the health-checking-related
// reference on created.
void HoldHCRelatedRef();

static int Status(SocketId, int32_t* nref = NULL); // for unit-test.

Expand Down Expand Up @@ -699,8 +709,11 @@ friend void DereferenceSocket(Socket*);
// starting a connection request and `on_connect' will be called
// when connecting completes (whether it succeeds or not)
// Returns the socket fd on success, -1 otherwise
int DoConnect(const timespec* abstime,
int (*on_connect)(int fd, int err, void* data), void* data);
int Connect(const timespec* abstime,
int (*on_connect)(int fd, int err, void* data), void* data);

int CheckConnected(int sockfd);

// [Not thread-safe] Only used by `Write'.
Expand Down
2 changes: 2 additions & 0 deletions src/brpc/socket_inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ namespace brpc {

inline SocketOptions::SocketOptions()
: fd(-1)
, connect_on_create(false)
, connect_abstime(NULL)
, user(NULL)
, on_edge_triggered_events(NULL)
, health_check_interval_s(-1)
Expand Down
17 changes: 9 additions & 8 deletions src/brpc/socket_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ static butil::static_atomic<SocketMap*> g_socket_map = BUTIL_STATIC_ATOMIC_INIT(

class GlobalSocketCreator : public SocketCreator {
public:
int CreateSocket(const SocketOptions& opt, SocketId* id) {
int CreateSocket(const SocketOptions& opt, SocketId* id) override {
SocketOptions sock_opt = opt;
sock_opt.health_check_interval_s = FLAGS_health_check_interval;
return get_client_side_messenger()->Create(sock_opt, id);
Expand Down Expand Up @@ -237,8 +237,7 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id,
return 0;
}
// A socket w/o HC is failed (permanently), replace it.
sc->socket->SetHCRelatedRefReleased(); // set released status to cancel health checking
SocketUniquePtr ptr(sc->socket); // Remove the ref added at insertion.
sc->socket->ReleaseHCRelatedReference();
_map.erase(key); // in principle, we can override the entry in map w/o
// removing and inserting it again. But this would make error branches
// below have to remove the entry before returning, which is
Expand All @@ -258,12 +257,15 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id,
// use SocketUniquePtr which cannot put into containers before c++11.
// The ref will be removed at entry's removal.
SocketUniquePtr ptr;
if (Socket::Address(tmp_id, &ptr) != 0) {
int rc = Socket::AddressFailedAsWell(tmp_id, &ptr);
if (rc < 0) {
LOG(FATAL) << "Fail to address SocketId=" << tmp_id;
return -1;
} else if (rc > 0 && !ptr->HCEnabled()) {
LOG(FATAL) << "Failed socket is not HC-enabled";
return -1;
}
ptr->SetHCRelatedRefHeld(); // set held status
SingleConnection new_sc = { 1, ptr.release(), 0 };
SingleConnection new_sc = { 1, ptr.get(), 0 };
_map[key] = new_sc;
*id = tmp_id;
mu.unlock();
Expand Down Expand Up @@ -301,8 +303,7 @@ void SocketMap::RemoveInternal(const SocketMapKey& key,
_map.erase(key);
mu.unlock();
s->ReleaseAdditionalReference(); // release extra ref
s->SetHCRelatedRefReleased(); // set released status to cancel health checking
SocketUniquePtr ptr(s); // Dereference
s->ReleaseHCRelatedReference();
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions src/brpc/versioned_ref_with_id.h
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,11 @@ friend void DereferenceVersionedRefWithId<>(T* r);
// it will be recycled automatically and T::BeforeRecycled() will be called.
int Dereference();

// Increase the reference count by 1.
void AddReference() {
_versioned_ref.fetch_add(1, butil::memory_order_release);
}

// Make this socket addressable again.
// If nref is less than `at_least_nref', VersionedRefWithId was
// abandoned during revival and cannot be revived.
Expand Down
21 changes: 12 additions & 9 deletions test/brpc_socket_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,6 @@ TEST_F(SocketTest, not_health_check_when_nref_hits_0) {
{
brpc::SocketUniquePtr s;
ASSERT_EQ(0, brpc::Socket::Address(id, &s));
s->SetHCRelatedRefHeld(); // set held status
global_sock = s.get();
ASSERT_TRUE(s.get());
ASSERT_EQ(-1, s->fd());
Expand Down Expand Up @@ -542,6 +541,7 @@ TEST_F(SocketTest, not_health_check_when_nref_hits_0) {
#endif
ASSERT_TRUE(src.empty());
ASSERT_EQ(-1, s->fd());
s->ReleaseHCRelatedReference();
}
// StartHealthCheck is possibly still running. Spin until global_sock
// is NULL(set in CheckRecycle::BeforeRecycle). Notice that you should
Expand Down Expand Up @@ -650,12 +650,14 @@ TEST_F(SocketTest, health_check) {
options.user = new CheckRecycle;
options.health_check_interval_s = kCheckInteval/*s*/;
ASSERT_EQ(0, brpc::Socket::Create(options, &id));
brpc::SocketUniquePtr s;
ASSERT_EQ(0, brpc::Socket::Address(id, &s));

s->SetHCRelatedRefHeld(); // set held status
global_sock = s.get();
ASSERT_TRUE(s.get());
brpc::Socket* s = NULL;
{
brpc::SocketUniquePtr ptr;
ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
s = ptr.get();
}
global_sock = s;
ASSERT_NE(nullptr, s);
ASSERT_EQ(-1, s->fd());
ASSERT_EQ(point, s->remote_side());
ASSERT_EQ(id, s->id());
Expand Down Expand Up @@ -763,7 +765,7 @@ TEST_F(SocketTest, health_check) {
ASSERT_NE(0, ptr->fd());
}

s.release()->Dereference();
s->ReleaseHCRelatedReference();

// Must stop messenger before SetFailed the id otherwise StartHealthCheck
// still has chance to get reconnected and revive the id.
Expand All @@ -779,7 +781,8 @@ TEST_F(SocketTest, health_check) {
bthread_usleep(1000);
ASSERT_LT(butil::gettimeofday_us(), start_time + 1000000L);
}
ASSERT_EQ(-1, brpc::Socket::Status(id));
nref = 0;
ASSERT_EQ(-1, brpc::Socket::Status(id, &nref)) << "nref=" << nref;
// The id is invalid.
brpc::SocketUniquePtr ptr;
ASSERT_EQ(-1, brpc::Socket::Address(id, &ptr));
Expand Down
Loading