From c3a74e552edb356c80e4fb3de88dd2717467026a Mon Sep 17 00:00:00 2001 From: heyuyi0906 Date: Fri, 17 Jul 2020 17:45:22 +0800 Subject: [PATCH 1/3] support protobuf arena --- src/brpc/controller.h | 15 ++++ src/butil/memory_arena.h | 144 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 159 insertions(+) create mode 100755 src/butil/memory_arena.h diff --git a/src/brpc/controller.h b/src/brpc/controller.h index 69b3f48e28..2152bffa06 100755 --- a/src/brpc/controller.h +++ b/src/brpc/controller.h @@ -27,6 +27,7 @@ #include "bthread/errno.h" // Redefine errno #include "butil/endpoint.h" // butil::EndPoint #include "butil/iobuf.h" // butil::IOBuf +#include "butil/memory_arena.h" #include "bthread/types.h" // bthread_id_t #include "brpc/options.pb.h" // CompressType #include "brpc/errno.pb.h" // error code @@ -510,6 +511,18 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); // -1 means no deadline. int64_t deadline_us() const { return _deadline_us; } + std::shared_ptr get_memory_arena() { + if (!_arena) { + _arena.reset(new butil::MemoryArenaDefault); + } + return _arena; + } + void share_memory_arena(Controller *cntl) { + if (cntl->_arena) { + _arena = cntl->_arena; + } + } + private: struct CompletionInfo { CallId id; // call_id of the corresponding request @@ -757,6 +770,8 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); // Thrift method name, only used when thrift protocol enabled std::string _thrift_method_name; + + std::shared_ptr _arena; }; // Advises the RPC system that the caller desires that the RPC call be diff --git a/src/butil/memory_arena.h b/src/butil/memory_arena.h new file mode 100755 index 0000000000..1748c49925 --- /dev/null +++ b/src/butil/memory_arena.h @@ -0,0 +1,144 @@ + +#ifndef BUTIL_MEMORY_ARENA_H +#define BUTIL_MEMORY_ARENA_H + +#include "google/protobuf/message.h" + +namespace butil { + +template +class ArenaObjPtr { +public: + explicit ArenaObjPtr(T *p, bool own) noexcept + : _ptr(p), _own_ptr(own) { + } + + ~ArenaObjPtr() { + if (_ptr && _own_ptr) { + delete _ptr; + _ptr = nullptr; + } + } + +private: + T *_ptr; + bool _own_ptr; +}; + +namespace internal { + +// Template meta to check protobuf version. +// protobuf 3.x: ArenaCheck::test(0) => int8_t +// protobuf 2.x: ArenaCheck::test(0) => int64_t +struct ArenaCheck { + template static int8_t test(decltype(&T::GetArena)); + template static int64_t test(...); +}; + +// Don't specialize by yourselves, MemoryArenaDefault and MemoryArenaSimple +// will be enough. +// A simple implementation. +template +class MemoryArena { +public: + ArenaObjPtr CreateMessage( + const google::protobuf::Message &proto_type) { + google::protobuf::Message *msg = proto_type.New(); + return ArenaObjPtr(msg, true); + } + + template + ArenaObjPtr CreateMessage() { + T *msg = new T; + return ArenaObjPtr(msg, true); + } + + template + ArenaObjPtr Create(Args&&... args) { + T *obj = new T(std::forward(args)...); + return ArenaObjPtr(obj, true); + } +}; + +// Protobuf arena is encapsulated. +template +class MemoryArena(0)) == sizeof(int8_t) + >::type +> { +public: + typedef decltype(((Def*)nullptr)->GetArena()) ArenaTypePtr; + typedef typename std::remove_pointer::type ArenaType; + + MemoryArena(bool use_arena = true) + : _arena(nullptr) { + if (use_arena) { + _arena = new ArenaType; + } + } + + ~MemoryArena() { + if (_arena) { + delete _arena; + _arena = nullptr; + } + } + + ArenaObjPtr CreateMessage(const Def &proto_type) { + Def *msg = proto_type.New(_arena); + if (_arena && !(msg->GetArena())) { + // When cc_enable_arenas option equals to false + _arena->Own(msg); + } + return ArenaObjPtr(msg, !_arena); + } + + template + ArenaObjPtr CreateMessage() { + T *msg = ArenaType::template CreateMessage(_arena); + if (_arena && !(msg->GetArena())) { + // When cc_enable_arenas option equals to false + _arena->Own(msg); + } + return ArenaObjPtr(msg, !_arena); + } + + template + ArenaObjPtr Create(Args&&... args) { + T *obj = ArenaType::template Create(_arena, + std::forward(args)...); + return ArenaObjPtr(obj, !_arena); + } + +private: + ArenaTypePtr _arena; +}; + +} // namespace internal + +// MemoryArenaDefault could be used as a memory management, whose +// implementation is not cared, e.g. in the ProcessRpcRequest. Using +// protobuf arena has higher priority. +// Specialization table: +// + - - - - - - - + - - - - - - - - - - - - - - - - + +// | protobuf 2.x | The simple implementation | +// + - - - - - - - + - - - - - - - - - - - - - - - - + +// | protobuf 3.x | Encapsulating protobuf arena | +// + - - - - - - - + - - - - - - - - - - - - - - - - + +typedef internal::MemoryArena MemoryArenaDefault; + +// MemoryArenaSimple is our simple implementation, no matter protobuf 2.x +// or protobuf 3.x. +// Specialization table: +// + - - - - - - - + - - - - - - - - - - - - - - - - + +// | protobuf 2.x | The simple implementation | +// + - - - - - - - + - - - - - - - - - - - - - - - - + +// | protobuf 3.x | The simple implementation | +// + - - - - - - - + - - - - - - - - - - - - - - - - + +typedef internal::MemoryArena MemoryArenaSimple; + +} // namespace butil + +#endif // BUTIL_MEMORY_ARENA_H + From 50e0af0887b817bb8e7ac1203d65c14d16a532f2 Mon Sep 17 00:00:00 2001 From: heyuyi0906 Date: Sat, 18 Jul 2020 14:00:49 +0800 Subject: [PATCH 2/3] baidu rpc protocol support protobuf arena --- src/brpc/controller.h | 5 +- src/brpc/policy/baidu_rpc_protocol.cpp | 22 +++++--- src/butil/memory_arena.h | 69 +++++++++++++++++--------- 3 files changed, 63 insertions(+), 33 deletions(-) diff --git a/src/brpc/controller.h b/src/brpc/controller.h index 2152bffa06..edd87bb6f9 100755 --- a/src/brpc/controller.h +++ b/src/brpc/controller.h @@ -518,9 +518,8 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); return _arena; } void share_memory_arena(Controller *cntl) { - if (cntl->_arena) { - _arena = cntl->_arena; - } + CHECK(cntl->_arena); + _arena = cntl->_arena; } private: diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp index 88197f679a..da2d085858 100644 --- a/src/brpc/policy/baidu_rpc_protocol.cpp +++ b/src/brpc/policy/baidu_rpc_protocol.cpp @@ -148,8 +148,14 @@ void SendRpcResponse(int64_t correlation_id, Socket* sock = accessor.get_sending_socket(); std::unique_ptr recycle_cntl(cntl); ConcurrencyRemover concurrency_remover(method_status, cntl, received_us); - std::unique_ptr recycle_req(req); - std::unique_ptr recycle_res(res); + butil::ArenaObjDeleter + del(!cntl->get_memory_arena()->OwnObject()); + std::unique_ptr + > recycle_req(req, del); + std::unique_ptr + > recycle_res(res, del); StreamId response_stream_id = accessor.response_stream(); @@ -333,8 +339,10 @@ void ProcessRpcRequest(InputMessageBase* msg_base) { LOG(WARNING) << "Fail to new Controller"; return; } - std::unique_ptr req; - std::unique_ptr res; + std::unique_ptr> req; + std::unique_ptr> res; ServerPrivateAccessor server_accessor(server); ControllerPrivateAccessor accessor(cntl.get()); @@ -466,7 +474,8 @@ void ProcessRpcRequest(InputMessageBase* msg_base) { } CompressType req_cmp_type = (CompressType)meta.compress_type(); - req.reset(svc->GetRequestPrototype(method).New()); + req = cntl->get_memory_arena()->CreateMessage( + svc->GetRequestPrototype(method)); if (!ParseFromCompressedData(*req_buf_ptr, req.get(), req_cmp_type)) { cntl->SetFailed(EREQUEST, "Fail to parse request message, " "CompressType=%s, request_size=%d", @@ -474,7 +483,8 @@ void ProcessRpcRequest(InputMessageBase* msg_base) { break; } - res.reset(svc->GetResponsePrototype(method).New()); + res = cntl->get_memory_arena()->CreateMessage( + svc->GetResponsePrototype(method)); // `socket' will be held until response has been sent google::protobuf::Closure* done = ::brpc::NewCallback< int64_t, Controller*, const google::protobuf::Message*, diff --git a/src/butil/memory_arena.h b/src/butil/memory_arena.h index 1748c49925..e40cee06f0 100755 --- a/src/butil/memory_arena.h +++ b/src/butil/memory_arena.h @@ -2,27 +2,28 @@ #ifndef BUTIL_MEMORY_ARENA_H #define BUTIL_MEMORY_ARENA_H +#include #include "google/protobuf/message.h" +#include "butil/logging.h" namespace butil { template -class ArenaObjPtr { -public: - explicit ArenaObjPtr(T *p, bool own) noexcept - : _ptr(p), _own_ptr(own) { +struct ArenaObjDeleter { + constexpr ArenaObjDeleter() noexcept = default; + + ArenaObjDeleter(bool own) noexcept + : _own_obj(own) { } - ~ArenaObjPtr() { - if (_ptr && _own_ptr) { - delete _ptr; - _ptr = nullptr; + void operator()(T *ptr) const { + if (_own_obj) { + delete ptr; } } - + private: - T *_ptr; - bool _own_ptr; + bool _own_obj; }; namespace internal { @@ -41,23 +42,34 @@ struct ArenaCheck { template class MemoryArena { public: - ArenaObjPtr CreateMessage( - const google::protobuf::Message &proto_type) { + std::unique_ptr< + google::protobuf::Message, + ArenaObjDeleter + > + CreateMessage(const google::protobuf::Message &proto_type) { google::protobuf::Message *msg = proto_type.New(); - return ArenaObjPtr(msg, true); + VLOG(199) << __FUNCTION__; + return std::unique_ptr< + google::protobuf::Message, + ArenaObjDeleter + >(msg, ArenaObjDeleter(true)); } template - ArenaObjPtr CreateMessage() { + std::unique_ptr> CreateMessage() { T *msg = new T; - return ArenaObjPtr(msg, true); + return std::unique_ptr> + (msg, ArenaObjDeleter(true)); } template - ArenaObjPtr Create(Args&&... args) { + std::unique_ptr> Create(Args&&... args) { T *obj = new T(std::forward(args)...); - return ArenaObjPtr(obj, true); + return std::unique_ptr> + (obj, ArenaObjDeleter(true)); } + + bool OwnObject() { return false; } }; // Protobuf arena is encapsulated. @@ -85,32 +97,41 @@ class MemoryArena CreateMessage(const Def &proto_type) { + std::unique_ptr> + CreateMessage(const Def &proto_type) { Def *msg = proto_type.New(_arena); if (_arena && !(msg->GetArena())) { // When cc_enable_arenas option equals to false + VLOG(199) << __FUNCTION__; _arena->Own(msg); + } else { + VLOG(199) << __FUNCTION__; } - return ArenaObjPtr(msg, !_arena); + return std::unique_ptr> + (msg, ArenaObjDeleter(!_arena)); } template - ArenaObjPtr CreateMessage() { + std::unique_ptr> CreateMessage() { T *msg = ArenaType::template CreateMessage(_arena); if (_arena && !(msg->GetArena())) { // When cc_enable_arenas option equals to false _arena->Own(msg); } - return ArenaObjPtr(msg, !_arena); + return std::unique_ptr> + (msg, ArenaObjDeleter(!_arena)); } template - ArenaObjPtr Create(Args&&... args) { + std::unique_ptr> Create(Args&&... args) { T *obj = ArenaType::template Create(_arena, std::forward(args)...); - return ArenaObjPtr(obj, !_arena); + return std::unique_ptr> + (obj, ArenaObjDeleter(!_arena)); } + bool OwnObject() { return true; } + private: ArenaTypePtr _arena; }; From 48771df090dad691ad9632254c7337f833176cb5 Mon Sep 17 00:00:00 2001 From: heyuyi0906 Date: Sat, 18 Jul 2020 20:56:31 +0800 Subject: [PATCH 3/3] fix --- src/butil/memory_arena.h | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/src/butil/memory_arena.h b/src/butil/memory_arena.h index e40cee06f0..351358c16b 100755 --- a/src/butil/memory_arena.h +++ b/src/butil/memory_arena.h @@ -18,6 +18,7 @@ struct ArenaObjDeleter { void operator()(T *ptr) const { if (_own_obj) { + VLOG(199) << "delete!"; delete ptr; } } @@ -99,25 +100,18 @@ class MemoryArena> CreateMessage(const Def &proto_type) { + // If cc_enable_arenas option equals to false, msg is owned by arena + // inside New, but msg->GetArena will return NULL. Def *msg = proto_type.New(_arena); - if (_arena && !(msg->GetArena())) { - // When cc_enable_arenas option equals to false - VLOG(199) << __FUNCTION__; - _arena->Own(msg); - } else { - VLOG(199) << __FUNCTION__; - } + VLOG(199) << __FUNCTION__; return std::unique_ptr> (msg, ArenaObjDeleter(!_arena)); } template std::unique_ptr> CreateMessage() { + // Compile error if cc_enable_arenas option equals to false T *msg = ArenaType::template CreateMessage(_arena); - if (_arena && !(msg->GetArena())) { - // When cc_enable_arenas option equals to false - _arena->Own(msg); - } return std::unique_ptr> (msg, ArenaObjDeleter(!_arena)); } @@ -130,7 +124,7 @@ class MemoryArena(!_arena)); } - bool OwnObject() { return true; } + bool OwnObject() { return _arena; } private: ArenaTypePtr _arena;