From 54869d0eced27cc5a7e61ba03a29e0fc1187cd7a Mon Sep 17 00:00:00 2001 From: Bright Chen Date: Thu, 25 Apr 2024 22:40:53 +0800 Subject: [PATCH] Fix max concurrency of thrift protocol and nshead protocol --- src/brpc/nshead_service.h | 13 ++++++----- src/brpc/server.cpp | 47 +++++++++++++++++++++++++++++++-------- src/brpc/server.h | 21 +++++++++++++++++ src/brpc/thrift_service.h | 6 +++-- 4 files changed, 70 insertions(+), 17 deletions(-) diff --git a/src/brpc/nshead_service.h b/src/brpc/nshead_service.h index c90c7873e8..49ff9d79ce 100644 --- a/src/brpc/nshead_service.h +++ b/src/brpc/nshead_service.h @@ -22,7 +22,7 @@ #include "brpc/controller.h" // Controller #include "brpc/nshead_message.h" // NsheadMessage #include "brpc/describable.h" - +#include "brpc/adaptive_max_concurrency.h" namespace brpc { @@ -40,7 +40,7 @@ class NsheadClosure : public google::protobuf::Closure { explicit NsheadClosure(void* additional_space); // [Required] Call this to send response back to the client. - void Run(); + void Run() override; // [Optional] Set the full method name. If unset, use name of the service. void SetMethodName(const std::string& full_method_name); @@ -59,7 +59,7 @@ class NsheadClosure : public google::protobuf::Closure { friend void policy::ProcessNsheadRequest(InputMessageBase* msg_base); friend class DeleteNsheadClosure; // Only callable by Run(). - ~NsheadClosure(); + ~NsheadClosure() override; const Server* _server; int64_t _received_us; @@ -84,8 +84,8 @@ struct NsheadServiceOptions { class NsheadService : public Describable { public: NsheadService(); - NsheadService(const NsheadServiceOptions&); - virtual ~NsheadService(); + explicit NsheadService(const NsheadServiceOptions&); + ~NsheadService() override; // Implement this method to handle nshead requests. Notice that this // method can be called with a failed Controller(something wrong with the @@ -104,7 +104,7 @@ class NsheadService : public Describable { NsheadClosure* done) = 0; // Put descriptions into the stream. - void Describe(std::ostream &os, const DescribeOptions&) const; + void Describe(std::ostream &os, const DescribeOptions&) const override; private: DISALLOW_COPY_AND_ASSIGN(NsheadService); @@ -118,6 +118,7 @@ friend class Server; // Tracking status of non NsheadPbService MethodStatus* _status; + AdaptiveMaxConcurrency _max_concurrency; size_t _additional_space; std::string _cached_name; }; diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp index 9d938c6bc9..c65b5a6fb2 100644 --- a/src/brpc/server.cpp +++ b/src/brpc/server.cpp @@ -738,8 +738,8 @@ static int get_port_from_fd(int fd) { return ntohs(addr.sin_port); } -static bool CreateConcurrencyLimiter(const AdaptiveMaxConcurrency& amc, - ConcurrencyLimiter** out) { +bool Server::CreateConcurrencyLimiter(const AdaptiveMaxConcurrency& amc, + ConcurrencyLimiter** out) { if (amc.type() == AdaptiveMaxConcurrency::UNLIMITED()) { *out = NULL; return true; @@ -1040,6 +1040,15 @@ int Server::StartInternal(const butil::EndPoint& endpoint, it->second.status->SetConcurrencyLimiter(cl); } } + if (0 != SetServiceMaxConcurrency(_options.nshead_service)) { + return -1; + } +#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL + if (0 != SetServiceMaxConcurrency(_options.thrift_service)) { + return -1; + } +#endif + // Create listening ports if (port_range.min_port > port_range.max_port) { @@ -2206,13 +2215,33 @@ int Server::MaxConcurrencyOf(const MethodProperty* mp) const { } AdaptiveMaxConcurrency& Server::MaxConcurrencyOf(const butil::StringPiece& full_method_name) { - MethodProperty* mp = _method_map.seek(full_method_name); - if (mp == NULL) { - LOG(ERROR) << "Fail to find method=" << full_method_name; - _failed_to_set_max_concurrency_of_method = true; - return g_default_max_concurrency_of_method; - } - return MaxConcurrencyOf(mp); + do { + if (full_method_name == butil::class_name_str()) { + if (NULL == options().nshead_service) { + break; + } + return options().nshead_service->_max_concurrency; + } +#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL + if (full_method_name == butil::class_name_str()) { + if (NULL == options().thrift_service) { + break; + } + return options().thrift_service->_max_concurrency; + } +#endif + + MethodProperty* mp = _method_map.seek(full_method_name); + if (mp == NULL) { + break; + } + return MaxConcurrencyOf(mp); + + } while (false); + + LOG(ERROR) << "Fail to find method=" << full_method_name; + _failed_to_set_max_concurrency_of_method = true; + return g_default_max_concurrency_of_method; } int Server::MaxConcurrencyOf(const butil::StringPiece& full_method_name) const { diff --git a/src/brpc/server.h b/src/brpc/server.h index 4843d0d0f6..5bc518ef64 100644 --- a/src/brpc/server.h +++ b/src/brpc/server.h @@ -42,6 +42,7 @@ #include "brpc/http2.h" #include "brpc/redis.h" #include "brpc/interceptor.h" +#include "brpc/concurrency_limiter.h" namespace brpc { @@ -674,6 +675,26 @@ friend class Controller; AdaptiveMaxConcurrency& MaxConcurrencyOf(MethodProperty*); int MaxConcurrencyOf(const MethodProperty*) const; + static bool CreateConcurrencyLimiter(const AdaptiveMaxConcurrency& amc, + ConcurrencyLimiter** out); + + template + int SetServiceMaxConcurrency(T* service) { + if (NULL != service) { + const AdaptiveMaxConcurrency* amc = &service->_max_concurrency; + if (amc->type() == AdaptiveMaxConcurrency::UNLIMITED()) { + amc = &_options.method_max_concurrency; + } + ConcurrencyLimiter* cl = NULL; + if (!CreateConcurrencyLimiter(*amc, &cl)) { + LOG(ERROR) << "Fail to create ConcurrencyLimiter for method"; + return -1; + } + service->_status->SetConcurrencyLimiter(cl); + } + return 0; + } + DISALLOW_COPY_AND_ASSIGN(Server); // Put frequently-accessed data pool at first. diff --git a/src/brpc/thrift_service.h b/src/brpc/thrift_service.h index c3d341e0ac..bd4ca44a1f 100644 --- a/src/brpc/thrift_service.h +++ b/src/brpc/thrift_service.h @@ -22,6 +22,7 @@ #include "brpc/controller.h" // Controller #include "brpc/thrift_message.h" // ThriftFramedMessage #include "brpc/describable.h" +#include "brpc/adaptive_max_concurrency.h" namespace brpc { @@ -38,7 +39,7 @@ void ProcessThriftRequest(InputMessageBase* msg_base); class ThriftService : public Describable { public: ThriftService(); - virtual ~ThriftService(); + ~ThriftService() override; // Implement this method to handle thrift_binary requests. // Parameters: @@ -53,7 +54,7 @@ class ThriftService : public Describable { ::google::protobuf::Closure* done) = 0; // Put descriptions into the stream. - void Describe(std::ostream &os, const DescribeOptions&) const; + void Describe(std::ostream &os, const DescribeOptions&) const override; private: DISALLOW_COPY_AND_ASSIGN(ThriftService); @@ -66,6 +67,7 @@ friend class Server; void Expose(const butil::StringPiece& prefix); MethodStatus* _status; + AdaptiveMaxConcurrency _max_concurrency; }; } // namespace brpc