From 62a46e2155605e92b5fbf840f4bd23ff6a84ac60 Mon Sep 17 00:00:00 2001 From: jiangyuting Date: Tue, 7 May 2024 10:40:42 +0800 Subject: [PATCH 1/6] circuit breaker with half open state --- src/brpc/circuit_breaker.cpp | 26 +++++++++++++++++++++++++- src/brpc/circuit_breaker.h | 3 +++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/src/brpc/circuit_breaker.cpp b/src/brpc/circuit_breaker.cpp index 889fe65f41..8a557bc2c7 100644 --- a/src/brpc/circuit_breaker.cpp +++ b/src/brpc/circuit_breaker.cpp @@ -21,6 +21,7 @@ #include #include "brpc/errno.pb.h" +#include "brpc/reloadable_flags.h" #include "butil/time.h" namespace brpc { @@ -45,6 +46,10 @@ DEFINE_int32(circuit_breaker_max_isolation_duration_ms, 30000, "Maximum isolation duration in milliseconds"); DEFINE_double(circuit_breaker_epsilon_value, 0.02, "ema_alpha = 1 - std::pow(epsilon, 1.0 / window_size)"); +DEFINE_int32(circuit_breaker_half_open_window_size, 10, + "Half open window sample size."); + +BRPC_VALIDATE_GFLAG(circuit_breaker_half_open_window_size, PositiveInteger); namespace { // EPSILON is used to generate the smoothing coefficient when calculating EMA. @@ -171,7 +176,10 @@ CircuitBreaker::CircuitBreaker() , _last_reset_time_ms(0) , _isolation_duration_ms(FLAGS_circuit_breaker_min_isolation_duration_ms) , _isolated_times(0) - , _broken(false) { + , _broken(false) + , _half_open(false) + , _half_open_success_count(0) + , _half_open_window_size(FLAGS_circuit_breaker_half_open_window_size) { } bool CircuitBreaker::OnCallEnd(int error_code, int64_t latency) { @@ -188,6 +196,19 @@ bool CircuitBreaker::OnCallEnd(int error_code, int64_t latency) { if (_broken.load(butil::memory_order_relaxed)) { return false; } + if (_half_open.load(butil::memory_order_relaxed)) { + if (error_code != 0) { + MarkAsBroken(); + return false; + } + if (_half_open_success_count.fetch_add(1, butil::memory_order_relaxed) + + 1 == _half_open_window_size) { + _half_open.store(false, butil::memory_order_relaxed); + _half_open_success_count.store(0, butil::memory_order_relaxed); + return true; + } + } + if (_long_window.OnCallEnd(error_code, latency) && _short_window.OnCallEnd(error_code, latency)) { return true; @@ -201,6 +222,9 @@ void CircuitBreaker::Reset() { _short_window.Reset(); _last_reset_time_ms = butil::cpuwide_time_ms(); _broken.store(false, butil::memory_order_release); + _half_open.store(true, butil::memory_order_relaxed); + _half_open_success_count.store(0, butil::memory_order_relaxed); + } void CircuitBreaker::MarkAsBroken() { diff --git a/src/brpc/circuit_breaker.h b/src/brpc/circuit_breaker.h index 826e69148f..2745ddfd1c 100644 --- a/src/brpc/circuit_breaker.h +++ b/src/brpc/circuit_breaker.h @@ -87,6 +87,9 @@ class CircuitBreaker { butil::atomic _isolation_duration_ms; butil::atomic _isolated_times; butil::atomic _broken; + butil::atomic _half_open; + butil::atomic _half_open_success_count; + const int _half_open_window_size; }; } // namespace brpc From bacd5f4439bfe9b79555fceaeaf317e6830140cb Mon Sep 17 00:00:00 2001 From: jiangyuting Date: Wed, 15 May 2024 11:21:35 +0800 Subject: [PATCH 2/6] add switch for half open state --- src/brpc/circuit_breaker.cpp | 18 ++++++++++-------- src/brpc/circuit_breaker.h | 1 - 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/src/brpc/circuit_breaker.cpp b/src/brpc/circuit_breaker.cpp index 8a557bc2c7..74c0ce0239 100644 --- a/src/brpc/circuit_breaker.cpp +++ b/src/brpc/circuit_breaker.cpp @@ -46,10 +46,10 @@ DEFINE_int32(circuit_breaker_max_isolation_duration_ms, 30000, "Maximum isolation duration in milliseconds"); DEFINE_double(circuit_breaker_epsilon_value, 0.02, "ema_alpha = 1 - std::pow(epsilon, 1.0 / window_size)"); -DEFINE_int32(circuit_breaker_half_open_window_size, 10, +DEFINE_int32(circuit_breaker_half_open_window_size, 0, "Half open window sample size."); -BRPC_VALIDATE_GFLAG(circuit_breaker_half_open_window_size, PositiveInteger); +BRPC_VALIDATE_GFLAG(circuit_breaker_half_open_window_size, NonNegativeInteger); namespace { // EPSILON is used to generate the smoothing coefficient when calculating EMA. @@ -178,8 +178,7 @@ CircuitBreaker::CircuitBreaker() , _isolated_times(0) , _broken(false) , _half_open(false) - , _half_open_success_count(0) - , _half_open_window_size(FLAGS_circuit_breaker_half_open_window_size) { + , _half_open_success_count(0) { } bool CircuitBreaker::OnCallEnd(int error_code, int64_t latency) { @@ -196,13 +195,14 @@ bool CircuitBreaker::OnCallEnd(int error_code, int64_t latency) { if (_broken.load(butil::memory_order_relaxed)) { return false; } - if (_half_open.load(butil::memory_order_relaxed)) { + if (FLAGS_circuit_breaker_half_open_window_size > 0 + && _half_open.load(butil::memory_order_relaxed)) { if (error_code != 0) { MarkAsBroken(); return false; } if (_half_open_success_count.fetch_add(1, butil::memory_order_relaxed) - + 1 == _half_open_window_size) { + + 1 == FLAGS_circuit_breaker_half_open_window_size) { _half_open.store(false, butil::memory_order_relaxed); _half_open_success_count.store(0, butil::memory_order_relaxed); return true; @@ -222,8 +222,10 @@ void CircuitBreaker::Reset() { _short_window.Reset(); _last_reset_time_ms = butil::cpuwide_time_ms(); _broken.store(false, butil::memory_order_release); - _half_open.store(true, butil::memory_order_relaxed); - _half_open_success_count.store(0, butil::memory_order_relaxed); + if (FLAGS_circuit_breaker_half_open_window_size > 0) { + _half_open.store(true, butil::memory_order_relaxed); + _half_open_success_count.store(0, butil::memory_order_relaxed); + } } diff --git a/src/brpc/circuit_breaker.h b/src/brpc/circuit_breaker.h index 2745ddfd1c..b16a429920 100644 --- a/src/brpc/circuit_breaker.h +++ b/src/brpc/circuit_breaker.h @@ -89,7 +89,6 @@ class CircuitBreaker { butil::atomic _broken; butil::atomic _half_open; butil::atomic _half_open_success_count; - const int _half_open_window_size; }; } // namespace brpc From 2ca6cf5f4d6f118bfc1edca40b4f4519638742f1 Mon Sep 17 00:00:00 2001 From: jiangyuting Date: Wed, 15 May 2024 11:30:47 +0800 Subject: [PATCH 3/6] add half open ut --- test/brpc_circuit_breaker_unittest.cpp | 50 ++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/test/brpc_circuit_breaker_unittest.cpp b/test/brpc_circuit_breaker_unittest.cpp index ef09cd948e..3b41f0fc99 100644 --- a/test/brpc_circuit_breaker_unittest.cpp +++ b/test/brpc_circuit_breaker_unittest.cpp @@ -45,6 +45,7 @@ const int kErrorCodeForSucc = 0; const int kErrorCost = 1000; const int kLatency = 1000; const int kThreadNum = 3; +const int kHalfWindowSize = 0; } // namespace namespace brpc { @@ -54,6 +55,7 @@ DECLARE_int32(circuit_breaker_short_window_error_percent); DECLARE_int32(circuit_breaker_long_window_error_percent); DECLARE_int32(circuit_breaker_min_isolation_duration_ms); DECLARE_int32(circuit_breaker_max_isolation_duration_ms); +DECLARE_int32(circuit_breaker_half_open_window_size); } // namespace brpc int main(int argc, char* argv[]) { @@ -63,6 +65,7 @@ int main(int argc, char* argv[]) { brpc::FLAGS_circuit_breaker_long_window_error_percent = kLongWindowErrorPercent; brpc::FLAGS_circuit_breaker_min_isolation_duration_ms = kMinIsolationDurationMs; brpc::FLAGS_circuit_breaker_max_isolation_duration_ms = kMaxIsolationDurationMs; + brpc::FLAGS_circuit_breaker_half_open_window_size = kHalfWindowSize; testing::InitGoogleTest(&argc, argv); GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); return RUN_ALL_TESTS(); @@ -160,6 +163,53 @@ TEST_F(CircuitBreakerTest, should_isolate) { } } +TEST_F(CircuitBreakerTest, should_isolate_with_half_open) { + std::vector thread_list; + std::vector> fc_list; + StartFeedbackThread(&thread_list, &fc_list, 100); + int total_failed = 0; + for (int i = 0; i < kThreadNum; ++i) { + void* ret_data = nullptr; + ASSERT_EQ(pthread_join(thread_list[i], &ret_data), 0); + FeedbackControl* fc = static_cast(ret_data); + EXPECT_GT(fc->_unhealthy_cnt, 0); + EXPECT_FALSE(fc->_healthy); + total_failed += fc->_unhealthy_cnt; + } + _circuit_breaker.Reset(); + + int total_failed1 = 0; + StartFeedbackThread(&thread_list, &fc_list, 100); + for (int i = 0; i < kThreadNum; ++i) { + void* ret_data = nullptr; + ASSERT_EQ(pthread_join(thread_list[i], &ret_data), 0); + FeedbackControl* fc = static_cast(ret_data); + EXPECT_FALSE(fc->_healthy); + EXPECT_LE(fc->_healthy_cnt, kShortWindowSize); + EXPECT_GT(fc->_unhealthy_cnt, 0); + total_failed1 += fc->_unhealthy_cnt; + } + + brpc::FLAGS_circuit_breaker_half_open_window_size = 10; + _circuit_breaker.Reset(); + int total_failed2 = 0; + StartFeedbackThread(&thread_list, &fc_list, 100); + for (int i = 0; i < kThreadNum; ++i) { + void* ret_data = nullptr; + ASSERT_EQ(pthread_join(thread_list[i], &ret_data), 0); + FeedbackControl* fc = static_cast(ret_data); + EXPECT_FALSE(fc->_healthy); + EXPECT_LE(fc->_healthy_cnt, kShortWindowSize); + EXPECT_GT(fc->_unhealthy_cnt, 0); + total_failed2 += fc->_unhealthy_cnt; + } + brpc::FLAGS_circuit_breaker_half_open_window_size = 0; + + EXPECT_EQ(kLongWindowSize * 2 * kThreadNum - kShortWindowSize * brpc::FLAGS_circuit_breaker_short_window_error_percent / 100, total_failed); + EXPECT_EQ(total_failed1, total_failed); + EXPECT_EQ(kLongWindowSize * 2 * kThreadNum, total_failed2); +} + TEST_F(CircuitBreakerTest, isolation_duration_grow_and_reset) { std::vector thread_list; std::vector> fc_list; From 12f338dea767e490abd0b2d2527331924b83c134 Mon Sep 17 00:00:00 2001 From: jiangyuting Date: Mon, 27 May 2024 15:41:58 +0800 Subject: [PATCH 4/6] add some description --- src/brpc/circuit_breaker.cpp | 6 ++++-- test/brpc_circuit_breaker_unittest.cpp | 9 ++++++++- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/src/brpc/circuit_breaker.cpp b/src/brpc/circuit_breaker.cpp index 74c0ce0239..61b221c5c6 100644 --- a/src/brpc/circuit_breaker.cpp +++ b/src/brpc/circuit_breaker.cpp @@ -47,8 +47,10 @@ DEFINE_int32(circuit_breaker_max_isolation_duration_ms, 30000, DEFINE_double(circuit_breaker_epsilon_value, 0.02, "ema_alpha = 1 - std::pow(epsilon, 1.0 / window_size)"); DEFINE_int32(circuit_breaker_half_open_window_size, 0, - "Half open window sample size."); - + "The limited number of requests allowed to pass through by the half-open " + "window. Only if all of them are successful, the circuit breaker will " + "go to the closed state. Otherwise, it goes back to the open state. " + "Values == 0 disables this feature"); BRPC_VALIDATE_GFLAG(circuit_breaker_half_open_window_size, NonNegativeInteger); namespace { diff --git a/test/brpc_circuit_breaker_unittest.cpp b/test/brpc_circuit_breaker_unittest.cpp index 3b41f0fc99..e8f55153f3 100644 --- a/test/brpc_circuit_breaker_unittest.cpp +++ b/test/brpc_circuit_breaker_unittest.cpp @@ -190,6 +190,8 @@ TEST_F(CircuitBreakerTest, should_isolate_with_half_open) { total_failed1 += fc->_unhealthy_cnt; } + // Enable the half-open state. + // The first request cause _broken = true immediately. brpc::FLAGS_circuit_breaker_half_open_window_size = 10; _circuit_breaker.Reset(); int total_failed2 = 0; @@ -205,7 +207,12 @@ TEST_F(CircuitBreakerTest, should_isolate_with_half_open) { } brpc::FLAGS_circuit_breaker_half_open_window_size = 0; - EXPECT_EQ(kLongWindowSize * 2 * kThreadNum - kShortWindowSize * brpc::FLAGS_circuit_breaker_short_window_error_percent / 100, total_failed); + EXPECT_EQ(kLongWindowSize * 2 * kThreadNum - + kShortWindowSize * + brpc::FLAGS_circuit_breaker_short_window_error_percent / + 100, + total_failed); + EXPECT_EQ(total_failed1, total_failed); EXPECT_EQ(kLongWindowSize * 2 * kThreadNum, total_failed2); } From f6275709a0cb1db06ea84b1f41dfb8c5d38dc460 Mon Sep 17 00:00:00 2001 From: jiangyuting Date: Tue, 28 May 2024 20:58:39 +0800 Subject: [PATCH 5/6] record all lats of the half open window --- src/brpc/circuit_breaker.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/brpc/circuit_breaker.cpp b/src/brpc/circuit_breaker.cpp index 61b221c5c6..a41d523f9b 100644 --- a/src/brpc/circuit_breaker.cpp +++ b/src/brpc/circuit_breaker.cpp @@ -207,7 +207,6 @@ bool CircuitBreaker::OnCallEnd(int error_code, int64_t latency) { + 1 == FLAGS_circuit_breaker_half_open_window_size) { _half_open.store(false, butil::memory_order_relaxed); _half_open_success_count.store(0, butil::memory_order_relaxed); - return true; } } @@ -228,7 +227,6 @@ void CircuitBreaker::Reset() { _half_open.store(true, butil::memory_order_relaxed); _half_open_success_count.store(0, butil::memory_order_relaxed); } - } void CircuitBreaker::MarkAsBroken() { From 315960ac405e8414e62137536497e57eb78f41f8 Mon Sep 17 00:00:00 2001 From: jiangyuting Date: Wed, 29 May 2024 10:48:57 +0800 Subject: [PATCH 6/6] fix some typo --- src/brpc/circuit_breaker.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/brpc/circuit_breaker.cpp b/src/brpc/circuit_breaker.cpp index a41d523f9b..785ec77a5c 100644 --- a/src/brpc/circuit_breaker.cpp +++ b/src/brpc/circuit_breaker.cpp @@ -139,7 +139,7 @@ bool CircuitBreaker::EmaErrorRecorder::UpdateErrorCost(int64_t error_cost, if (ema_latency != 0) { error_cost = std::min(ema_latency * max_mutiple, error_cost); } - //Errorous response + // Errorous response if (error_cost != 0) { int64_t ema_error_cost = _ema_error_cost.fetch_add(error_cost, butil::memory_order_relaxed); @@ -149,7 +149,7 @@ bool CircuitBreaker::EmaErrorRecorder::UpdateErrorCost(int64_t error_cost, return ema_error_cost <= max_error_cost; } - //Ordinary response + // Ordinary response int64_t ema_error_cost = _ema_error_cost.load(butil::memory_order_relaxed); do { if (ema_error_cost == 0) {