Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 33 additions & 105 deletions src/butil/containers/doubly_buffered_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,83 +123,26 @@ class DoublyBufferedData {
// Returns 0 on success, -1 otherwise.
int Read(ScopedPtr* ptr);

// `fn(const T&)' will be called with foreground instance.
// This function is not blocked by Read() and Modify() in other threads.
// Returns 0 on success, otherwise on error.
template<typename Fn>
int Read(Fn&& fn);

// Modify background and foreground instances. fn(T&, ...) will be called
// twice. Modify() from different threads are exclusive from each other.
// NOTE: Call same series of fn to different equivalent instances should
// result in equivalent instances, otherwise foreground and background
// instance will be inconsistent.
template <typename Fn> size_t Modify(Fn& fn);
template <typename Fn, typename Arg1> size_t Modify(Fn& fn, const Arg1&);
template <typename Fn, typename Arg1, typename Arg2>
size_t Modify(Fn& fn, const Arg1&, const Arg2&);
template <typename Fn, typename... Args>
size_t Modify(Fn&& fn, Args&&... args);

// fn(T& background, const T& foreground, ...) will be called to background
// and foreground instances respectively.
template <typename Fn> size_t ModifyWithForeground(Fn& fn);
template <typename Fn, typename Arg1>
size_t ModifyWithForeground(Fn& fn, const Arg1&);
template <typename Fn, typename Arg1, typename Arg2>
size_t ModifyWithForeground(Fn& fn, const Arg1&, const Arg2&);
template <typename Fn, typename... Args>
size_t ModifyWithForeground(Fn&& fn, Args&&... args);

private:
template <typename Fn>
struct WithFG0 {
WithFG0(Fn& fn, T* data) : _fn(fn), _data(data) { }
size_t operator()(T& bg) {
return _fn(bg, (const T&)_data[&bg == _data]);
}
private:
Fn& _fn;
T* _data;
};

template <typename Fn, typename Arg1>
struct WithFG1 {
WithFG1(Fn& fn, T* data, const Arg1& arg1)
: _fn(fn), _data(data), _arg1(arg1) {}
size_t operator()(T& bg) {
return _fn(bg, (const T&)_data[&bg == _data], _arg1);
}
private:
Fn& _fn;
T* _data;
const Arg1& _arg1;
};

template <typename Fn, typename Arg1, typename Arg2>
struct WithFG2 {
WithFG2(Fn& fn, T* data, const Arg1& arg1, const Arg2& arg2)
: _fn(fn), _data(data), _arg1(arg1), _arg2(arg2) {}
size_t operator()(T& bg) {
return _fn(bg, (const T&)_data[&bg == _data], _arg1, _arg2);
}
private:
Fn& _fn;
T* _data;
const Arg1& _arg1;
const Arg2& _arg2;
};

template <typename Fn, typename Arg1>
struct Closure1 {
Closure1(Fn& fn, const Arg1& arg1) : _fn(fn), _arg1(arg1) {}
size_t operator()(T& bg) { return _fn(bg, _arg1); }
private:
Fn& _fn;
const Arg1& _arg1;
};

template <typename Fn, typename Arg1, typename Arg2>
struct Closure2 {
Closure2(Fn& fn, const Arg1& arg1, const Arg2& arg2)
: _fn(fn), _arg1(arg1), _arg2(arg2) {}
size_t operator()(T& bg) { return _fn(bg, _arg1, _arg2); }
private:
Fn& _fn;
const Arg1& _arg1;
const Arg2& _arg2;
};

const T* UnsafeRead() const {
return _data + _index.load(butil::memory_order_acquire);
}
Expand Down Expand Up @@ -253,7 +196,8 @@ template <typename T, typename TLS, bool AllowBthreadSuspended>
class DoublyBufferedData<T, TLS, AllowBthreadSuspended>::WrapperTLSGroup {
public:
const static size_t RAW_BLOCK_SIZE = 4096;
const static size_t ELEMENTS_PER_BLOCK = RAW_BLOCK_SIZE / sizeof(Wrapper) > 0 ? RAW_BLOCK_SIZE / sizeof(Wrapper) : 1;
const static size_t ELEMENTS_PER_BLOCK =
RAW_BLOCK_SIZE / sizeof(Wrapper) > 0 ? RAW_BLOCK_SIZE / sizeof(Wrapper) : 1;

struct BAIDU_CACHELINE_ALIGNMENT ThreadBlock {
inline DoublyBufferedData::Wrapper* at(size_t offset) {
Expand Down Expand Up @@ -572,7 +516,20 @@ int DoublyBufferedData<T, TLS, AllowBthreadSuspended>::Read(

template <typename T, typename TLS, bool AllowBthreadSuspended>
template <typename Fn>
size_t DoublyBufferedData<T, TLS, AllowBthreadSuspended>::Modify(Fn& fn) {
int DoublyBufferedData<T, TLS, AllowBthreadSuspended>::Read(Fn&& fn) {
BAIDU_CASSERT((is_result_void<Fn, const T&>::value),
"Fn must accept `const T&' and return void");
ScopedPtr ptr;
if (Read(&ptr) != 0) {
return -1;
}
fn(*ptr);
return 0;
}

template <typename T, typename TLS, bool AllowBthreadSuspended>
template <typename Fn, typename... Args>
size_t DoublyBufferedData<T, TLS, AllowBthreadSuspended>::Modify(Fn&& fn, Args&&... args) {
// _modify_mutex sequences modifications. Using a separate mutex rather
// than _wrappers_mutex is to avoid blocking threads calling
// AddWrapper() or RemoveWrapper() too long. Most of the time, modifications
Expand All @@ -581,7 +538,7 @@ size_t DoublyBufferedData<T, TLS, AllowBthreadSuspended>::Modify(Fn& fn) {
int bg_index = !_index.load(butil::memory_order_relaxed);
// background instance is not accessed by other threads, being safe to
// modify.
const size_t ret = fn(_data[bg_index]);
const size_t ret = fn(_data[bg_index], std::forward<Args>(args)...);
if (!ret) {
return 0;
}
Expand All @@ -607,46 +564,17 @@ size_t DoublyBufferedData<T, TLS, AllowBthreadSuspended>::Modify(Fn& fn) {
}
}

const size_t ret2 = fn(_data[bg_index]);
const size_t ret2 = fn(_data[bg_index], std::forward<Args>(args)...);
CHECK_EQ(ret2, ret) << "index=" << _index.load(butil::memory_order_relaxed);
return ret2;
}

template <typename T, typename TLS, bool AllowBthreadSuspended>
template <typename Fn, typename Arg1>
size_t DoublyBufferedData<T, TLS, AllowBthreadSuspended>::Modify(Fn& fn, const Arg1& arg1) {
Closure1<Fn, Arg1> c(fn, arg1);
return Modify(c);
}

template <typename T, typename TLS, bool AllowBthreadSuspended>
template <typename Fn, typename Arg1, typename Arg2>
size_t DoublyBufferedData<T, TLS, AllowBthreadSuspended>::Modify(
Fn& fn, const Arg1& arg1, const Arg2& arg2) {
Closure2<Fn, Arg1, Arg2> c(fn, arg1, arg2);
return Modify(c);
}

template <typename T, typename TLS, bool AllowBthreadSuspended>
template <typename Fn>
size_t DoublyBufferedData<T, TLS, AllowBthreadSuspended>::ModifyWithForeground(Fn& fn) {
WithFG0<Fn> c(fn, _data);
return Modify(c);
}

template <typename T, typename TLS, bool AllowBthreadSuspended>
template <typename Fn, typename Arg1>
size_t DoublyBufferedData<T, TLS, AllowBthreadSuspended>::ModifyWithForeground(Fn& fn, const Arg1& arg1) {
WithFG1<Fn, Arg1> c(fn, _data, arg1);
return Modify(c);
}

template <typename T, typename TLS, bool AllowBthreadSuspended>
template <typename Fn, typename Arg1, typename Arg2>
size_t DoublyBufferedData<T, TLS, AllowBthreadSuspended>::ModifyWithForeground(
Fn& fn, const Arg1& arg1, const Arg2& arg2) {
WithFG2<Fn, Arg1, Arg2> c(fn, _data, arg1, arg2);
return Modify(c);
template <typename Fn, typename... Args>
size_t DoublyBufferedData<T, TLS, AllowBthreadSuspended>::ModifyWithForeground(Fn&& fn, Args&&... args) {
return Modify([this, &fn](T& bg, Args&&... args) {
return fn(bg, (const T&)_data[&bg == _data], std::forward<Args>(args)...);
}, std::forward<Args>(args)...);
}

} // namespace butil
Expand Down
6 changes: 3 additions & 3 deletions src/butil/type_traits.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ template <class T> struct is_pod
std::is_trivial<T>::value)> {};
#else
template <class T> struct is_pod : std::is_pod<T> {};
#endif
#endif // __cplusplus

#else
// We can't get is_pod right without compiler help, so fail conservatively.
Expand Down Expand Up @@ -334,7 +334,7 @@ template<typename T>
struct remove_cvref {
typedef typename remove_cv<typename remove_reference<T>::type>::type type;
};
#endif
#endif // __cplusplus

// is_reference is false except for reference types.
template<typename T> struct is_reference : false_type {};
Expand Down Expand Up @@ -378,7 +378,7 @@ template <typename F>
using result_of = std::result_of<F>;
#else
#error Only C++11 or later is supported.
#endif
#endif // __cplusplus

template <typename F>
using result_of_t = typename result_of<F>::type;
Expand Down
25 changes: 24 additions & 1 deletion test/brpc_load_balancer_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,16 @@ bool AddN(Foo& f, int n) {
return true;
}

void read_cb(const Foo& f) {
ASSERT_EQ(0, f.x);
}

struct CallableObj {
void operator()(const Foo& f) {
ASSERT_EQ(0, f.x);
}
};

template <typename DBD>
void test_doubly_buffered_data() {
// test doubly_buffered_data TLS limits
Expand All @@ -97,17 +107,30 @@ void test_doubly_buffered_data() {
ASSERT_EQ(0, d.Read(&ptr));
ASSERT_EQ(0, ptr->x);
}
{
ASSERT_EQ(0, d.Read([](const Foo& f) {
ASSERT_EQ(0, f.x);
}));
ASSERT_EQ(0, d.Read(read_cb));
ASSERT_EQ(0, d.Read(CallableObj()));
CallableObj co;
ASSERT_EQ(0, d.Read(co));
}
{
typename DBD::ScopedPtr ptr;
ASSERT_EQ(0, d.Read(&ptr));
ASSERT_EQ(0, ptr->x);
}

d.Modify(AddN, 10);
d.Modify([](Foo& f, int n) -> size_t {
f.x += n;
return 1;
}, 10);
{
typename DBD::ScopedPtr ptr;
ASSERT_EQ(0, d.Read(&ptr));
ASSERT_EQ(10, ptr->x);
ASSERT_EQ(20, ptr->x);
}
}

Expand Down