Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions src/brpc/selective_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,13 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel,
}
SocketUniquePtr ptr;
int rc = Socket::AddressFailedAsWell(sock_id, &ptr);
if (rc < 0 || (rc > 0 && !ptr->HCEnabled())) {
LOG(FATAL) << "Fail to address SocketId=" << sock_id;
if (rc < 0) {
LOG(ERROR) << "Fail to address SocketId=" << sock_id;
return -1;
}
if (rc > 0 && !ptr->HCEnabled()) {
LOG(ERROR) << "Health check of SocketId="
<< sock_id << " is disabled";
return -1;
}
if (!AddServer(ServerId(sock_id))) {
Expand Down
18 changes: 15 additions & 3 deletions src/brpc/socket_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id,
return 0;
}
// A socket w/o HC is failed (permanently), replace it.
sc->socket->ReleaseHCRelatedReference();
ReleaseReference(sc->socket);
_map.erase(key); // in principle, we can override the entry in map w/o
// removing and inserting it again. But this would make error branches
// below have to remove the entry before returning, which is
Expand Down Expand Up @@ -268,7 +268,10 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id,
LOG(FATAL) << "Failed socket is not HC-enabled";
return -1;
}
SingleConnection new_sc = { 1, ptr.get(), 0 };
// If health check is enabled, a health-checking-related reference
// is hold in Socket::Create.
// If health check is disabled, hold a reference in SocketMap.
SingleConnection new_sc = { 1, ptr->HCEnabled() ? ptr.get() : ptr.release(), 0 };
_map[key] = new_sc;
*id = tmp_id;
mu.unlock();
Expand Down Expand Up @@ -306,11 +309,20 @@ void SocketMap::RemoveInternal(const SocketMapKey& key,
_map.erase(key);
mu.unlock();
s->ReleaseAdditionalReference(); // release extra ref
s->ReleaseHCRelatedReference();
ReleaseReference(s);
}
}
}

void SocketMap::ReleaseReference(Socket* s) {
if (s->HCEnabled()) {
s->ReleaseHCRelatedReference();
} else {
// Release the extra ref hold in SocketMap::Insert.
SocketUniquePtr ptr(s);
}
}

int SocketMap::Find(const SocketMapKey& key, SocketId* id) {
BAIDU_SCOPED_LOCK(_mutex);
SingleConnection* sc = _map.seek(key);
Expand Down
1 change: 1 addition & 0 deletions src/brpc/socket_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ class SocketMap {
private:
void RemoveInternal(const SocketMapKey& key, SocketId id,
bool remove_orphan);
static void ReleaseReference(Socket* s);
void ListOrphans(int64_t defer_us, std::vector<SocketMapKey>* out);
void WatchConnections();
static void* RunWatchConnections(void*);
Expand Down
27 changes: 27 additions & 0 deletions test/brpc_socket_map_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "brpc/reloadable_flags.h"

namespace brpc {
DECLARE_int32(health_check_interval);
DECLARE_int32(idle_timeout_second);
DECLARE_int32(defer_close_second);
DECLARE_int32(max_connection_pool_size);
Expand Down Expand Up @@ -59,6 +60,31 @@ class SocketMapTest : public ::testing::Test{
virtual void TearDown(){};
};

TEST_F(SocketMapTest, disable_health_check) {
int32_t old_interval = brpc::FLAGS_health_check_interval;
brpc::FLAGS_health_check_interval = 0;
brpc::SocketId id;
ASSERT_EQ(-1, brpc::SocketMapFind(g_key, &id));
ASSERT_EQ(0, brpc::SocketMapInsert(g_key, &id));
ASSERT_EQ(0, brpc::SocketMapFind(g_key, &id));
{
brpc::SocketUniquePtr ptr;
ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
}
ASSERT_EQ(0, brpc::Socket::SetFailed(id));

brpc::SocketUniquePtr ptr;
// The socket should not be recycled,
// because SocketMap holds a reference to it.
ASSERT_EQ(1, brpc::Socket::AddressFailedAsWell(id, &ptr));
ASSERT_EQ(2, ptr->nref());
brpc::SocketMapRemove(g_key);
// After removing the socket, `ptr' holds the last reference.
ASSERT_EQ(1, ptr->nref());
ASSERT_EQ(-1, brpc::SocketMapFind(g_key, &id));
brpc::FLAGS_health_check_interval = old_interval;
}

TEST_F(SocketMapTest, idle_timeout) {
const int TIMEOUT = 1;
const int NTHREAD = 10;
Expand Down Expand Up @@ -140,6 +166,7 @@ TEST_F(SocketMapTest, max_pool_size) {
EXPECT_TRUE(ptrs[i]->Failed());
}
}

} //namespace

int main(int argc, char* argv[]) {
Expand Down
Loading