From 01e78b60091b3b2a7583bc4ba1a8a8699ebf39a5 Mon Sep 17 00:00:00 2001 From: hetao Date: Fri, 17 Jan 2020 18:29:09 +0800 Subject: [PATCH 1/2] rwlock implementation based on atomic and butex --- src/bthread/bthread.h | 1 + src/bthread/rwlock.cpp | 211 ++++++++++++++++++++++++++ src/bthread/rwlock.h | 132 ++++++++++++++++ src/bthread/types.h | 9 ++ test/bthread_brpc_rwlock_unittest.cpp | 200 ++++++++++++++++++++++++ 5 files changed, 553 insertions(+) create mode 100644 src/bthread/rwlock.cpp create mode 100644 src/bthread/rwlock.h create mode 100644 test/bthread_brpc_rwlock_unittest.cpp diff --git a/src/bthread/bthread.h b/src/bthread/bthread.h index c5fa4c6d27..6753d54117 100644 --- a/src/bthread/bthread.h +++ b/src/bthread/bthread.h @@ -30,6 +30,7 @@ #if defined(__cplusplus) # include # include "bthread/mutex.h" // use bthread_mutex_t in the RAII way +# include "bthread/rwlock.h" // use bthread_rwlock_t in the RAII way #endif #include "bthread/id.h" diff --git a/src/bthread/rwlock.cpp b/src/bthread/rwlock.cpp new file mode 100644 index 0000000000..fd72fd5626 --- /dev/null +++ b/src/bthread/rwlock.cpp @@ -0,0 +1,211 @@ +#include +#include +#include // dlsym +#include // O_RDONLY +#include "butil/atomicops.h" +#include "bvar/bvar.h" +#include "bvar/collector.h" +#include "butil/macros.h" // BAIDU_CASSERT +#include "butil/containers/flat_map.h" +#include "butil/iobuf.h" +#include "butil/fd_guard.h" +#include "butil/files/file.h" +#include "butil/files/file_path.h" +#include "butil/file_util.h" +#include "butil/unique_ptr.h" +#include "butil/third_party/murmurhash3/murmurhash3.h" +#include "butil/logging.h" +#include "butil/object_pool.h" +#include "bthread/butex.h" // butex_* +#include "bthread/processor.h" // cpu_relax, barrier +#include "bthread/bthread.h" +#include "bthread/sys_futex.h" +#include "bthread/log.h" + +/* +这是一个写优先和优化读性能的rwlock实现 +1,读请求发现当前有写请求的时候,不管是否已经获得写锁,读请求都要等待; +2,没有写请求竞争的时候,读锁的效率是很高的,读锁之间基本上没有太多竞争, + 只需要对一个读计数的原子变量进行加减,这样在读锁获得和释放的路径上 + 连一个CAS操作都没有。 +Author: hetaofirst@163.com +2020-01-17 +*/ + +namespace bthread { +//写锁标记 +const unsigned RWLOCK_WLOCKED = 1; +//读等标记 +const unsigned RWLOCK_RWAIT = 2; +//由于使用unsigned原子变量的低2位作为标记,那么计数就需要偏移 +const unsigned RWLOCK_SHIFT = 2; + +inline int rwlock_wlock(bthread_rwlock_t* rwlock) { + butil::atomic* wc_rwait = + (butil::atomic*)rwlock->wc_rwait; + butil::atomic* whole = + (butil::atomic*)rwlock->rc_wlock; + //增加写计数 + wc_rwait->fetch_add(1 << RWLOCK_SHIFT, butil::memory_order_acquire); + for(;;) { + unsigned r = whole->load(butil::memory_order_relaxed); + if(r != 0) { + // 说明此时有读锁或者写锁 + if(bthread::butex_wait(whole, r, NULL) < 0 && + errno != EWOULDBLOCK && errno != EINTR) { + wc_rwait->fetch_sub(1 << RWLOCK_SHIFT, butil::memory_order_relaxed); + LOG(ERROR) << "wlock wait error, " << r; + return errno; + } + } + //尝试获得写锁 + else if(whole->compare_exchange_weak(r, r | RWLOCK_WLOCKED, butil::memory_order_acquire)) { + return 0; + } + } +} + + +inline int rwlock_rlock(bthread_rwlock_t* rwlock) { + butil::atomic* wc_rwait = + (butil::atomic*)rwlock->wc_rwait; + for(;;) { + //写优先处理,如果发现当前有写请求,则等待 + unsigned w = wc_rwait->load(butil::memory_order_relaxed); + if( (w >> RWLOCK_SHIFT) > 0) { + //设置标记后等待 + w = wc_rwait->fetch_or(RWLOCK_RWAIT, butil::memory_order_acquire) | RWLOCK_RWAIT; + if((w >> RWLOCK_SHIFT) > 0) { + if(bthread::butex_wait(wc_rwait, w, NULL) && + errno != EWOULDBLOCK && errno != EINTR) { + LOG(ERROR) << "rlock wait error1, " << w; + return errno; + } + } + } + else { + break; + } + } + butil::atomic* whole = + (butil::atomic*)rwlock->rc_wlock; + //不考虑在读锁中使用CAS, 直接加读计数,最大化读锁性能 + //所以下面需要判断写锁标记并等待 + unsigned r = whole->fetch_add(1 << RWLOCK_SHIFT, butil::memory_order_acquire); + if((r & RWLOCK_WLOCKED) == 0) { + return 0; + } + for(;;) { + r = whole->load(butil::memory_order_relaxed); + if((r & RWLOCK_WLOCKED)==0) { + return 0; + } + else { + if(bthread::butex_wait(whole, r, NULL) < 0 && + errno != EWOULDBLOCK && errno != EINTR) { + whole->fetch_sub(1 << RWLOCK_SHIFT, butil::memory_order_relaxed); + LOG(ERROR) << "rlock wait error2, " << r; + return errno; + } + } + } + +} + +inline int rwlock_unrlock(bthread_rwlock_t* rwlock) { + butil::atomic* whole = + (butil::atomic*)rwlock->rc_wlock; + //没有写竞争的时候,fetch_sub效率严重落后load! + //减少读锁计数 + unsigned r = whole->fetch_sub(1 << RWLOCK_SHIFT, butil::memory_order_release) + - (1 << RWLOCK_SHIFT); + if((r >> RWLOCK_SHIFT) == 0) { + butil::atomic* wc_rwait = + (butil::atomic*)rwlock->wc_rwait; + if(wc_rwait->load(butil::memory_order_relaxed) > 0) { + //写计数大于0,才可能是有写等待 + //unlock读锁只需要考虑唤醒写等 + bthread::butex_wake(whole); + } + } + return 0; +} + + +inline int rwlock_unwlock(bthread_rwlock_t* rwlock) { + butil::atomic* whole = + (butil::atomic*)rwlock->rc_wlock; + //清除写锁标记 + unsigned r = whole->fetch_and(~RWLOCK_WLOCKED, butil::memory_order_release); + + butil::atomic* wc_rwait = + (butil::atomic*)rwlock->wc_rwait; + //减少写计数 + unsigned w = wc_rwait->fetch_sub(1 << RWLOCK_SHIFT, butil::memory_order_release) + - (1 << RWLOCK_SHIFT); + //是否需要唤醒whole + //有读等或者写等 + if((r >> RWLOCK_SHIFT) != 0) { + //此处的读等优先唤醒,会卡住其他读写 + bthread::butex_wake_all(whole); + } + else if((w >> RWLOCK_SHIFT) != 0) { + //还有其他写等,唤醒一个即可 + bthread::butex_wake(whole); + } + + //check是否需要唤醒第一处的读等 + if((w >> RWLOCK_SHIFT) == 0 && (w & RWLOCK_RWAIT) != 0) { + //有读等,清空标记后唤醒 + wc_rwait->fetch_and(~RWLOCK_RWAIT, butil::memory_order_relaxed); + bthread::butex_wake_all(wc_rwait); + } + return 0; +} + + +inline int rwlock_unlock(bthread_rwlock_t* rwlock) { + //判断写标记即可 + butil::atomic* whole = + (butil::atomic*)rwlock->rc_wlock; + if ((whole->load(butil::memory_order_relaxed) & RWLOCK_WLOCKED) != 0) { + return rwlock_unwlock(rwlock); + } else { + return rwlock_unrlock(rwlock); + } +} + + +} // namespace bthread + +extern "C" { + +int bthread_rwlock_init(bthread_rwlock_t* __restrict rwlock, + const bthread_rwlockattr_t* __restrict attr) { + rwlock->wc_rwait = bthread::butex_create_checked(); + rwlock->rc_wlock = bthread::butex_create_checked(); + if (!rwlock->wc_rwait || !rwlock->rc_wlock) { + LOG(ERROR) << "no memory"; + return ENOMEM; + } + *rwlock->wc_rwait = 0; + *rwlock->rc_wlock = 0; + return 0; +} + +int bthread_rwlock_destroy(bthread_rwlock_t* rwlock) { + bthread::butex_destroy(rwlock->wc_rwait); + bthread::butex_destroy(rwlock->rc_wlock); + return 0; +} + +int bthread_rwlock_rdlock(bthread_rwlock_t* rwlock) { return bthread::rwlock_rlock(rwlock); } + +int bthread_rwlock_wrlock(bthread_rwlock_t* rwlock) { return bthread::rwlock_wlock(rwlock); } + +int bthread_rwlock_unrlock(bthread_rwlock_t* rwlock) { return bthread::rwlock_unrlock(rwlock); } + +int bthread_rwlock_unwlock(bthread_rwlock_t* rwlock) { return bthread::rwlock_unwlock(rwlock); } + +int bthread_rwlock_unlock(bthread_rwlock_t* rwlock) { return bthread::rwlock_unlock(rwlock); } +} \ No newline at end of file diff --git a/src/bthread/rwlock.h b/src/bthread/rwlock.h new file mode 100644 index 0000000000..ad2ae956f0 --- /dev/null +++ b/src/bthread/rwlock.h @@ -0,0 +1,132 @@ +#ifndef BTHREAD_RW_MUTEX_H +#define BTHREAD_RW_MUTEX_H + +#include "bthread/types.h" +#include "butil/scoped_lock.h" +#include "bvar/utils/lock_timer.h" +#include "bthread/bthread.h" + +__BEGIN_DECLS +// ------------------------------------------- +// Functions for handling read-write locks. +// ------------------------------------------- + +// Initialize read-write lock `rwlock' using attributes `attr', or use +// the default values if later is NULL. +extern int bthread_rwlock_init( + bthread_rwlock_t* __restrict rwlock, const bthread_rwlockattr_t* __restrict attr); + +// Destroy read-write lock `rwlock'. +extern int bthread_rwlock_destroy(bthread_rwlock_t* rwlock); + +// Acquire read lock for `rwlock'. +extern int bthread_rwlock_rdlock(bthread_rwlock_t* rwlock); + +// Try to acquire read lock for `rwlock'. +extern int bthread_rwlock_tryrdlock(bthread_rwlock_t* rwlock); + +// Try to acquire read lock for `rwlock' or return after specfied time. +extern int bthread_rwlock_timedrdlock( + bthread_rwlock_t* __restrict rwlock, const struct timespec* __restrict abstime); + +// Acquire write lock for `rwlock'. +extern int bthread_rwlock_wrlock(bthread_rwlock_t* rwlock); + +// Try to acquire write lock for `rwlock'. +extern int bthread_rwlock_trywrlock(bthread_rwlock_t* rwlock); + +// Try to acquire write lock for `rwlock' or return after specfied time. +extern int bthread_rwlock_timedwrlock( + bthread_rwlock_t* __restrict rwlock, const struct timespec* __restrict abstime); + +// Unlock `rwlock'. +extern int bthread_rwlock_unlock(bthread_rwlock_t* rwlock); + +extern int bthread_rwlock_unrlock(bthread_rwlock_t* rwlock); +extern int bthread_rwlock_unwlock(bthread_rwlock_t* rwlock); + +// --------------------------------------------------- +// Functions for handling read-write lock attributes. +// --------------------------------------------------- + +// Initialize attribute object `attr' with default values. +extern int bthread_rwlockattr_init(bthread_rwlockattr_t* attr); + +// Destroy attribute object `attr'. +extern int bthread_rwlockattr_destroy(bthread_rwlockattr_t* attr); + +// Return current setting of reader/writer preference. +extern int bthread_rwlockattr_getkind_np(const bthread_rwlockattr_t* attr, int* pref); + +// Set reader/write preference. +extern int bthread_rwlockattr_setkind_np(bthread_rwlockattr_t* attr, int pref); +__END_DECLS + + +// Specialize std::lock_guard and std::unique_lock for bthread_rwlock_t + +namespace bthread { + +class wlock_guard { +public: + explicit wlock_guard(bthread_rwlock_t& mutex) : _pmutex(&mutex) { +#if !defined(NDEBUG) + const int rc = bthread_rwlock_wrlock(_pmutex); + if (rc) { + LOG(FATAL) << "Fail to lock bthread_rwlock_t=" << _pmutex << ", " << berror(rc); + _pmutex = NULL; + } +#else + bthread_rwlock_wrlock(_pmutex); +#endif // NDEBUG + } + + ~wlock_guard() { +#ifndef NDEBUG + if (_pmutex) { + bthread_rwlock_unwlock(_pmutex); + } +#else + bthread_rwlock_unwlock(_pmutex); +#endif + } + +private: + DISALLOW_COPY_AND_ASSIGN(wlock_guard); + bthread_rwlock_t* _pmutex; +}; + +class rlock_guard { +public: + explicit rlock_guard(bthread_rwlock_t& mutex) : _pmutex(&mutex) { +#if !defined(NDEBUG) + const int rc = bthread_rwlock_rdlock(_pmutex); + if (rc) { + LOG(FATAL) << "Fail to lock bthread_rwlock_t=" << _pmutex << ", " << berror(rc); + _pmutex = NULL; + } +#else + bthread_rwlock_rdlock(_pmutex); +#endif // NDEBUG + } + + ~rlock_guard() { +#ifndef NDEBUG + if (_pmutex) { + bthread_rwlock_unrlock(_pmutex); + } +#else + bthread_rwlock_unrlock(_pmutex); +#endif + } + +private: + DISALLOW_COPY_AND_ASSIGN(rlock_guard); + bthread_rwlock_t* _pmutex; +}; + + +} // namespace bthread + + +#endif \ No newline at end of file diff --git a/src/bthread/types.h b/src/bthread/types.h index 8263ea9156..2bd6cff664 100644 --- a/src/bthread/types.h +++ b/src/bthread/types.h @@ -173,6 +173,15 @@ typedef struct { } bthread_condattr_t; typedef struct { + //高30位读计数和低2位写锁标记 + //加写锁标记的时候要求读计数为0, + //获得读锁的时候要加读计数且写锁标记为0 + //这些都要在一个原子操作中完成 + unsigned* rc_wlock; + //高30位写计数和低2位读等待标记 + //读请求看到写计数大于0,则需要等待 + //写锁释放的时候,看到读等标记需要唤醒 + unsigned* wc_rwait; } bthread_rwlock_t; typedef struct { diff --git a/test/bthread_brpc_rwlock_unittest.cpp b/test/bthread_brpc_rwlock_unittest.cpp new file mode 100644 index 0000000000..cf82a2226d --- /dev/null +++ b/test/bthread_brpc_rwlock_unittest.cpp @@ -0,0 +1,200 @@ +// Copyright (c) 2020 Bigo, Inc. +// Author: HeTao (hetao@bigo.sg) +// Date: Jan 06 2020 + +#include +#include "butil/compat.h" +#include "butil/time.h" +#include "butil/macros.h" +#include "butil/string_printf.h" +#include "butil/logging.h" +#include "bthread/bthread.h" +#include "bthread/butex.h" +#include "bthread/task_control.h" +#include "butil/gperftools_profiler.h" + +#include + +namespace { + +TEST(RwlockTest, sanity) { + bthread_rwlock_t m; + ASSERT_EQ(0, bthread_rwlock_init(&m, NULL)); + ASSERT_EQ(0, bthread_rwlock_rdlock(&m)); + ASSERT_EQ(0, bthread_rwlock_unrlock(&m)); + ASSERT_EQ(0, bthread_rwlock_wrlock(&m)); + ASSERT_EQ(0, bthread_rwlock_unwlock(&m)); + ASSERT_EQ(0, *(m.rc_wlock)); + ASSERT_EQ(0, *(m.wc_rwait)); + ASSERT_EQ(0, bthread_rwlock_destroy(&m)); +} + + + +bool g_started = false; +bool g_stopped = false; + +template +struct BAIDU_CACHELINE_ALIGNMENT PerfArgs { + Rwlock* rwlock; + int64_t counter; + int64_t elapse_ns; + bool ready; + int32_t op_type; /*0 for read,1 for write*/ + + PerfArgs() : rwlock(NULL), counter(0), elapse_ns(0), ready(false), op_type(0) {} +}; + +template +void* add_with_rwlock(void* void_arg) { + PerfArgs* args = (PerfArgs*)void_arg; + args->ready = true; + butil::Timer t; + while (!g_stopped) { + if (g_started) { + break; + } + bthread_usleep(1000); + } + t.start(); + while (!g_stopped) { + if(args->op_type == 0) { + // args->rwlock->Rlock(); + bthread_rwlock_rdlock(args->rwlock); + } + else { + // args->rwlock->Wlock(); + bthread_rwlock_wrlock(args->rwlock); + } + // args->rwlock->Unlock(); + bthread_rwlock_unlock(args->rwlock); + ++args->counter; + } + t.stop(); + args->elapse_ns = t.n_elapsed(); + return NULL; +} + +int g_prof_name_counter = 0; + +template + void PerfTest(Rwlock* rwlock, + ThreadId* /*dummy*/, + int thread_num, + const ThreadCreateFn& create_fn, + const ThreadJoinFn& join_fn, + int op_type=0 /*0 for read,1 for write*/) { + g_started = false; + g_stopped = false; + ThreadId threads[thread_num]; + std::vector > args(thread_num); + for (int i = 0; i < thread_num; ++i) { + args[i].rwlock = rwlock; + args[i].op_type = op_type; + create_fn(&threads[i], NULL, add_with_rwlock, &args[i]); + } + while (true) { + bool all_ready = true; + for (int i = 0; i < thread_num; ++i) { + if (!args[i].ready) { + all_ready = false; + break; + } + } + if (all_ready) { + break; + } + usleep(1000); + } + g_started = true; + char prof_name[32]; + snprintf(prof_name, sizeof(prof_name), "rdlock_perf_%d.prof", ++g_prof_name_counter); + ProfilerStart(prof_name); + usleep(500 * 1000); + ProfilerStop(); + g_stopped = true; + int64_t wait_time = 0; + int64_t count = 0; + for (int i = 0; i < thread_num; ++i) { + join_fn(threads[i], NULL); + wait_time += args[i].elapse_ns; + count += args[i].counter; + } + LOG(INFO) << butil::class_name() << (op_type==0?" readlock ":" writelock ") << " in " + << ((void*)create_fn == (void*)pthread_create ? "pthread" : "bthread") + << " thread_num=" << thread_num + << " count=" << count + << " average_time=" << wait_time / (double)count; +} + + +TEST(RWLockTest, performance) { + const int thread_num = 12; + bthread_rwlock_t brw; + bthread_rwlock_init(&brw, NULL); + //rlock + PerfTest(&brw, (pthread_t*)NULL, thread_num, pthread_create, pthread_join); + PerfTest(&brw, (bthread_t*)NULL, thread_num, bthread_start_background, bthread_join); + + //add test 1 rlock for compare + PerfTest(&brw, (pthread_t*)NULL, 1, pthread_create, pthread_join); + PerfTest(&brw, (bthread_t*)NULL, 1, bthread_start_background, bthread_join); + + //for wlock + PerfTest(&brw, (pthread_t*)NULL, thread_num, pthread_create, pthread_join, 1); + PerfTest(&brw, (bthread_t*)NULL, thread_num, bthread_start_background, bthread_join, 1); + + //add test 1 wlock for compare + PerfTest(&brw, (pthread_t*)NULL, 1, pthread_create, pthread_join, 1); + PerfTest(&brw, (bthread_t*)NULL, 1, bthread_start_background, bthread_join, 1); +} + +void* loop_until_stopped(void* arg) { + bthread_rwlock_t *m = (bthread_rwlock_t*)arg; + while (!g_stopped) { + int r = rand() % 100; + if((r&1)==0) + { + bthread::rlock_guard rg(*m); + } + else{ + bthread::wlock_guard wg(*m); + } + bthread_usleep(20); + } + return NULL; +} + +TEST(RwlockTest, mix_thread_types) { + g_stopped = false; + const int N = 16; + const int M = N * 2; + // bthread::Mutex m; + bthread_rwlock_t brw; + bthread_rwlock_init(&brw, NULL); + + pthread_t pthreads[N]; + bthread_t bthreads[M]; + // reserve enough workers for test. This is a must since we have + // BTHREAD_ATTR_PTHREAD bthreads which may cause deadlocks (the + // bhtread_usleep below can't be scheduled and g_stopped is never + // true, thus loop_until_stopped spins forever) + bthread_setconcurrency(M); + for (int i = 0; i < N; ++i) { + ASSERT_EQ(0, pthread_create(&pthreads[i], NULL, loop_until_stopped, &brw)); + } + for (int i = 0; i < M; ++i) { + const bthread_attr_t *attr = i % 2 ? NULL : &BTHREAD_ATTR_PTHREAD; + ASSERT_EQ(0, bthread_start_urgent(&bthreads[i], attr, loop_until_stopped, &brw)); + } + bthread_usleep(1000L * 1000); + g_stopped = true; + for (int i = 0; i < M; ++i) { + bthread_join(bthreads[i], NULL); + } + for (int i = 0; i < N; ++i) { + pthread_join(pthreads[i], NULL); + } +} +} // namespace From 8225319ab8688ad28357e143b67bb1b2e9911460 Mon Sep 17 00:00:00 2001 From: hetao Date: Fri, 17 Jan 2020 18:29:09 +0800 Subject: [PATCH 2/2] rwlock implementation based on atomic and butex --- test/BUILD | 1 + 1 file changed, 1 insertion(+) diff --git a/test/BUILD b/test/BUILD index c4649ab123..ed2c06c9ec 100644 --- a/test/BUILD +++ b/test/BUILD @@ -235,6 +235,7 @@ cc_test( "bthread_dispatcher_unittest.cpp", "bthread_fd_unittest.cpp", "bthread_mutex_unittest.cpp", + "bthread_brpc_rwlock_unittest.cpp", "bthread_setconcurrency_unittest.cpp", # glog CHECK die with a fatal error "bthread_key_unittest.cpp"