Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions src/brpc/circuit_breaker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <gflags/gflags.h>

#include "brpc/errno.pb.h"
#include "brpc/reloadable_flags.h"
#include "butil/time.h"

namespace brpc {
Expand All @@ -45,6 +46,12 @@ DEFINE_int32(circuit_breaker_max_isolation_duration_ms, 30000,
"Maximum isolation duration in milliseconds");
DEFINE_double(circuit_breaker_epsilon_value, 0.02,
"ema_alpha = 1 - std::pow(epsilon, 1.0 / window_size)");
DEFINE_int32(circuit_breaker_half_open_window_size, 0,
"The limited number of requests allowed to pass through by the half-open "
"window. Only if all of them are successful, the circuit breaker will "
"go to the closed state. Otherwise, it goes back to the open state. "
"Values == 0 disables this feature");
BRPC_VALIDATE_GFLAG(circuit_breaker_half_open_window_size, NonNegativeInteger);

namespace {
// EPSILON is used to generate the smoothing coefficient when calculating EMA.
Expand Down Expand Up @@ -132,7 +139,7 @@ bool CircuitBreaker::EmaErrorRecorder::UpdateErrorCost(int64_t error_cost,
if (ema_latency != 0) {
error_cost = std::min(ema_latency * max_mutiple, error_cost);
}
//Errorous response
// Errorous response
if (error_cost != 0) {
int64_t ema_error_cost =
_ema_error_cost.fetch_add(error_cost, butil::memory_order_relaxed);
Expand All @@ -142,7 +149,7 @@ bool CircuitBreaker::EmaErrorRecorder::UpdateErrorCost(int64_t error_cost,
return ema_error_cost <= max_error_cost;
}

//Ordinary response
// Ordinary response
int64_t ema_error_cost = _ema_error_cost.load(butil::memory_order_relaxed);
do {
if (ema_error_cost == 0) {
Expand Down Expand Up @@ -171,7 +178,9 @@ CircuitBreaker::CircuitBreaker()
, _last_reset_time_ms(0)
, _isolation_duration_ms(FLAGS_circuit_breaker_min_isolation_duration_ms)
, _isolated_times(0)
, _broken(false) {
, _broken(false)
, _half_open(false)
, _half_open_success_count(0) {
}

bool CircuitBreaker::OnCallEnd(int error_code, int64_t latency) {
Expand All @@ -188,6 +197,19 @@ bool CircuitBreaker::OnCallEnd(int error_code, int64_t latency) {
if (_broken.load(butil::memory_order_relaxed)) {
return false;
}
if (FLAGS_circuit_breaker_half_open_window_size > 0
&& _half_open.load(butil::memory_order_relaxed)) {
if (error_code != 0) {
MarkAsBroken();
return false;
}
if (_half_open_success_count.fetch_add(1, butil::memory_order_relaxed)
+ 1 == FLAGS_circuit_breaker_half_open_window_size) {
_half_open.store(false, butil::memory_order_relaxed);
_half_open_success_count.store(0, butil::memory_order_relaxed);
}
}

if (_long_window.OnCallEnd(error_code, latency) &&
_short_window.OnCallEnd(error_code, latency)) {
return true;
Expand All @@ -201,6 +223,10 @@ void CircuitBreaker::Reset() {
_short_window.Reset();
_last_reset_time_ms = butil::cpuwide_time_ms();
_broken.store(false, butil::memory_order_release);
if (FLAGS_circuit_breaker_half_open_window_size > 0) {
_half_open.store(true, butil::memory_order_relaxed);
_half_open_success_count.store(0, butil::memory_order_relaxed);
}
}

void CircuitBreaker::MarkAsBroken() {
Expand Down
2 changes: 2 additions & 0 deletions src/brpc/circuit_breaker.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ class CircuitBreaker {
butil::atomic<int> _isolation_duration_ms;
butil::atomic<int> _isolated_times;
butil::atomic<bool> _broken;
butil::atomic<bool> _half_open;
butil::atomic<int32_t> _half_open_success_count;
};

} // namespace brpc
Expand Down
57 changes: 57 additions & 0 deletions test/brpc_circuit_breaker_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const int kErrorCodeForSucc = 0;
const int kErrorCost = 1000;
const int kLatency = 1000;
const int kThreadNum = 3;
const int kHalfWindowSize = 0;
} // namespace

namespace brpc {
Expand All @@ -54,6 +55,7 @@ DECLARE_int32(circuit_breaker_short_window_error_percent);
DECLARE_int32(circuit_breaker_long_window_error_percent);
DECLARE_int32(circuit_breaker_min_isolation_duration_ms);
DECLARE_int32(circuit_breaker_max_isolation_duration_ms);
DECLARE_int32(circuit_breaker_half_open_window_size);
} // namespace brpc

int main(int argc, char* argv[]) {
Expand All @@ -63,6 +65,7 @@ int main(int argc, char* argv[]) {
brpc::FLAGS_circuit_breaker_long_window_error_percent = kLongWindowErrorPercent;
brpc::FLAGS_circuit_breaker_min_isolation_duration_ms = kMinIsolationDurationMs;
brpc::FLAGS_circuit_breaker_max_isolation_duration_ms = kMaxIsolationDurationMs;
brpc::FLAGS_circuit_breaker_half_open_window_size = kHalfWindowSize;
testing::InitGoogleTest(&argc, argv);
GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
return RUN_ALL_TESTS();
Expand Down Expand Up @@ -160,6 +163,60 @@ TEST_F(CircuitBreakerTest, should_isolate) {
}
}

TEST_F(CircuitBreakerTest, should_isolate_with_half_open) {
std::vector<pthread_t> thread_list;
std::vector<std::unique_ptr<FeedbackControl>> fc_list;
StartFeedbackThread(&thread_list, &fc_list, 100);
int total_failed = 0;
for (int i = 0; i < kThreadNum; ++i) {
void* ret_data = nullptr;
ASSERT_EQ(pthread_join(thread_list[i], &ret_data), 0);
FeedbackControl* fc = static_cast<FeedbackControl*>(ret_data);
EXPECT_GT(fc->_unhealthy_cnt, 0);
EXPECT_FALSE(fc->_healthy);
total_failed += fc->_unhealthy_cnt;
}
_circuit_breaker.Reset();

int total_failed1 = 0;
StartFeedbackThread(&thread_list, &fc_list, 100);
for (int i = 0; i < kThreadNum; ++i) {
void* ret_data = nullptr;
ASSERT_EQ(pthread_join(thread_list[i], &ret_data), 0);
FeedbackControl* fc = static_cast<FeedbackControl*>(ret_data);
EXPECT_FALSE(fc->_healthy);
EXPECT_LE(fc->_healthy_cnt, kShortWindowSize);
EXPECT_GT(fc->_unhealthy_cnt, 0);
total_failed1 += fc->_unhealthy_cnt;
}

// Enable the half-open state.
// The first request cause _broken = true immediately.
brpc::FLAGS_circuit_breaker_half_open_window_size = 10;
_circuit_breaker.Reset();
int total_failed2 = 0;
StartFeedbackThread(&thread_list, &fc_list, 100);
for (int i = 0; i < kThreadNum; ++i) {
void* ret_data = nullptr;
ASSERT_EQ(pthread_join(thread_list[i], &ret_data), 0);
FeedbackControl* fc = static_cast<FeedbackControl*>(ret_data);
EXPECT_FALSE(fc->_healthy);
EXPECT_LE(fc->_healthy_cnt, kShortWindowSize);
EXPECT_GT(fc->_unhealthy_cnt, 0);
total_failed2 += fc->_unhealthy_cnt;
}
brpc::FLAGS_circuit_breaker_half_open_window_size = 0;

EXPECT_EQ(kLongWindowSize * 2 * kThreadNum -
kShortWindowSize *
brpc::FLAGS_circuit_breaker_short_window_error_percent /
100,
total_failed);

EXPECT_EQ(total_failed1, total_failed);
EXPECT_EQ(kLongWindowSize * 2 * kThreadNum, total_failed2);
}

TEST_F(CircuitBreakerTest, isolation_duration_grow_and_reset) {
std::vector<pthread_t> thread_list;
std::vector<std::unique_ptr<FeedbackControl>> fc_list;
Expand Down