From 237146a103b8e48e405fe8ac16b2588630cc5c93 Mon Sep 17 00:00:00 2001 From: w-gc <25614556+w-gc@users.noreply.github.com> Date: Thu, 16 Jan 2025 15:31:00 +0800 Subject: [PATCH 1/6] fix memory leak issue --- src/brpc/server.cpp | 62 ++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 59 insertions(+), 3 deletions(-) diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp index ade8e274d5..3213e9b30b 100644 --- a/src/brpc/server.cpp +++ b/src/brpc/server.cpp @@ -151,7 +151,7 @@ ServerOptions::ServerOptions() , rtmp_service(NULL) , redis_service(NULL) , bthread_tag(BTHREAD_TAG_DEFAULT) - , rpc_pb_message_factory(new DefaultRpcPBMessageFactory()) + , rpc_pb_message_factory(NULL) , ignore_eovercrowded(false) { if (s_ncore > 0) { num_threads = s_ncore + 1; @@ -424,7 +424,10 @@ Server::Server(ProfilerLinker) , _eps_bvar(&_nerror_bvar) , _concurrency(0) , _concurrency_bvar(cast_no_barrier_int, &_concurrency) - ,_has_progressive_read_method(false) { + , _has_progressive_read_method(false) { + // `rpc_pb_message_factory` is created here because it is possible + // for users to visit it at any time after `Server` created + _options.rpc_pb_message_factory = new DefaultRpcPBMessageFactory(); BAIDU_CASSERT(offsetof(Server, _concurrency) % 64 == 0, Server_concurrency_must_be_aligned_by_cacheline); } @@ -782,6 +785,47 @@ static bool OptionsAvailableOverRdma(const ServerOptions* opt) { static AdaptiveMaxConcurrency g_default_max_concurrency_of_method(0); static bool g_default_ignore_eovercrowded(false); + +inline void copy_server_option(ServerOptions& dst, const ServerOptions& src) { + if (&dst == &src) { + return; + } + +#define FREE_PTR_IF_NOT_REUSED(ptr) \ + if (dst.ptr != src.ptr) { \ + delete dst.ptr; \ + dst.ptr = NULL; \ + } + + FREE_PTR_IF_NOT_REUSED(nshead_service); + +#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL + FREE_PTR_IF_NOT_REUSED(thrift_service); +#endif + + FREE_PTR_IF_NOT_REUSED(baidu_master_service); + FREE_PTR_IF_NOT_REUSED(http_master_service); + FREE_PTR_IF_NOT_REUSED(rpc_pb_message_factory); + + if (dst.pid_file != src.pid_file && !dst.pid_file.empty()) { + unlink(dst.pid_file.c_str()); + } + + if (dst.server_owns_auth) { + FREE_PTR_IF_NOT_REUSED(auth); + } + + if (dst.server_owns_interceptor) { + FREE_PTR_IF_NOT_REUSED(interceptor); + } + + FREE_PTR_IF_NOT_REUSED(redis_service); + +#undef FREE_PTR_IF_NOT_REUSED + + dst = src; +} + int Server::StartInternal(const butil::EndPoint& endpoint, const PortRange& port_range, const ServerOptions *opt) { @@ -813,14 +857,26 @@ int Server::StartInternal(const butil::EndPoint& endpoint, } return -1; } + if (opt) { - _options = *opt; + copy_server_option(_options, *opt); } else { + // Don't forget to release `rpc_pb_message_factory` before overwriting `_options` + delete _options.rpc_pb_message_factory; + _options.rpc_pb_message_factory = NULL; + // Always reset to default options explicitly since `_options' // may be the options for the last run or even bad options _options = ServerOptions(); } + // Create the resource if: + // 1. `_options` copied from user and user forgot to create + // 2. `_options` created by our + if (!_options.rpc_pb_message_factory) { + _options.rpc_pb_message_factory = new DefaultRpcPBMessageFactory(); + } + if (!_options.h2_settings.IsValid(true/*log_error*/)) { LOG(ERROR) << "Invalid h2_settings"; return -1; From 9ed8b8abbac1bc787f8be8e6f7e4e8eb9216b89c Mon Sep 17 00:00:00 2001 From: w-gc <25614556+w-gc@users.noreply.github.com> Date: Wed, 22 Jan 2025 17:15:37 +0800 Subject: [PATCH 2/6] add comments --- src/brpc/server.cpp | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp index 3213e9b30b..e913cd6f7a 100644 --- a/src/brpc/server.cpp +++ b/src/brpc/server.cpp @@ -426,7 +426,13 @@ Server::Server(ProfilerLinker) , _concurrency_bvar(cast_no_barrier_int, &_concurrency) , _has_progressive_read_method(false) { // `rpc_pb_message_factory` is created here because it is possible - // for users to visit it at any time after `Server` created + // for users to visit it at any time after `Server` created, such as + // the `_dummy` server of ChannelTest.success unit test case that uses + // `rpc_pb_message_factory` of the default ServerOptions: + // ```cpp + // Server _dummy; + // auto messages = _dummy.options().rpc_pb_message_factory->Get(...); + // ``` _options.rpc_pb_message_factory = new DefaultRpcPBMessageFactory(); BAIDU_CASSERT(offsetof(Server, _concurrency) % 64 == 0, Server_concurrency_must_be_aligned_by_cacheline); From f286fa4c7353380bc04c7fb46261d565f14f666b Mon Sep 17 00:00:00 2001 From: w-gc <25614556+w-gc@users.noreply.github.com> Date: Thu, 23 Jan 2025 11:29:59 +0800 Subject: [PATCH 3/6] refine --- src/brpc/server.cpp | 121 +++++++++++++++++++------------------------- 1 file changed, 53 insertions(+), 68 deletions(-) diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp index e913cd6f7a..6b0e7e2c47 100644 --- a/src/brpc/server.cpp +++ b/src/brpc/server.cpp @@ -406,6 +406,57 @@ const std::string& Server::ServiceProperty::service_name() const { return s_unknown_name; } +inline void copy_and_fill_server_options(ServerOptions& dst, const ServerOptions& src) { +// follow Server::~Server() +#define FREE_PTR_IF_NOT_REUSED(ptr) \ + if (dst.ptr != src.ptr) { \ + delete dst.ptr; \ + dst.ptr = NULL; \ + } + + if (&dst != &src) { + FREE_PTR_IF_NOT_REUSED(nshead_service); + + #ifdef ENABLE_THRIFT_FRAMED_PROTOCOL + FREE_PTR_IF_NOT_REUSED(thrift_service); + #endif + + FREE_PTR_IF_NOT_REUSED(baidu_master_service); + FREE_PTR_IF_NOT_REUSED(http_master_service); + FREE_PTR_IF_NOT_REUSED(rpc_pb_message_factory); + + if (dst.pid_file != src.pid_file && !dst.pid_file.empty()) { + unlink(dst.pid_file.c_str()); + } + + if (dst.server_owns_auth) { + FREE_PTR_IF_NOT_REUSED(auth); + } + + if (dst.server_owns_interceptor) { + FREE_PTR_IF_NOT_REUSED(interceptor); + } + + FREE_PTR_IF_NOT_REUSED(redis_service); + + // copy data members directly + dst = src; + } +#undef FREE_PTR_IF_NOT_REUSED + + // `rpc_pb_message_factory` is created here because it is possible + // for users to visit it at any time after `Server` created, such as + // the `_dummy` server of ChannelTest.success unit test case that uses + // `rpc_pb_message_factory` of the default ServerOptions: + // ```cpp + // Server _dummy; + // auto messages = _dummy.options().rpc_pb_message_factory->Get(...); + // ``` + if (!dst.rpc_pb_message_factory) { + dst.rpc_pb_message_factory = new DefaultRpcPBMessageFactory(); + } +} + Server::Server(ProfilerLinker) : _session_local_data_pool(NULL) , _status(UNINITIALIZED) @@ -425,15 +476,7 @@ Server::Server(ProfilerLinker) , _concurrency(0) , _concurrency_bvar(cast_no_barrier_int, &_concurrency) , _has_progressive_read_method(false) { - // `rpc_pb_message_factory` is created here because it is possible - // for users to visit it at any time after `Server` created, such as - // the `_dummy` server of ChannelTest.success unit test case that uses - // `rpc_pb_message_factory` of the default ServerOptions: - // ```cpp - // Server _dummy; - // auto messages = _dummy.options().rpc_pb_message_factory->Get(...); - // ``` - _options.rpc_pb_message_factory = new DefaultRpcPBMessageFactory(); + copy_and_fill_server_options(_options, _options); BAIDU_CASSERT(offsetof(Server, _concurrency) % 64 == 0, Server_concurrency_must_be_aligned_by_cacheline); } @@ -791,47 +834,6 @@ static bool OptionsAvailableOverRdma(const ServerOptions* opt) { static AdaptiveMaxConcurrency g_default_max_concurrency_of_method(0); static bool g_default_ignore_eovercrowded(false); - -inline void copy_server_option(ServerOptions& dst, const ServerOptions& src) { - if (&dst == &src) { - return; - } - -#define FREE_PTR_IF_NOT_REUSED(ptr) \ - if (dst.ptr != src.ptr) { \ - delete dst.ptr; \ - dst.ptr = NULL; \ - } - - FREE_PTR_IF_NOT_REUSED(nshead_service); - -#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL - FREE_PTR_IF_NOT_REUSED(thrift_service); -#endif - - FREE_PTR_IF_NOT_REUSED(baidu_master_service); - FREE_PTR_IF_NOT_REUSED(http_master_service); - FREE_PTR_IF_NOT_REUSED(rpc_pb_message_factory); - - if (dst.pid_file != src.pid_file && !dst.pid_file.empty()) { - unlink(dst.pid_file.c_str()); - } - - if (dst.server_owns_auth) { - FREE_PTR_IF_NOT_REUSED(auth); - } - - if (dst.server_owns_interceptor) { - FREE_PTR_IF_NOT_REUSED(interceptor); - } - - FREE_PTR_IF_NOT_REUSED(redis_service); - -#undef FREE_PTR_IF_NOT_REUSED - - dst = src; -} - int Server::StartInternal(const butil::EndPoint& endpoint, const PortRange& port_range, const ServerOptions *opt) { @@ -864,24 +866,7 @@ int Server::StartInternal(const butil::EndPoint& endpoint, return -1; } - if (opt) { - copy_server_option(_options, *opt); - } else { - // Don't forget to release `rpc_pb_message_factory` before overwriting `_options` - delete _options.rpc_pb_message_factory; - _options.rpc_pb_message_factory = NULL; - - // Always reset to default options explicitly since `_options' - // may be the options for the last run or even bad options - _options = ServerOptions(); - } - - // Create the resource if: - // 1. `_options` copied from user and user forgot to create - // 2. `_options` created by our - if (!_options.rpc_pb_message_factory) { - _options.rpc_pb_message_factory = new DefaultRpcPBMessageFactory(); - } + copy_and_fill_server_options(_options, opt ? *opt : ServerOptions()); if (!_options.h2_settings.IsValid(true/*log_error*/)) { LOG(ERROR) << "Invalid h2_settings"; From 634f0e3fe7cc54b472a3c8f151e5022496dba16f Mon Sep 17 00:00:00 2001 From: w-gc <25614556+w-gc@users.noreply.github.com> Date: Fri, 24 Jan 2025 10:46:41 +0800 Subject: [PATCH 4/6] refine --- src/brpc/server.cpp | 98 ++++++++++++++++------------------ test/brpc_channel_unittest.cpp | 5 ++ 2 files changed, 51 insertions(+), 52 deletions(-) diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp index 6b0e7e2c47..2da703ef83 100644 --- a/src/brpc/server.cpp +++ b/src/brpc/server.cpp @@ -406,57 +406,6 @@ const std::string& Server::ServiceProperty::service_name() const { return s_unknown_name; } -inline void copy_and_fill_server_options(ServerOptions& dst, const ServerOptions& src) { -// follow Server::~Server() -#define FREE_PTR_IF_NOT_REUSED(ptr) \ - if (dst.ptr != src.ptr) { \ - delete dst.ptr; \ - dst.ptr = NULL; \ - } - - if (&dst != &src) { - FREE_PTR_IF_NOT_REUSED(nshead_service); - - #ifdef ENABLE_THRIFT_FRAMED_PROTOCOL - FREE_PTR_IF_NOT_REUSED(thrift_service); - #endif - - FREE_PTR_IF_NOT_REUSED(baidu_master_service); - FREE_PTR_IF_NOT_REUSED(http_master_service); - FREE_PTR_IF_NOT_REUSED(rpc_pb_message_factory); - - if (dst.pid_file != src.pid_file && !dst.pid_file.empty()) { - unlink(dst.pid_file.c_str()); - } - - if (dst.server_owns_auth) { - FREE_PTR_IF_NOT_REUSED(auth); - } - - if (dst.server_owns_interceptor) { - FREE_PTR_IF_NOT_REUSED(interceptor); - } - - FREE_PTR_IF_NOT_REUSED(redis_service); - - // copy data members directly - dst = src; - } -#undef FREE_PTR_IF_NOT_REUSED - - // `rpc_pb_message_factory` is created here because it is possible - // for users to visit it at any time after `Server` created, such as - // the `_dummy` server of ChannelTest.success unit test case that uses - // `rpc_pb_message_factory` of the default ServerOptions: - // ```cpp - // Server _dummy; - // auto messages = _dummy.options().rpc_pb_message_factory->Get(...); - // ``` - if (!dst.rpc_pb_message_factory) { - dst.rpc_pb_message_factory = new DefaultRpcPBMessageFactory(); - } -} - Server::Server(ProfilerLinker) : _session_local_data_pool(NULL) , _status(UNINITIALIZED) @@ -476,7 +425,6 @@ Server::Server(ProfilerLinker) , _concurrency(0) , _concurrency_bvar(cast_no_barrier_int, &_concurrency) , _has_progressive_read_method(false) { - copy_and_fill_server_options(_options, _options); BAIDU_CASSERT(offsetof(Server, _concurrency) % 64 == 0, Server_concurrency_must_be_aligned_by_cacheline); } @@ -834,6 +782,52 @@ static bool OptionsAvailableOverRdma(const ServerOptions* opt) { static AdaptiveMaxConcurrency g_default_max_concurrency_of_method(0); static bool g_default_ignore_eovercrowded(false); +inline void copy_and_fill_server_options(ServerOptions& dst, const ServerOptions& src) { +// follow Server::~Server() +#define FREE_PTR_IF_NOT_REUSED(ptr) \ + if (dst.ptr != src.ptr) { \ + delete dst.ptr; \ + dst.ptr = NULL; \ + } + + if (&dst != &src) { + FREE_PTR_IF_NOT_REUSED(nshead_service); + + #ifdef ENABLE_THRIFT_FRAMED_PROTOCOL + FREE_PTR_IF_NOT_REUSED(thrift_service); + #endif + + FREE_PTR_IF_NOT_REUSED(baidu_master_service); + FREE_PTR_IF_NOT_REUSED(http_master_service); + FREE_PTR_IF_NOT_REUSED(rpc_pb_message_factory); + + if (dst.pid_file != src.pid_file && !dst.pid_file.empty()) { + unlink(dst.pid_file.c_str()); + } + + if (dst.server_owns_auth) { + FREE_PTR_IF_NOT_REUSED(auth); + } + + if (dst.server_owns_interceptor) { + FREE_PTR_IF_NOT_REUSED(interceptor); + } + + FREE_PTR_IF_NOT_REUSED(redis_service); + + // copy data members directly + dst = src; + } +#undef FREE_PTR_IF_NOT_REUSED + + // Create the resource if: + // 1. `dst` copied from user and user forgot to create + // 2. `dst` created by our + if (!dst.rpc_pb_message_factory) { + dst.rpc_pb_message_factory = new DefaultRpcPBMessageFactory(); + } +} + int Server::StartInternal(const butil::EndPoint& endpoint, const PortRange& port_range, const ServerOptions *opt) { diff --git a/test/brpc_channel_unittest.cpp b/test/brpc_channel_unittest.cpp index 7b98896b41..44918d0d20 100644 --- a/test/brpc_channel_unittest.cpp +++ b/test/brpc_channel_unittest.cpp @@ -256,6 +256,11 @@ class ChannelTest : public ::testing::Test{ ASSERT_EQ(ts->_svc.descriptor()->full_name(), req_meta.service_name()); const google::protobuf::MethodDescriptor* method = ts->_svc.descriptor()->FindMethodByName(req_meta.method_name()); + + if (!ts->_dummy.options().rpc_pb_message_factory) { + ts->_dummy._options.rpc_pb_message_factory = new brpc::DefaultRpcPBMessageFactory(); + } + brpc::RpcPBMessages* messages = ts->_dummy.options().rpc_pb_message_factory->Get(ts->_svc, *method); google::protobuf::Message* req = messages->Request(); From 4983198514e5f989d99047615ed4e51f33f258a2 Mon Sep 17 00:00:00 2001 From: w-gc <25614556+w-gc@users.noreply.github.com> Date: Fri, 24 Jan 2025 12:53:30 +0800 Subject: [PATCH 5/6] fix --- test/brpc_channel_unittest.cpp | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/test/brpc_channel_unittest.cpp b/test/brpc_channel_unittest.cpp index 44918d0d20..186c8de826 100644 --- a/test/brpc_channel_unittest.cpp +++ b/test/brpc_channel_unittest.cpp @@ -201,6 +201,10 @@ class ChannelTest : public ::testing::Test{ ChannelTest() : _ep(butil::IP_ANY, 8787) , _close_fd_once(false) { + if (!_dummy.options().rpc_pb_message_factory) { + _dummy._options.rpc_pb_message_factory = new brpc::DefaultRpcPBMessageFactory(); + } + pthread_once(®ister_mock_protocol, register_protocol); const brpc::InputMessageHandler pairs[] = { { brpc::policy::ParseRpcMessage, @@ -256,11 +260,6 @@ class ChannelTest : public ::testing::Test{ ASSERT_EQ(ts->_svc.descriptor()->full_name(), req_meta.service_name()); const google::protobuf::MethodDescriptor* method = ts->_svc.descriptor()->FindMethodByName(req_meta.method_name()); - - if (!ts->_dummy.options().rpc_pb_message_factory) { - ts->_dummy._options.rpc_pb_message_factory = new brpc::DefaultRpcPBMessageFactory(); - } - brpc::RpcPBMessages* messages = ts->_dummy.options().rpc_pb_message_factory->Get(ts->_svc, *method); google::protobuf::Message* req = messages->Request(); From a58b1f8de1b51b9d87e89d43b6367d610cc8f620 Mon Sep 17 00:00:00 2001 From: w-gc <25614556+w-gc@users.noreply.github.com> Date: Fri, 24 Jan 2025 14:24:42 +0800 Subject: [PATCH 6/6] continue to fix --- test/brpc_http_rpc_protocol_unittest.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/test/brpc_http_rpc_protocol_unittest.cpp b/test/brpc_http_rpc_protocol_unittest.cpp index f4bdec955f..5b6fb9ee03 100644 --- a/test/brpc_http_rpc_protocol_unittest.cpp +++ b/test/brpc_http_rpc_protocol_unittest.cpp @@ -132,7 +132,10 @@ class HttpTest : public ::testing::Test{ // Hack: Regard `_server' as running _server._status = brpc::Server::RUNNING; _server._options.auth = &_auth; - + if (!_server._options.rpc_pb_message_factory) { + _server._options.rpc_pb_message_factory = new brpc::DefaultRpcPBMessageFactory(); + } + EXPECT_EQ(0, pipe(_pipe_fds)); brpc::SocketId id;