diff --git a/.bazelrc b/.bazelrc index 3ffec2dcf6..2ee10ddac7 100644 --- a/.bazelrc +++ b/.bazelrc @@ -20,6 +20,7 @@ common --registry=https://bcr.bazel.build common --registry=https://baidu.github.io/babylon/registry build --cxxopt="-std=c++17" +build --copt="-fno-omit-frame-pointer" # Use gnu17 for asm keyword. build --conlyopt="-std=gnu17" @@ -33,8 +34,8 @@ build --features=per_object_debug_info build --define absl=1 # For brpc. -build --define=BRPC_WITH_GLOG=true test --define=BRPC_BUILD_FOR_UNITTEST=true +test --test_output=streamed # Pass PATH, CC, CXX and LLVM_CONFIG variables from the environment. build --action_env=CC diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md index 6cd036b6a9..aa227cd6c9 100644 --- a/.github/pull_request_template.md +++ b/.github/pull_request_template.md @@ -1,6 +1,6 @@ ### What problem does this PR solve? -Issue Number: +Issue Number: resolve Problem Summary: diff --git a/.github/workflows/ci-linux.yml b/.github/workflows/ci-linux.yml index 5d749a3231..f2d6d69287 100644 --- a/.github/workflows/ci-linux.yml +++ b/.github/workflows/ci-linux.yml @@ -225,3 +225,12 @@ jobs: run: | cd test sh ./run_tests.sh + + bazel-bvar-unittest: + runs-on: ubuntu-22.04 + steps: + - uses: actions/checkout@v2 + - run: bazel test --verbose_failures //test:bvar_test + - run: bazel test --verbose_failures --define with_babylon_counter=true //test:bvar_test + - run: bazel test --verbose_failures --action_env=CC=clang //test:bvar_test + - run: bazel test --verbose_failures --action_env=CC=clang --define with_babylon_counter=true //test:bvar_test diff --git a/.github/workflows/ci-macos.yml b/.github/workflows/ci-macos.yml index 92b70b5bf1..61d45ac821 100644 --- a/.github/workflows/ci-macos.yml +++ b/.github/workflows/ci-macos.yml @@ -61,7 +61,6 @@ jobs: compile-with-bazel: runs-on: macos-latest # https://github.com/actions/runner-images - steps: - uses: actions/checkout@v2 - run: bazel build --verbose_failures -- //:brpc -//example/... diff --git a/BUILD.bazel b/BUILD.bazel index a3e7b29f00..138e416b10 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -21,6 +21,7 @@ licenses(["notice"]) # Apache v2 exports_files(["LICENSE"]) COPTS = [ + "-fno-omit-frame-pointer", "-DBTHREAD_USE_FAST_PTHREAD_MUTEX", "-D__const__=__unused__", "-D_GNU_SOURCE", @@ -377,6 +378,10 @@ cc_library( "src/bvar/utils/*.h", "src/bvar/detail/*.h", ]), + defines = [] + select({ + "//bazel/config:with_babylon_counter": ["WITH_BABYLON_COUNTER=1"], + "//conditions:default": [], + }), copts = COPTS + select({ "//bazel/config:brpc_build_for_unittest": [ "-DBVAR_NOT_LINK_DEFAULT_VARIABLES", @@ -391,7 +396,10 @@ cc_library( visibility = ["//visibility:public"], deps = [ ":butil", - ], + ] + select({ + "//bazel/config:with_babylon_counter": ["@babylon//:concurrent_counter"], + "//conditions:default": [], + }), ) cc_library( diff --git a/MODULE.bazel b/MODULE.bazel index fac96037f0..43223fb162 100644 --- a/MODULE.bazel +++ b/MODULE.bazel @@ -17,6 +17,7 @@ bazel_dep(name = 'rules_cc', version = '0.0.1') bazel_dep(name = 'rules_proto', version = '4.0.0') bazel_dep(name = 'zlib', version = '1.3.1.bcr.5', repo_name = 'com_github_madler_zlib') bazel_dep(name = 'libunwind', version = '1.8.1', repo_name = 'com_github_libunwind_libunwind') +bazel_dep(name = 'babylon', version = '1.4.4') # --registry=https://baidu.github.io/babylon/registry bazel_dep(name = 'leveldb', version = '1.23', repo_name = 'com_github_google_leveldb') diff --git a/bazel/config/BUILD.bazel b/bazel/config/BUILD.bazel index 594dc5133b..d08ea2ec23 100644 --- a/bazel/config/BUILD.bazel +++ b/bazel/config/BUILD.bazel @@ -142,4 +142,10 @@ config_setting( name = "brpc_with_no_pthread_mutex_hook", define_values = {"BRPC_WITH_NO_PTHREAD_MUTEX_HOOK": "true"}, visibility = ["//visibility:public"], +) + +config_setting( + name = "with_babylon_counter", + define_values = {"with_babylon_counter": "true"}, + visibility = ["//visibility:public"], ) \ No newline at end of file diff --git a/docs/cn/bvar_c++.md b/docs/cn/bvar_c++.md index 478335fec6..ddc0de1b0a 100644 --- a/docs/cn/bvar_c++.md +++ b/docs/cn/bvar_c++.md @@ -650,3 +650,9 @@ static bvar::GFlag s_gflag_my_flag_that_matters("my_flag_that_matters"); // Expose the gflag as a bvar named "foo_bar_my_flag_that_matters". static bvar::GFlag s_gflag_my_flag_that_matters_with_prefix("foo_bar", "my_flag_that_matters"); ``` +# babylon counter bvar + +原理和性能见[babylon介绍](https://github.com/baidu/babylon/tree/main/example/use-counter-with-bvar)。 + +目前只支持bazel编译方式:`--define with_babylon_counter=true`,babylon版本要求:>= 1.4.4。打开开关后,即可使用基于babylon counter实现的更高性能的bvar,无需修改代码。 + diff --git a/docs/en/bvar_c++.md b/docs/en/bvar_c++.md index 73fac6c41b..160cdfcc08 100644 --- a/docs/en/bvar_c++.md +++ b/docs/en/bvar_c++.md @@ -490,3 +490,9 @@ static bvar::GFlag s_gflag_my_flag_that_matters("my_flag_that_matters"); // Expose the gflag as a bvar named "foo_bar_my_flag_that_matters". static bvar::GFlag s_gflag_my_flag_that_matters_with_prefix("foo_bar", "my_flag_that_matters"); ``` + +# babylon counter bvar + +For details on the principles and performance, see [Use concurrent counter optimize bvar](https://github.com/baidu/babylon/tree/main/example/use-counter-with-bvar). + +Currently, this feature is only supported by the Bazel compilation method: `--define with_babylon_counter=true`. The required Babylon version is 1.4.4 or higher. Once enabled, you can use the higher-performance bvar implementation based on the babylon counter without modifying your code. \ No newline at end of file diff --git a/src/butil/macros.h b/src/butil/macros.h index 63bb64dee6..d88a82d00e 100644 --- a/src/butil/macros.h +++ b/src/butil/macros.h @@ -44,11 +44,11 @@ // Declarations for a class to be unassignable. #define DISALLOW_ASSIGN(TypeName) \ - BUTIL_DELETE_FUNCTION(void operator=(const TypeName&)) + BUTIL_DELETE_FUNCTION(TypeName& operator=(const TypeName&)) // Declarations for a class to be move-unassignable. #define DISALLOW_MOVE_ASSIGN(TypeName) \ - BUTIL_DELETE_FUNCTION(void operator=(TypeName&&)) + BUTIL_DELETE_FUNCTION(TypeName& operator=(TypeName&&)) // A macro to disallow the copy constructor and operator= functions. #define DISALLOW_COPY_AND_ASSIGN(TypeName) \ diff --git a/src/butil/scoped_lock.h b/src/butil/scoped_lock.h index 45f52301b5..1111daab8c 100644 --- a/src/butil/scoped_lock.h +++ b/src/butil/scoped_lock.h @@ -378,7 +378,7 @@ template<> class unique_lock { namespace butil { -// Lock both lck1 and lck2 without the dead lock issue +// Lock both lck1 and lck2 without the deadlock issue. template void double_lock(std::unique_lock &lck1, std::unique_lock &lck2) { DCHECK(!lck1.owns_lock()); diff --git a/src/bvar/detail/percentile.cpp b/src/bvar/detail/percentile.cpp index 37181cc3a8..99de328c42 100644 --- a/src/bvar/detail/percentile.cpp +++ b/src/bvar/detail/percentile.cpp @@ -22,7 +22,7 @@ namespace bvar { namespace detail { - +#if !WITH_BABYLON_COUNTER inline uint32_t ones32(uint32_t x) { /* 32-bit recursive reduction using SWAR... * but first step is mapping 2-bit values @@ -127,6 +127,41 @@ Percentile &Percentile::operator<<(int64_t latency) { agent->merge_global(AddLatency(latency), _combiner); return *this; } +#else +Percentile::value_type Percentile::reset() { + constexpr static size_t SAMPLE_SIZE = value_type::SAMPLE_SIZE; + value_type result; + _concurrent_sampler.for_each([&]( + size_t index, const babylon::ConcurrentSampler::SampleBucket& bucket) { + result.merge(bucket, index); + auto capacity = _concurrent_sampler.bucket_capacity(index); + auto num_added = bucket.record_num.load(::std::memory_order_relaxed); + if (capacity < SAMPLE_SIZE && num_added > capacity) { + capacity = std::min(SAMPLE_SIZE, num_added * 1.5); + _concurrent_sampler.set_bucket_capacity(index, capacity); + } + }); + _concurrent_sampler.reset(); + return result; +} +Percentile& Percentile::operator<<(int64_t value) { + if (BAIDU_UNLIKELY(value < 0)) { + // we don't check overflow(of uint32) in percentile because the + // overflowed value which is included in last range does not affect + // overall distribution of other values too much. + if (!_debug_name.empty()) { + LOG_EVERY_SECOND(WARNING) << "Input=" << value << " to `" << _debug_name + << "' is negative, drop"; + } else { + LOG_EVERY_SECOND(WARNING) << "Input=" << value << " to Percentile(" + << (void*)this << ") is negative, drop"; + } + } else { + _concurrent_sampler << value; + } + return *this; +} +#endif // WITH_BABYLON_COUNTER } // namespace detail } // namespace bvar diff --git a/src/bvar/detail/percentile.h b/src/bvar/detail/percentile.h index 186103b4af..bac729f162 100644 --- a/src/bvar/detail/percentile.h +++ b/src/bvar/detail/percentile.h @@ -32,6 +32,9 @@ #include "bvar/detail/combiner.h" // AgentCombiner #include "bvar/detail/sampler.h" // ReducerSampler #include "butil/fast_rand.h" +#if WITH_BABYLON_COUNTER +#include "babylon/concurrent/counter.h" +#endif // WITH_BABYLON_COUNTER namespace bvar { namespace detail { @@ -139,6 +142,43 @@ class PercentileInterval { _num_added += rhs._num_added; } +#if WITH_BABYLON_COUNTER + size_t merge(const babylon::ConcurrentSampler::SampleBucket& bucket) { + auto num_added = bucket.record_num.load(std::memory_order_acquire); + if (num_added == 0) { + return 0; + } + auto num_samples = std::min(num_added, static_cast(bucket.capacity)); + // If there is space, deposit directly. + if (_num_samples + num_samples <= SAMPLE_SIZE) { + __builtin_memcpy(_samples + _num_samples, bucket.data, + sizeof(uint32_t) * num_samples); + _num_samples += num_samples; + } else { + // Sample probability weighting. + float ratio = static_cast(num_samples) / num_added; + // Try to deposit directly first. + if (_num_samples < SAMPLE_SIZE) { + auto copy_size = SAMPLE_SIZE - _num_samples; + num_samples -= copy_size; + __builtin_memcpy(_samples + _num_samples, + bucket.data + num_samples, sizeof(uint32_t) * copy_size); + } + // The remaining samples are stored according to probability. + for (size_t i = 0; i < num_samples; ++i) { + auto index = butil::fast_rand() % + static_cast((_num_added + i) * ratio + 1); + if (index < SAMPLE_SIZE) { + _samples[index] = bucket.data[i]; + } + } + _num_samples = SAMPLE_SIZE; + } + _num_added += num_added; + return num_added; + } +#endif // WITH_BABYLON_COUNTER + // Randomly pick n samples from mutable_rhs to |this| template void merge_with_expectation(PercentileInterval& mutable_rhs, size_t n) { @@ -239,7 +279,9 @@ class AddLatency; template class PercentileSamples { public: +#if !WITH_BABYLON_COUNTER friend class AddLatency; +#endif // WITH_BABYLON_COUNTER static const size_t SAMPLE_SIZE = SAMPLE_SIZE_IN; @@ -324,6 +366,12 @@ friend class AddLatency; } } +#if WITH_BABYLON_COUNTER + void merge(const babylon::ConcurrentSampler::SampleBucket& bucket, size_t index) { + _num_added += get_interval_at(index).merge(bucket); + } +#endif // WITH_BABYLON_COUNTER + // Combine multiple into a single PercentileSamples template void combine_of(const Iterator& begin, const Iterator& end) { @@ -443,31 +491,36 @@ std::ostream &operator<<(std::ostream &os, const PercentileSamples &p) { typedef PercentileSamples<254> GlobalPercentileSamples; typedef PercentileSamples<30> ThreadLocalPercentileSamples; +namespace detail { +struct AddPercentileSamples { + template + void operator()(PercentileSamples &b1, + const PercentileSamples &b2) const { + b1.merge(b2); + } +}; +} // namespace detail + // A specialized reducer for finding the percentile of latencies. // NOTE: DON'T use it directly, use LatencyRecorder instead. +#if !WITH_BABYLON_COUNTER class Percentile { public: - struct AddPercentileSamples { - template - void operator()(PercentileSamples &b1, - const PercentileSamples &b2) const { - b1.merge(b2); - } - }; - - typedef GlobalPercentileSamples value_type; - typedef ReducerSampler sampler_type; + typedef GlobalPercentileSamples value_type; + typedef ReducerSamplersampler_type; typedef AgentCombiner combiner_type; - typedef typename combiner_type::self_shared_type shared_combiner_type; - typedef combiner_type::Agent agent_type; + detail::AddPercentileSamples> combiner_type; + typedef combiner_type::self_shared_type shared_combiner_type; + typedef combiner_type::Agent agent_type; + Percentile(); ~Percentile(); - AddPercentileSamples op() const { return AddPercentileSamples(); } + detail::AddPercentileSamples op() const { + return detail::AddPercentileSamples(); + } VoidOp inv_op() const { return VoidOp(); } // The sampler for windows over percentile. @@ -499,6 +552,56 @@ class Percentile { sampler_type* _sampler; std::string _debug_name; }; +#else +class Percentile { +public: + typedef GlobalPercentileSamples value_type; + typedef detail::AddPercentileSamples AddPercentileSamples; + typedef AddPercentileSamples Op; + typedef VoidOp InvOp; + typedef ReducerSampler sampler_type; + + Percentile() = default; + DISALLOW_COPY_AND_MOVE(Percentile); + ~Percentile() noexcept { + if (NULL != _sampler) { + _sampler->destroy(); + } + } + + Op op() const { return Op(); } + InvOp inv_op() const { return InvOp(); } + + sampler_type* get_sampler() { + if (NULL == _sampler) { + _sampler = new sampler_type(this); + _sampler->schedule(); + } + return _sampler; + } + + value_type reset(); + + value_type get_value() const { + LOG_EVERY_SECOND(ERROR) << "Percentile should never call this get_value()"; + return value_type(); + } + + Percentile& operator<<(int64_t value); + + bool valid() const { return true; } + + // This name is useful for warning negative latencies in operator<< + void set_debug_name(const butil::StringPiece& name) { + _debug_name.assign(name.data(), name.size()); + } + +private: + babylon::ConcurrentSampler _concurrent_sampler; + sampler_type* _sampler{NULL}; + std::string _debug_name; +}; +#endif // WITH_BABYLON_COUNTER } // namespace detail } // namespace bvar diff --git a/src/bvar/latency_recorder.cpp b/src/bvar/latency_recorder.cpp index f70894aed1..a951376d76 100644 --- a/src/bvar/latency_recorder.cpp +++ b/src/bvar/latency_recorder.cpp @@ -99,7 +99,7 @@ static int64_t double_to_random_int(double dval) { } static int64_t get_window_recorder_qps(void* arg) { - detail::Sample s; + Sample s; static_cast(arg)->get_span(&s); // Use floating point to avoid overflow. if (s.time_us <= 0) { diff --git a/src/bvar/latency_recorder.h b/src/bvar/latency_recorder.h index 5e7029923b..874b1178d7 100644 --- a/src/bvar/latency_recorder.h +++ b/src/bvar/latency_recorder.h @@ -38,7 +38,7 @@ typedef Window PercentileWindow; class CDF : public Variable { public: explicit CDF(PercentileWindow* w); - ~CDF(); + ~CDF() override; void describe(std::ostream& os, bool quote_string) const override; int describe_series(std::ostream& os, const SeriesOptions& options) const override; private: @@ -77,7 +77,7 @@ class LatencyRecorder : public detail::LatencyRecorderBase { public: LatencyRecorder() : Base(-1) {} explicit LatencyRecorder(time_t window_size) : Base(window_size) {} - explicit LatencyRecorder(const butil::StringPiece& prefix) : Base(-1) { + LatencyRecorder(const butil::StringPiece& prefix) : Base(-1) { expose(prefix); } LatencyRecorder(const butil::StringPiece& prefix, diff --git a/src/bvar/recorder.h b/src/bvar/recorder.h index 99bdfa752e..b28b6372f6 100644 --- a/src/bvar/recorder.h +++ b/src/bvar/recorder.h @@ -27,6 +27,9 @@ #include "bvar/variable.h" #include "bvar/window.h" #include "bvar/detail/sampler.h" +#if WITH_BABYLON_COUNTER +#include "babylon/concurrent/counter.h" +#endif // WITH_BABYLON_COUNTER namespace bvar { @@ -76,11 +79,22 @@ inline std::ostream& operator<<(std::ostream& os, const Stat& s) { } } +namespace detail { +struct AddStat { + void operator()(Stat& s1, const Stat& s2) const { s1 += s2; } +}; + +struct MinusStat { + void operator()(Stat& s1, const Stat& s2) const { s1 -= s2; } +}; +} // namespace detail + // For calculating average of numbers. // Example: // IntRecorder latency; // latency << 1 << 3 << 5; // CHECK_EQ(3, latency.average()); +#if !WITH_BABYLON_COUNTER class IntRecorder : public Variable { public: // Compressing format: @@ -92,16 +106,10 @@ class IntRecorder : public Variable { BAIDU_CASSERT(SUM_BIT_WIDTH > 32 && SUM_BIT_WIDTH < 64, SUM_BIT_WIDTH_must_be_between_33_and_63); - struct AddStat { - void operator()(Stat& s1, const Stat& s2) const { s1 += s2; } - }; - struct MinusStat { - void operator()(Stat& s1, const Stat& s2) const { s1 -= s2; } - }; - typedef Stat value_type; typedef detail::ReducerSampler sampler_type; + detail::AddStat, + detail::MinusStat> sampler_type; typedef Stat SampleSet; @@ -118,7 +126,7 @@ class IntRecorder : public Variable { IntRecorder() : _combiner(std::make_shared()), _sampler(NULL) {} - explicit IntRecorder(const butil::StringPiece& name) : IntRecorder() { + IntRecorder(const butil::StringPiece& name) : IntRecorder() { expose(name); } @@ -154,8 +162,8 @@ class IntRecorder : public Variable { return _combiner->reset_all_agents(); } - AddStat op() const { return AddStat(); } - MinusStat inv_op() const { return MinusStat(); } + detail::AddStat op() const { return detail::AddStat(); } + detail::MinusStat inv_op() const { return detail::MinusStat(); } void describe(std::ostream& os, bool /*quote_string*/) const override { os << get_value(); @@ -286,6 +294,100 @@ inline IntRecorder& IntRecorder::operator<<(int64_t sample) { n, _compress(num + 1, sum + complement))); return *this; } +#else // WITH_BABYLON_COUNTER +class IntRecorder : public Variable { +public: + typedef Stat value_type; + typedef detail::AddStat Op; + typedef detail::MinusStat InvOp; + typedef detail::ReducerSampler sampler_type; + + COMMON_VARIABLE_CONSTRUCTOR(IntRecorder); + + DISALLOW_COPY_AND_MOVE(IntRecorder); + + ~IntRecorder() override { + hide(); + if (NULL != _sampler) { + _sampler->destroy(); + } + } + + // Note: The input type is acutally int. Use int64_t to check overflow. + IntRecorder& operator<<(int64_t value) { + if (BAIDU_UNLIKELY((int64_t)(int)value != value)) { + const char* reason = NULL; + if (value > std::numeric_limits::max()) { + reason = "overflows"; + value = std::numeric_limits::max(); + } else { + reason = "underflows"; + value = std::numeric_limits::min(); + } + // Truncate to be max or min of int. We're using 44 bits to store the + // sum thus following aggregations are not likely to be over/underflow. + if (!name().empty()) { + LOG(WARNING) << "Input=" << value << " to `" << name() + << "\' " << reason; + } else if (!_debug_name.empty()) { + LOG(WARNING) << "Input=" << value << " to `" << _debug_name + << "\' " << reason; + } else { + LOG(WARNING) << "Input=" << value << " to IntRecorder(" + << (void*)this << ") " << reason; + } + } + + _summer << value; + return *this; + } + + int64_t average() const { + return get_value().get_average_int(); + } + + double average(double) const { + return get_value().get_average_double(); + } + + value_type get_value() const { + auto summary = _summer.value(); + return value_type{summary.sum, static_cast(summary.num)}; + } + + value_type reset() { + LOG_EVERY_SECOND(ERROR) << "IntRecorder with babylon counter should never call reset()"; + return get_value(); + } + + Op op() const { return Op(); } + InvOp inv_op() const { return InvOp(); } + + void describe(::std::ostream& os, bool) const override { + os << get_value(); + } + + bool valid() const { return true; } + + sampler_type* get_sampler() { + if (NULL == _sampler) { + _sampler = new sampler_type(this); + _sampler->schedule(); + } + return _sampler; + } + + // This name is useful for printing overflow log in operator<< since + // IntRecorder is often used as the source of data and not exposed. + void set_debug_name(const butil::StringPiece& name) { + _debug_name.assign(name.data(), name.size()); + } +private: + babylon::ConcurrentSummer _summer; + sampler_type* _sampler{NULL}; + std::string _debug_name; +}; +#endif // WITH_BABYLON_COUNTER } // namespace bvar diff --git a/src/bvar/reducer.h b/src/bvar/reducer.h index ccf7805420..543e77c8b0 100644 --- a/src/bvar/reducer.h +++ b/src/bvar/reducer.h @@ -29,9 +29,133 @@ #include "bvar/detail/sampler.h" // ReducerSampler #include "bvar/detail/series.h" #include "bvar/window.h" +#if WITH_BABYLON_COUNTER +#include "babylon/concurrent/counter.h" +#endif // WITH_BABYLON_COUNTER namespace bvar { +namespace detail { +template +class SeriesSamplerImpl : public Sampler { +public: + SeriesSamplerImpl(O* owner, const Op& op) + : _owner(owner), _series(op) {} + void take_sample() override { _series.append(_owner->get_value()); } + void describe(std::ostream& os) { _series.describe(os, NULL); } + +private: + O* _owner; + Series _series; +}; + +#if WITH_BABYLON_COUNTER +template +class BabylonVariable: public Variable { +public: + typedef ReducerSampler sampler_type; + typedef SeriesSamplerImpl series_sampler_type; + + BabylonVariable() = default; + + template::value, bool>::type = false> + BabylonVariable(U) {} + // For Maxer. + template::value, bool>::type = false> + BabylonVariable(U default_value) : _counter(default_value) {} + + DISALLOW_COPY_AND_MOVE(BabylonVariable); + + ~BabylonVariable() override { + hide(); + if (NULL != _sampler) { + _sampler->destroy(); + } + if (NULL != _series_sampler) { + _series_sampler->destroy(); + } + } + + BabylonVariable& operator<<(T value) { + _counter << value; + return *this; + } + + sampler_type* get_sampler() { + if (NULL == _sampler) { + _sampler = new sampler_type(this); + _sampler->schedule(); + } + return _sampler; + } + + T get_value() const { + return _counter.value(); + } + + T reset() { + if (BAIDU_UNLIKELY((!butil::is_same::value))) { + CHECK(false) << "You should not call Reducer<" << butil::class_name_str() + << ", " << butil::class_name_str() << ">::get_value() when a" + << " Window<> is used because the operator does not have inverse."; + return get_value(); + } + + T result = _counter.value(); + _counter.reset(); + return result; + } + + bool valid() const { return true; } + + const Op& op() const { return _op; } + const InvOp& inv_op() const { return _inv_op;} + + void describe(std::ostream& os, bool quote_string) const override { + if (butil::is_same::value && quote_string) { + os << '"' << get_value() << '"'; + } else { + os << get_value(); + } + } + + int describe_series(std::ostream& os, const SeriesOptions& options) const override { + if (NULL == _series_sampler) { + return 1; + } + if (!options.test_only) { + _series_sampler->describe(os); + } + return 0; + } + +protected: + int expose_impl(const butil::StringPiece& prefix, + const butil::StringPiece& name, + DisplayFilter display_filter) override { + const int rc = Variable::expose_impl(prefix, name, display_filter); + if (rc == 0 && NULL == _series_sampler && + !butil::is_same::value && + !butil::is_same::value && + FLAGS_save_series) { + _series_sampler = new series_sampler_type(this, _op); + _series_sampler->schedule(); + } + return rc; + } + +private: + Counter _counter; + sampler_type* _sampler{NULL}; + series_sampler_type* _series_sampler{NULL}; + Op _op; + InvOp _inv_op; +}; +#endif // WITH_BABYLON_COUNTER +} // namespace detail + // Reduce multiple values into one with `Op': e1 Op e2 Op e3 ... // `Op' shall satisfy: // - associative: a Op (b Op c) == (a Op b) Op c @@ -68,22 +192,12 @@ namespace bvar { template class Reducer : public Variable { public: - typedef typename detail::AgentCombiner combiner_type; + typedef detail::AgentCombiner combiner_type; typedef typename combiner_type::self_shared_type shared_combiner_type; typedef typename combiner_type::Agent agent_type; typedef detail::ReducerSampler sampler_type; - class SeriesSampler : public detail::Sampler { - public: - SeriesSampler(Reducer* owner, const Op& op) - : _owner(owner), _series(op) {} - void take_sample() override { _series.append(_owner->get_value()); } - void describe(std::ostream& os) { _series.describe(os, NULL); } - private: - Reducer* _owner; - detail::Series _series; - }; + typedef detail::SeriesSamplerImpl SeriesSampler; -public: // The `identify' must satisfy: identity Op a == a explicit Reducer(typename butil::add_cr_non_integral::type identity = T(), const Op& op = Op(), const InvOp& inv_op = InvOp()) @@ -205,34 +319,55 @@ inline Reducer& Reducer::operator<<( namespace detail { template struct AddTo { - void operator()(Tp & lhs, + void operator()(Tp & lhs, typename butil::add_cr_non_integral::type rhs) const { lhs += rhs; } }; template struct MinusFrom { - void operator()(Tp & lhs, + void operator()(Tp & lhs, typename butil::add_cr_non_integral::type rhs) const { lhs -= rhs; } }; -} -template +} // namespace detail + +template class Adder : public Reducer, detail::MinusFrom > { public: typedef Reducer, detail::MinusFrom > Base; typedef T value_type; typedef typename Base::sampler_type sampler_type; -public: + Adder() : Base() {} - explicit Adder(const butil::StringPiece& name) : Base() { + Adder(const butil::StringPiece& name) : Base() { this->expose(name); } Adder(const butil::StringPiece& prefix, const butil::StringPiece& name) : Base() { this->expose_as(prefix, name); } - ~Adder() { Variable::hide(); } + ~Adder() override { Variable::hide(); } +}; + +#if WITH_BABYLON_COUNTER +// Numerical types supported by babylon counter. +template +class Adder>::value>> + : public detail::BabylonVariable, + detail::AddTo, detail::MinusFrom> { +public: + typedef T value_type; +private: + typedef detail::BabylonVariable, + detail::AddTo, detail::MinusFrom> Base; +public: + typedef detail::AddTo Op; + typedef detail::MinusFrom InvOp; + typedef typename Base::sampler_type sampler_type; + + COMMON_VARIABLE_CONSTRUCTOR(Adder); }; +#endif // WITH_BABYLON_COUNTER // bvar::Maxer max_value; // max_value << 1 << 2 << 3 << 4; @@ -248,17 +383,19 @@ struct MaxTo { } } }; + class LatencyRecorderBase; -} -template +} // namespace detail + +template class Maxer : public Reducer > { public: typedef Reducer > Base; typedef T value_type; typedef typename Base::sampler_type sampler_type; -public: + Maxer() : Base(std::numeric_limits::min()) {} - explicit Maxer(const butil::StringPiece& name) + Maxer(const butil::StringPiece& name) : Base(std::numeric_limits::min()) { this->expose(name); } @@ -266,7 +403,8 @@ class Maxer : public Reducer > { : Base(std::numeric_limits::min()) { this->expose_as(prefix, name); } - ~Maxer() { Variable::hide(); } + ~Maxer() override { Variable::hide(); } + private: friend class detail::LatencyRecorderBase; // The following private funcition a now used in LatencyRecorder, @@ -283,32 +421,83 @@ class Maxer : public Reducer > { } }; +#if WITH_BABYLON_COUNTER +namespace detail { +template +class ConcurrentMaxer : public babylon::GenericsConcurrentMaxer { + typedef babylon::GenericsConcurrentMaxer Base; +public: + ConcurrentMaxer() = default; + ConcurrentMaxer(T default_value) : _default_value(default_value) {} + + T value() const { + T result; + if (!Base::value(result)) { + return _default_value; + } + return std::max(result, _default_value); + } +private: + T _default_value{0}; +}; +} // namespace detail + +// Numerical types supported by babylon counter. +template +class Maxer>::value>> + : public detail::BabylonVariable, + detail::MaxTo, detail::VoidOp> { +public: + typedef T value_type; +private: + typedef detail::BabylonVariable, + detail::MaxTo, detail::VoidOp> Base; +public: + typedef detail::MaxTo Op; + typedef detail::VoidOp InvOp; + typedef typename Base::sampler_type sampler_type; + + COMMON_VARIABLE_CONSTRUCTOR(Maxer); + +private: +friend class detail::LatencyRecorderBase; + + Maxer(T default_value) : Base(default_value) {} + Maxer(T default_value, const butil::StringPiece& prefix, const butil::StringPiece& name) + : Base(default_value) { + Variable::expose_as(prefix, name); + } + Maxer(T default_value, const butil::StringPiece& name) + : Base(default_value) { + Variable::expose(name); + } +}; +#endif // WITH_BABYLON_COUNTER + // bvar::Miner min_value; // min_value << 1 << 2 << 3 << 4; // LOG(INFO) << min_value.get_value(); // 1 namespace detail { - -template +template struct MinTo { - void operator()(Tp & lhs, + void operator()(Tp & lhs, typename butil::add_cr_non_integral::type rhs) const { if (rhs < lhs) { lhs = rhs; } } }; - } // namespace detail -template +template class Miner : public Reducer > { public: typedef Reducer > Base; typedef T value_type; typedef typename Base::sampler_type sampler_type; -public: + Miner() : Base(std::numeric_limits::max()) {} - explicit Miner(const butil::StringPiece& name) + Miner(const butil::StringPiece& name) : Base(std::numeric_limits::max()) { this->expose(name); } @@ -316,8 +505,28 @@ class Miner : public Reducer > { : Base(std::numeric_limits::max()) { this->expose_as(prefix, name); } - ~Miner() { Variable::hide(); } + ~Miner() override { Variable::hide(); } +}; + +#if WITH_BABYLON_COUNTER +// Numerical types supported by babylon counter. +template +class Miner>::value>> + : public detail::BabylonVariable, + detail::MinTo, detail::VoidOp> { +public: + typedef T value_type; +private: + typedef detail::BabylonVariable, + detail::MinTo, detail::VoidOp> Base; +public: + typedef detail::MinTo Op; + typedef detail::VoidOp InvOp; + typedef typename Base::sampler_type sampler_type; + + COMMON_VARIABLE_CONSTRUCTOR(Miner); }; +#endif // WITH_BABYLON_COUNTER } // namespace bvar diff --git a/src/bvar/variable.h b/src/bvar/variable.h index 86e9cd0c34..f01626fd13 100644 --- a/src/bvar/variable.h +++ b/src/bvar/variable.h @@ -39,6 +39,16 @@ namespace bvar { DECLARE_bool(save_series); +#define COMMON_VARIABLE_CONSTRUCTOR(TypeName) \ + TypeName() = default; \ + TypeName(const butil::StringPiece& name) { \ + this->expose(name); \ + } \ + TypeName(const butil::StringPiece& prefix, const butil::StringPiece& name) { \ + this->expose_as(prefix, name); \ + } \ + + // Bitwise masks of displayable targets enum DisplayFilter { DISPLAY_ON_HTML = 1, diff --git a/test/bvar_lock_timer_unittest.cpp b/test/bvar_lock_timer_unittest.cpp index f69ba3a298..c4299b55d3 100644 --- a/test/bvar_lock_timer_unittest.cpp +++ b/test/bvar_lock_timer_unittest.cpp @@ -197,8 +197,12 @@ TEST_F(LockTimerTest, double_lock_time) { ASSERT_EQ(OPS_PER_THREAD * ARRAY_SIZE(threads), (size_t)r1.count()); LOG(INFO) << r0; LOG(INFO) << r1._latency; +#if !WITH_BABYLON_COUNTER + // reset() of IntRecorder with babylon counter does not work, + // should never call reset(). r0.reset(); r1._latency.reset(); +#endif // !WITH_BABYLON_COUNTER DoubleLockArg arg1; arg1.m0.set_recorder(r1); arg1.m1.set_recorder(r0); @@ -209,8 +213,13 @@ TEST_F(LockTimerTest, double_lock_time) { for (size_t i = 0; i < ARRAY_SIZE(threads); ++i) { pthread_join(threads[i], NULL); } +#if !WITH_BABYLON_COUNTER ASSERT_EQ(OPS_PER_THREAD * ARRAY_SIZE(threads), (size_t)r0.get_value().num); ASSERT_EQ(OPS_PER_THREAD * ARRAY_SIZE(threads), (size_t)r1.count()); +#else + ASSERT_EQ(OPS_PER_THREAD * ARRAY_SIZE(threads) * 2, (size_t)r0.get_value().num); + ASSERT_EQ(OPS_PER_THREAD * ARRAY_SIZE(threads) * 2, (size_t)r1.count()); +#endif // !WITH_BABYLON_COUNTER LOG(INFO) << r0; LOG(INFO) << r1._latency; } diff --git a/test/bvar_percentile_unittest.cpp b/test/bvar_percentile_unittest.cpp index 696e2a5c6f..f647e272ba 100644 --- a/test/bvar_percentile_unittest.cpp +++ b/test/bvar_percentile_unittest.cpp @@ -137,6 +137,7 @@ TEST_F(PercentileTest, merge2) { << " belong_to_b2=" << belong_to_b2; } +#if !WITH_BABYLON_COUNTER TEST_F(PercentileTest, combine_of) { // Combine multiple percentle samplers into one const int num_samplers = 10; @@ -146,7 +147,7 @@ TEST_F(PercentileTest, combine_of) { const int N = 1000; size_t belongs[num_samplers] = {0}; size_t total = 0; - for (int repeat = 0; repeat < 100; ++repeat) { + for (int repeat = 0; repeat < 1; ++repeat) { bvar::detail::Percentile p[num_samplers]; for (int i = 0; i < num_samplers; ++i) { for (int j = 0; j < N * (i + 1); ++j) { @@ -186,3 +187,4 @@ TEST_F(PercentileTest, combine_of) { } } +#endif // !WITH_BABYLON_COUNTER diff --git a/test/bvar_recorder_unittest.cpp b/test/bvar_recorder_unittest.cpp index 659db01b2c..c0d3206244 100644 --- a/test/bvar_recorder_unittest.cpp +++ b/test/bvar_recorder_unittest.cpp @@ -29,6 +29,7 @@ #include namespace { +#if !WITH_BABYLON_COUNTER TEST(RecorderTest, test_complement) { LOG(INFO) << "sizeof(LatencyRecorder)=" << sizeof(bvar::LatencyRecorder) << " " << sizeof(bvar::detail::Percentile) @@ -61,6 +62,7 @@ TEST(RecorderTest, test_compress_negtive_number) { ASSERT_EQ(a, bvar::IntRecorder::_extend_sign_bit(bvar::IntRecorder::_get_sum(compressed))); } } +#endif // !WITH_BABYLON_COUNTER TEST(RecorderTest, sanity) { { diff --git a/test/bvar_reducer_unittest.cpp b/test/bvar_reducer_unittest.cpp index 45e42cd3bc..48e13b3c0b 100644 --- a/test/bvar_reducer_unittest.cpp +++ b/test/bvar_reducer_unittest.cpp @@ -248,8 +248,8 @@ void ReducerTest_window() { } TEST_F(ReducerTest, window) { -#if !BRPC_WITH_GLOG ReducerTest_window(); +#if !BRPC_WITH_GLOG logging::StringSink log_str; logging::LogSink* old_sink = logging::SetLogSink(&log_str); sleep(1);