Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .bazelrc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ common --registry=https://bcr.bazel.build
common --registry=https://baidu.github.io/babylon/registry

build --cxxopt="-std=c++17"
build --copt="-fno-omit-frame-pointer"
# Use gnu17 for asm keyword.
build --conlyopt="-std=gnu17"

Expand All @@ -33,8 +34,8 @@ build --features=per_object_debug_info
build --define absl=1

# For brpc.
build --define=BRPC_WITH_GLOG=true
test --define=BRPC_BUILD_FOR_UNITTEST=true
test --test_output=streamed

# Pass PATH, CC, CXX and LLVM_CONFIG variables from the environment.
build --action_env=CC
Expand Down
2 changes: 1 addition & 1 deletion .github/pull_request_template.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
### What problem does this PR solve?

Issue Number:
Issue Number: resolve

Problem Summary:

Expand Down
9 changes: 9 additions & 0 deletions .github/workflows/ci-linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -225,3 +225,12 @@ jobs:
run: |
cd test
sh ./run_tests.sh

bazel-bvar-unittest:
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v2
- run: bazel test --verbose_failures //test:bvar_test
- run: bazel test --verbose_failures --define with_babylon_counter=true //test:bvar_test
- run: bazel test --verbose_failures --action_env=CC=clang //test:bvar_test
- run: bazel test --verbose_failures --action_env=CC=clang --define with_babylon_counter=true //test:bvar_test
1 change: 0 additions & 1 deletion .github/workflows/ci-macos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ jobs:

compile-with-bazel:
runs-on: macos-latest # https://github.com/actions/runner-images

steps:
- uses: actions/checkout@v2
- run: bazel build --verbose_failures -- //:brpc -//example/...
10 changes: 9 additions & 1 deletion BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ licenses(["notice"]) # Apache v2
exports_files(["LICENSE"])

COPTS = [
"-fno-omit-frame-pointer",
"-DBTHREAD_USE_FAST_PTHREAD_MUTEX",
"-D__const__=__unused__",
"-D_GNU_SOURCE",
Expand Down Expand Up @@ -377,6 +378,10 @@ cc_library(
"src/bvar/utils/*.h",
"src/bvar/detail/*.h",
]),
defines = [] + select({
"//bazel/config:with_babylon_counter": ["WITH_BABYLON_COUNTER=1"],
"//conditions:default": [],
}),
copts = COPTS + select({
"//bazel/config:brpc_build_for_unittest": [
"-DBVAR_NOT_LINK_DEFAULT_VARIABLES",
Expand All @@ -391,7 +396,10 @@ cc_library(
visibility = ["//visibility:public"],
deps = [
":butil",
],
] + select({
"//bazel/config:with_babylon_counter": ["@babylon//:concurrent_counter"],
"//conditions:default": [],
}),
)

cc_library(
Expand Down
1 change: 1 addition & 0 deletions MODULE.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ bazel_dep(name = 'rules_cc', version = '0.0.1')
bazel_dep(name = 'rules_proto', version = '4.0.0')
bazel_dep(name = 'zlib', version = '1.3.1.bcr.5', repo_name = 'com_github_madler_zlib')
bazel_dep(name = 'libunwind', version = '1.8.1', repo_name = 'com_github_libunwind_libunwind')
bazel_dep(name = 'babylon', version = '1.4.4')

# --registry=https://baidu.github.io/babylon/registry
bazel_dep(name = 'leveldb', version = '1.23', repo_name = 'com_github_google_leveldb')
Expand Down
6 changes: 6 additions & 0 deletions bazel/config/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,10 @@ config_setting(
name = "brpc_with_no_pthread_mutex_hook",
define_values = {"BRPC_WITH_NO_PTHREAD_MUTEX_HOOK": "true"},
visibility = ["//visibility:public"],
)

config_setting(
name = "with_babylon_counter",
define_values = {"with_babylon_counter": "true"},
visibility = ["//visibility:public"],
)
6 changes: 6 additions & 0 deletions docs/cn/bvar_c++.md
Original file line number Diff line number Diff line change
Expand Up @@ -650,3 +650,9 @@ static bvar::GFlag s_gflag_my_flag_that_matters("my_flag_that_matters");
// Expose the gflag as a bvar named "foo_bar_my_flag_that_matters".
static bvar::GFlag s_gflag_my_flag_that_matters_with_prefix("foo_bar", "my_flag_that_matters");
```
# babylon counter bvar

原理和性能见[babylon介绍](https://github.com/baidu/babylon/tree/main/example/use-counter-with-bvar)。

目前只支持bazel编译方式:`--define with_babylon_counter=true`,babylon版本要求:>= 1.4.4。打开开关后,即可使用基于babylon counter实现的更高性能的bvar,无需修改代码。

6 changes: 6 additions & 0 deletions docs/en/bvar_c++.md
Original file line number Diff line number Diff line change
Expand Up @@ -490,3 +490,9 @@ static bvar::GFlag s_gflag_my_flag_that_matters("my_flag_that_matters");
// Expose the gflag as a bvar named "foo_bar_my_flag_that_matters".
static bvar::GFlag s_gflag_my_flag_that_matters_with_prefix("foo_bar", "my_flag_that_matters");
```

# babylon counter bvar

For details on the principles and performance, see [Use concurrent counter optimize bvar](https://github.com/baidu/babylon/tree/main/example/use-counter-with-bvar).

Currently, this feature is only supported by the Bazel compilation method: `--define with_babylon_counter=true`. The required Babylon version is 1.4.4 or higher. Once enabled, you can use the higher-performance bvar implementation based on the babylon counter without modifying your code.
4 changes: 2 additions & 2 deletions src/butil/macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@

// Declarations for a class to be unassignable.
#define DISALLOW_ASSIGN(TypeName) \
BUTIL_DELETE_FUNCTION(void operator=(const TypeName&))
BUTIL_DELETE_FUNCTION(TypeName& operator=(const TypeName&))

// Declarations for a class to be move-unassignable.
#define DISALLOW_MOVE_ASSIGN(TypeName) \
BUTIL_DELETE_FUNCTION(void operator=(TypeName&&))
BUTIL_DELETE_FUNCTION(TypeName& operator=(TypeName&&))

// A macro to disallow the copy constructor and operator= functions.
#define DISALLOW_COPY_AND_ASSIGN(TypeName) \
Expand Down
2 changes: 1 addition & 1 deletion src/butil/scoped_lock.h
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ template<> class unique_lock<pthread_spinlock_t> {

namespace butil {

// Lock both lck1 and lck2 without the dead lock issue
// Lock both lck1 and lck2 without the deadlock issue.
template <typename Mutex1, typename Mutex2>
void double_lock(std::unique_lock<Mutex1> &lck1, std::unique_lock<Mutex2> &lck2) {
DCHECK(!lck1.owns_lock());
Expand Down
37 changes: 36 additions & 1 deletion src/bvar/detail/percentile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

namespace bvar {
namespace detail {

#if !WITH_BABYLON_COUNTER
inline uint32_t ones32(uint32_t x) {
/* 32-bit recursive reduction using SWAR...
* but first step is mapping 2-bit values
Expand Down Expand Up @@ -127,6 +127,41 @@ Percentile &Percentile::operator<<(int64_t latency) {
agent->merge_global(AddLatency(latency), _combiner);
return *this;
}
#else
Percentile::value_type Percentile::reset() {
constexpr static size_t SAMPLE_SIZE = value_type::SAMPLE_SIZE;
value_type result;
_concurrent_sampler.for_each([&](
size_t index, const babylon::ConcurrentSampler::SampleBucket& bucket) {
result.merge(bucket, index);
auto capacity = _concurrent_sampler.bucket_capacity(index);
auto num_added = bucket.record_num.load(::std::memory_order_relaxed);
if (capacity < SAMPLE_SIZE && num_added > capacity) {
capacity = std::min<size_t>(SAMPLE_SIZE, num_added * 1.5);
_concurrent_sampler.set_bucket_capacity(index, capacity);
}
});
_concurrent_sampler.reset();
return result;
}

Percentile& Percentile::operator<<(int64_t value) {
if (BAIDU_UNLIKELY(value < 0)) {
// we don't check overflow(of uint32) in percentile because the
// overflowed value which is included in last range does not affect
// overall distribution of other values too much.
if (!_debug_name.empty()) {
LOG_EVERY_SECOND(WARNING) << "Input=" << value << " to `" << _debug_name
<< "' is negative, drop";
} else {
LOG_EVERY_SECOND(WARNING) << "Input=" << value << " to Percentile("
<< (void*)this << ") is negative, drop";
}
} else {
_concurrent_sampler << value;
}
return *this;
}
#endif // WITH_BABYLON_COUNTER
} // namespace detail
} // namespace bvar
135 changes: 119 additions & 16 deletions src/bvar/detail/percentile.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
#include "bvar/detail/combiner.h" // AgentCombiner
#include "bvar/detail/sampler.h" // ReducerSampler
#include "butil/fast_rand.h"
#if WITH_BABYLON_COUNTER
#include "babylon/concurrent/counter.h"
#endif // WITH_BABYLON_COUNTER

namespace bvar {
namespace detail {
Expand Down Expand Up @@ -139,6 +142,43 @@ class PercentileInterval {
_num_added += rhs._num_added;
}

#if WITH_BABYLON_COUNTER
size_t merge(const babylon::ConcurrentSampler::SampleBucket& bucket) {
auto num_added = bucket.record_num.load(std::memory_order_acquire);
if (num_added == 0) {
return 0;
}
auto num_samples = std::min(num_added, static_cast<uint32_t>(bucket.capacity));
// If there is space, deposit directly.
if (_num_samples + num_samples <= SAMPLE_SIZE) {
__builtin_memcpy(_samples + _num_samples, bucket.data,
sizeof(uint32_t) * num_samples);
_num_samples += num_samples;
} else {
// Sample probability weighting.
float ratio = static_cast<float>(num_samples) / num_added;
// Try to deposit directly first.
if (_num_samples < SAMPLE_SIZE) {
auto copy_size = SAMPLE_SIZE - _num_samples;
num_samples -= copy_size;
__builtin_memcpy(_samples + _num_samples,
bucket.data + num_samples, sizeof(uint32_t) * copy_size);
}
// The remaining samples are stored according to probability.
for (size_t i = 0; i < num_samples; ++i) {
auto index = butil::fast_rand() %
static_cast<uint64_t>((_num_added + i) * ratio + 1);
if (index < SAMPLE_SIZE) {
_samples[index] = bucket.data[i];
}
}
_num_samples = SAMPLE_SIZE;
}
_num_added += num_added;
return num_added;
}
#endif // WITH_BABYLON_COUNTER

// Randomly pick n samples from mutable_rhs to |this|
template <size_t size2>
void merge_with_expectation(PercentileInterval<size2>& mutable_rhs, size_t n) {
Expand Down Expand Up @@ -239,7 +279,9 @@ class AddLatency;
template <size_t SAMPLE_SIZE_IN>
class PercentileSamples {
public:
#if !WITH_BABYLON_COUNTER
friend class AddLatency;
#endif // WITH_BABYLON_COUNTER

static const size_t SAMPLE_SIZE = SAMPLE_SIZE_IN;

Expand Down Expand Up @@ -324,6 +366,12 @@ friend class AddLatency;
}
}

#if WITH_BABYLON_COUNTER
void merge(const babylon::ConcurrentSampler::SampleBucket& bucket, size_t index) {
_num_added += get_interval_at(index).merge(bucket);
}
#endif // WITH_BABYLON_COUNTER

// Combine multiple into a single PercentileSamples
template <typename Iterator>
void combine_of(const Iterator& begin, const Iterator& end) {
Expand Down Expand Up @@ -443,31 +491,36 @@ std::ostream &operator<<(std::ostream &os, const PercentileSamples<size> &p) {
typedef PercentileSamples<254> GlobalPercentileSamples;
typedef PercentileSamples<30> ThreadLocalPercentileSamples;

namespace detail {
struct AddPercentileSamples {
template <size_t size1, size_t size2>
void operator()(PercentileSamples<size1> &b1,
const PercentileSamples<size2> &b2) const {
b1.merge(b2);
}
};
} // namespace detail

// A specialized reducer for finding the percentile of latencies.
// NOTE: DON'T use it directly, use LatencyRecorder instead.
#if !WITH_BABYLON_COUNTER
class Percentile {
public:
struct AddPercentileSamples {
template <size_t size1, size_t size2>
void operator()(PercentileSamples<size1> &b1,
const PercentileSamples<size2> &b2) const {
b1.merge(b2);
}
};

typedef GlobalPercentileSamples value_type;
typedef ReducerSampler<Percentile,
GlobalPercentileSamples,
AddPercentileSamples, VoidOp> sampler_type;
typedef GlobalPercentileSamples value_type;
typedef ReducerSampler<Percentile, GlobalPercentileSamples,
detail::AddPercentileSamples, VoidOp>sampler_type;
typedef AgentCombiner <GlobalPercentileSamples,
ThreadLocalPercentileSamples,
AddPercentileSamples> combiner_type;
typedef typename combiner_type::self_shared_type shared_combiner_type;
typedef combiner_type::Agent agent_type;
detail::AddPercentileSamples> combiner_type;
typedef combiner_type::self_shared_type shared_combiner_type;
typedef combiner_type::Agent agent_type;

Percentile();
~Percentile();

AddPercentileSamples op() const { return AddPercentileSamples(); }
detail::AddPercentileSamples op() const {
return detail::AddPercentileSamples();
}
VoidOp inv_op() const { return VoidOp(); }

// The sampler for windows over percentile.
Expand Down Expand Up @@ -499,6 +552,56 @@ class Percentile {
sampler_type* _sampler;
std::string _debug_name;
};
#else
class Percentile {
public:
typedef GlobalPercentileSamples value_type;
typedef detail::AddPercentileSamples AddPercentileSamples;
typedef AddPercentileSamples Op;
typedef VoidOp InvOp;
typedef ReducerSampler<Percentile, value_type, Op, InvOp> sampler_type;

Percentile() = default;
DISALLOW_COPY_AND_MOVE(Percentile);
~Percentile() noexcept {
if (NULL != _sampler) {
_sampler->destroy();
}
}

Op op() const { return Op(); }
InvOp inv_op() const { return InvOp(); }

sampler_type* get_sampler() {
if (NULL == _sampler) {
_sampler = new sampler_type(this);
_sampler->schedule();
}
return _sampler;
}

value_type reset();

value_type get_value() const {
LOG_EVERY_SECOND(ERROR) << "Percentile should never call this get_value()";
return value_type();
}

Percentile& operator<<(int64_t value);

bool valid() const { return true; }

// This name is useful for warning negative latencies in operator<<
void set_debug_name(const butil::StringPiece& name) {
_debug_name.assign(name.data(), name.size());
}

private:
babylon::ConcurrentSampler _concurrent_sampler;
sampler_type* _sampler{NULL};
std::string _debug_name;
};
#endif // WITH_BABYLON_COUNTER

} // namespace detail
} // namespace bvar
Expand Down
2 changes: 1 addition & 1 deletion src/bvar/latency_recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ static int64_t double_to_random_int(double dval) {
}

static int64_t get_window_recorder_qps(void* arg) {
detail::Sample<Stat> s;
Sample<Stat> s;
static_cast<RecorderWindow*>(arg)->get_span(&s);
// Use floating point to avoid overflow.
if (s.time_us <= 0) {
Expand Down
Loading
Loading