Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/brpc/adaptive_max_concurrency.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "butil/logging.h"
#include "butil/strings/string_number_conversions.h"
#include "brpc/adaptive_max_concurrency.h"
#include "brpc/concurrency_limiter.h"

namespace brpc {

Expand Down Expand Up @@ -72,6 +73,9 @@ void AdaptiveMaxConcurrency::operator=(const butil::StringPiece& value) {
value.CopyToString(&_value);
_max_concurrency = -1;
}
if (_cl) {
_cl->ResetMaxConcurrency(*this);
}
}

void AdaptiveMaxConcurrency::operator=(int max_concurrency) {
Expand All @@ -82,12 +86,18 @@ void AdaptiveMaxConcurrency::operator=(int max_concurrency) {
_value = butil::string_printf("%d", max_concurrency);
_max_concurrency = max_concurrency;
}
if (_cl) {
Comment thread
yanglimingcn marked this conversation as resolved.
_cl->ResetMaxConcurrency(*this);
}
}

void AdaptiveMaxConcurrency::operator=(const TimeoutConcurrencyConf& value) {
_value = "timeout";
_max_concurrency = -1;
_timeout_conf = value;
if (_cl) {
_cl->ResetMaxConcurrency(*this);
}
}

const std::string& AdaptiveMaxConcurrency::type() const {
Expand Down
4 changes: 4 additions & 0 deletions src/brpc/adaptive_max_concurrency.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ struct TimeoutConcurrencyConf {
int max_concurrency;
};

class ConcurrencyLimiter;
class AdaptiveMaxConcurrency{
public:
explicit AdaptiveMaxConcurrency();
Expand Down Expand Up @@ -68,11 +69,14 @@ class AdaptiveMaxConcurrency{
static const std::string& UNLIMITED();
static const std::string& CONSTANT();

void SetConcurrencyLimiter(ConcurrencyLimiter* cl) { _cl = cl; }

private:
std::string _value;
int _max_concurrency;
TimeoutConcurrencyConf
_timeout_conf; // TODO std::varient for different type
ConcurrencyLimiter* _cl{nullptr};
};

inline std::ostream& operator<<(std::ostream& os, const AdaptiveMaxConcurrency& amc) {
Expand Down
3 changes: 3 additions & 0 deletions src/brpc/concurrency_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ class ConcurrencyLimiter {
// The return value is only for logging.
virtual int MaxConcurrency() = 0;

// Reset max_concurrency
virtual int ResetMaxConcurrency(const AdaptiveMaxConcurrency& amc) = 0;

// Create an instance from the amc
// Caller is responsible for delete the instance after usage.
virtual ConcurrencyLimiter* New(const AdaptiveMaxConcurrency& amc) const = 0;
Expand Down
4 changes: 4 additions & 0 deletions src/brpc/policy/auto_concurrency_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ int AutoConcurrencyLimiter::MaxConcurrency() {
return _max_concurrency;
}

int AutoConcurrencyLimiter::ResetMaxConcurrency(const AdaptiveMaxConcurrency&) {
return -1;
}

int64_t AutoConcurrencyLimiter::NextResetTime(int64_t sampling_time_us) {
int64_t reset_start_us = sampling_time_us +
(FLAGS_auto_cl_noload_latency_remeasure_interval_ms / 2 +
Expand Down
2 changes: 2 additions & 0 deletions src/brpc/policy/auto_concurrency_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ class AutoConcurrencyLimiter : public ConcurrencyLimiter {

int MaxConcurrency() override;

int ResetMaxConcurrency(const AdaptiveMaxConcurrency&) override;

AutoConcurrencyLimiter* New(const AdaptiveMaxConcurrency&) const override;

private:
Expand Down
6 changes: 6 additions & 0 deletions src/brpc/policy/constant_concurrency_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ int ConstantConcurrencyLimiter::MaxConcurrency() {
return _max_concurrency.load(butil::memory_order_relaxed);
}

int ConstantConcurrencyLimiter::ResetMaxConcurrency(
const AdaptiveMaxConcurrency& amc) {
_max_concurrency.store(static_cast<int>(amc), butil::memory_order_relaxed);
return 0;
}

ConstantConcurrencyLimiter*
ConstantConcurrencyLimiter::New(const AdaptiveMaxConcurrency& amc) const {
CHECK_EQ(amc.type(), AdaptiveMaxConcurrency::CONSTANT());
Expand Down
2 changes: 2 additions & 0 deletions src/brpc/policy/constant_concurrency_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ class ConstantConcurrencyLimiter : public ConcurrencyLimiter {

int MaxConcurrency() override;

int ResetMaxConcurrency(const AdaptiveMaxConcurrency&) override;

ConstantConcurrencyLimiter* New(const AdaptiveMaxConcurrency&) const override;

private:
Expand Down
5 changes: 5 additions & 0 deletions src/brpc/policy/timeout_concurrency_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ int TimeoutConcurrencyLimiter::MaxConcurrency() {
return FLAGS_timeout_cl_max_concurrency;
}

int TimeoutConcurrencyLimiter::ResetMaxConcurrency(
const AdaptiveMaxConcurrency &) {
return -1;
}

bool TimeoutConcurrencyLimiter::AddSample(int error_code, int64_t latency_us,
int64_t sampling_time_us) {
std::unique_lock<butil::Mutex> lock_guard(_sw_mutex);
Expand Down
2 changes: 2 additions & 0 deletions src/brpc/policy/timeout_concurrency_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class TimeoutConcurrencyLimiter : public ConcurrencyLimiter {

int MaxConcurrency() override;

int ResetMaxConcurrency(const AdaptiveMaxConcurrency&) override;

TimeoutConcurrencyLimiter* New(
const AdaptiveMaxConcurrency&) const override;

Expand Down
9 changes: 1 addition & 8 deletions src/brpc/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1095,6 +1095,7 @@ int Server::StartInternal(const butil::EndPoint& endpoint,
return -1;
}
it->second.status->SetConcurrencyLimiter(cl);
it->second.max_concurrency.SetConcurrencyLimiter(cl);
}
}
if (0 != SetServiceMaxConcurrency(_options.nshead_service)) {
Expand Down Expand Up @@ -2221,10 +2222,6 @@ int Server::ResetMaxConcurrency(int max_concurrency) {
}

AdaptiveMaxConcurrency& Server::MaxConcurrencyOf(MethodProperty* mp) {
if (IsRunning()) {
LOG(WARNING) << "MaxConcurrencyOf is only allowed before Server started";
return g_default_max_concurrency_of_method;
}
if (mp->status == NULL) {
LOG(ERROR) << "method=" << mp->method->full_name()
<< " does not support max_concurrency";
Expand All @@ -2235,10 +2232,6 @@ AdaptiveMaxConcurrency& Server::MaxConcurrencyOf(MethodProperty* mp) {
}

int Server::MaxConcurrencyOf(const MethodProperty* mp) const {
if (IsRunning()) {
LOG(WARNING) << "MaxConcurrencyOf is only allowed before Server started";
return g_default_max_concurrency_of_method;
}
if (mp == NULL || mp->status == NULL) {
return 0;
}
Expand Down