From 13d4bd81450468c8cc30defdd23263948514eb3f Mon Sep 17 00:00:00 2001 From: zhanghao Date: Sun, 19 Jan 2020 12:16:49 +0800 Subject: [PATCH 1/5] add support for rwlock --- src/bthread/bthread.h | 1 + src/bthread/rwlock.cpp | 163 +++++++++++++++++++++ src/bthread/rwlock.h | 130 +++++++++++++++++ src/bthread/types.h | 2 + test/bthread_brpc_rwlock_unittest.cpp | 198 ++++++++++++++++++++++++++ 5 files changed, 494 insertions(+) create mode 100644 src/bthread/rwlock.cpp create mode 100644 src/bthread/rwlock.h create mode 100644 test/bthread_brpc_rwlock_unittest.cpp diff --git a/src/bthread/bthread.h b/src/bthread/bthread.h index c5fa4c6d27..30b43a3458 100644 --- a/src/bthread/bthread.h +++ b/src/bthread/bthread.h @@ -30,6 +30,7 @@ #if defined(__cplusplus) # include # include "bthread/mutex.h" // use bthread_mutex_t in the RAII way +# include "bthread/rwlock.h" // use bthread_rwlock_t in the RAII way #endif #include "bthread/id.h" diff --git a/src/bthread/rwlock.cpp b/src/bthread/rwlock.cpp new file mode 100644 index 0000000000..726c5a4748 --- /dev/null +++ b/src/bthread/rwlock.cpp @@ -0,0 +1,163 @@ +#include +#include +#include // dlsym +#include // O_RDONLY +#include "butil/atomicops.h" +#include "bvar/bvar.h" +#include "bvar/collector.h" +#include "butil/macros.h" // BAIDU_CASSERT +#include "butil/containers/flat_map.h" +#include "butil/iobuf.h" +#include "butil/fd_guard.h" +#include "butil/files/file.h" +#include "butil/files/file_path.h" +#include "butil/file_util.h" +#include "butil/unique_ptr.h" +#include "butil/third_party/murmurhash3/murmurhash3.h" +#include "butil/logging.h" +#include "butil/object_pool.h" +#include "bthread/butex.h" // butex_* +#include "bthread/processor.h" // cpu_relax, barrier +#include "bthread/bthread.h" +#include "bthread/sys_futex.h" +#include "bthread/log.h" + + +namespace bthread { + +inline int rwlock_unrlock(bthread_rwlock_t* rwlock) { + butil::atomic* whole = + (butil::atomic*)rwlock->lock_flag; + + while(1) { + unsigned r = whole->load(); + if(r==0 || (r>>31) != 0) { + LOG(ERROR) << "wrong unrlock!"; + return 0; + } + if(!(whole->compare_exchange_weak(r, r-1))) { + continue; + } + //wake up write waiter + bthread::butex_wake(whole); + return 0; + } + +} + + +inline int rwlock_unwlock(bthread_rwlock_t* rwlock) { + butil::atomic* whole = + (butil::atomic*)rwlock->lock_flag; + + while(1) { + unsigned r = whole->load(); + if(r != (unsigned)(1<<31) ) { + LOG(ERROR) << "wrong unwlock!"; + return 0; + } + if(!whole->compare_exchange_weak(r, 0)) { + continue; + } + //wake up write waiter first + bthread::butex_wake(whole); + butil::atomic* w_wait_count = (butil::atomic*)rwlock->w_wait_count; + //try reduce wait_count for read waiters,and wake up read waiters + w_wait_count->fetch_sub(1); + bthread::butex_wake_all(w_wait_count); + return 0; + } + +} + + +inline int rwlock_unlock(bthread_rwlock_t* rwlock) { + butil::atomic* whole = + (butil::atomic*)rwlock->lock_flag; + if ((whole->load(butil::memory_order_relaxed) >> 31) != 0) { + return rwlock_unwlock(rwlock); + } else { + return rwlock_unrlock(rwlock); + } +} + +inline int rwlock_rlock(bthread_rwlock_t* rwlock) { + butil::atomic* whole = + (butil::atomic*)rwlock->lock_flag; + + butil::atomic* w_wait_count = (butil::atomic*)rwlock->w_wait_count; + while (1) { + unsigned w = w_wait_count->load(); + if( w > 0) { + if(bthread::butex_wait(w_wait_count, w, NULL) < 0 && + errno != EWOULDBLOCK && errno != EINTR) { + return errno; + } + continue; + } + //FIXME!! we don't consider read_wait_count overflow yet,2^31 should be enough here + unsigned r = whole->load(); + if((r >> 31) == 0) { + if(whole->compare_exchange_weak(r, r + 1)) { + return 0; + } + } + } + +} + +inline int rwlock_wlock(bthread_rwlock_t* rwlock) { + butil::atomic* w_wait_count = (butil::atomic*)rwlock->w_wait_count; + butil::atomic* whole = (butil::atomic*)rwlock->lock_flag; + //we don't consider w_wait_count overflow yet,2^32 should be enough here + w_wait_count->fetch_add(1); + while(1) { + unsigned r = whole->load(); + if(r != 0) { + if(bthread::butex_wait(whole, r, NULL) < 0 && + errno != EWOULDBLOCK && errno != EINTR) { + whole->fetch_sub(1); + return errno; + } + continue; + } + if(whole->compare_exchange_weak(r, (unsigned)(1<<31) )) { + return 0; + } + } +} + +} // namespace bthread + +extern "C" { + +int bthread_rwlock_init(bthread_rwlock_t* __restrict rwlock, + const bthread_rwlockattr_t* __restrict attr) { + rwlock->w_wait_count = bthread::butex_create_checked(); + rwlock->lock_flag = + bthread::butex_create_checked(); + if (!rwlock->w_wait_count || !rwlock->lock_flag) { + LOG(ERROR) << "no memory"; + return ENOMEM; + } + *rwlock->w_wait_count = 0; + *rwlock->lock_flag = 0; + return 0; +} + +int bthread_rwlock_destroy(bthread_rwlock_t* rwlock) { + bthread::butex_destroy(rwlock->w_wait_count); + bthread::butex_destroy(rwlock->lock_flag); + return 0; +} + +int bthread_rwlock_rdlock(bthread_rwlock_t* rwlock) { return bthread::rwlock_rlock(rwlock); } + +int bthread_rwlock_wrlock(bthread_rwlock_t* rwlock) { return bthread::rwlock_wlock(rwlock); } + +int bthread_rwlock_unrlock(bthread_rwlock_t* rwlock) { return bthread::rwlock_unrlock(rwlock); } + +int bthread_rwlock_unwlock(bthread_rwlock_t* rwlock) { return bthread::rwlock_unwlock(rwlock); } + +int bthread_rwlock_unlock(bthread_rwlock_t* rwlock) { return bthread::rwlock_unlock(rwlock); } +} diff --git a/src/bthread/rwlock.h b/src/bthread/rwlock.h new file mode 100644 index 0000000000..786df3bcf4 --- /dev/null +++ b/src/bthread/rwlock.h @@ -0,0 +1,130 @@ +#ifndef BTHREAD_RW_MUTEX_H +#define BTHREAD_RW_MUTEX_H + +#include "bthread/types.h" +#include "butil/scoped_lock.h" +#include "bvar/utils/lock_timer.h" +#include "bthread/bthread.h" + +__BEGIN_DECLS +// ------------------------------------------- +// Functions for handling read-write locks. +// ------------------------------------------- + +// Initialize read-write lock `rwlock' using attributes `attr', or use +// the default values if later is NULL. +extern int bthread_rwlock_init(bthread_rwlock_t* __restrict rwlock, + const bthread_rwlockattr_t* __restrict attr); + +// Destroy read-write lock `rwlock'. +extern int bthread_rwlock_destroy(bthread_rwlock_t* rwlock); + +// Acquire read lock for `rwlock'. +extern int bthread_rwlock_rdlock(bthread_rwlock_t* rwlock); + +// Try to acquire read lock for `rwlock'. +extern int bthread_rwlock_tryrdlock(bthread_rwlock_t* rwlock); + +// Try to acquire read lock for `rwlock' or return after specfied time. +extern int bthread_rwlock_timedrdlock(bthread_rwlock_t* __restrict rwlock, + const struct timespec* __restrict abstime); + +// Acquire write lock for `rwlock'. +extern int bthread_rwlock_wrlock(bthread_rwlock_t* rwlock); + +// Try to acquire write lock for `rwlock'. +extern int bthread_rwlock_trywrlock(bthread_rwlock_t* rwlock); + +// Try to acquire write lock for `rwlock' or return after specfied time. +extern int bthread_rwlock_timedwrlock(bthread_rwlock_t* __restrict rwlock, + const struct timespec* __restrict abstime); + +// Unlock `rwlock'. +extern int bthread_rwlock_unlock(bthread_rwlock_t* rwlock); + + +// --------------------------------------------------- +// Functions for handling read-write lock attributes. +// --------------------------------------------------- + +// Initialize attribute object `attr' with default values. +extern int bthread_rwlockattr_init(bthread_rwlockattr_t* attr); + +// Destroy attribute object `attr'. +extern int bthread_rwlockattr_destroy(bthread_rwlockattr_t* attr); + +// Return current setting of reader/writer preference. +extern int bthread_rwlockattr_getkind_np(const bthread_rwlockattr_t* attr, int* pref); + +// Set reader/write preference. +extern int bthread_rwlockattr_setkind_np(bthread_rwlockattr_t* attr, int pref); +__END_DECLS + + +// Specialize std::lock_guard and std::unique_lock for bthread_rwlock_t + +namespace bthread { + +class wlock_guard { +public: + explicit wlock_guard(bthread_rwlock_t& mutex) : _pmutex(&mutex) { +#if !defined(NDEBUG) + const int rc = bthread_rwlock_wrlock(_pmutex); + if (rc) { + LOG(FATAL) << "Fail to lock bthread_rwlock_t=" << _pmutex << ", " << berror(rc); + _pmutex = NULL; + } +#else + bthread_rwlock_wrlock(_pmutex); +#endif // NDEBUG + } + + ~wlock_guard() { +#ifndef NDEBUG + if (_pmutex) { + bthread_rwlock_unlock(_pmutex); + } +#else + bthread_rwlock_unlock(_pmutex); +#endif + } + +private: + DISALLOW_COPY_AND_ASSIGN(wlock_guard); + bthread_rwlock_t* _pmutex; +}; + +class rlock_guard { +public: + explicit rlock_guard(bthread_rwlock_t& mutex) : _pmutex(&mutex) { +#if !defined(NDEBUG) + const int rc = bthread_rwlock_rdlock(_pmutex); + if (rc) { + LOG(FATAL) << "Fail to lock bthread_rwlock_t=" << _pmutex << ", " << berror(rc); + _pmutex = NULL; + } +#else + bthread_rwlock_rdlock(_pmutex); +#endif // NDEBUG + } + + ~rlock_guard() { +#ifndef NDEBUG + if (_pmutex) { + bthread_rwlock_unlock(_pmutex); + } +#else + bthread_rwlock_unlock(_pmutex); +#endif + } + +private: + DISALLOW_COPY_AND_ASSIGN(rlock_guard); + bthread_rwlock_t* _pmutex; +}; + + +} // namespace bthread + + +#endif diff --git a/src/bthread/types.h b/src/bthread/types.h index 8263ea9156..80acfd8c16 100644 --- a/src/bthread/types.h +++ b/src/bthread/types.h @@ -173,6 +173,8 @@ typedef struct { } bthread_condattr_t; typedef struct { + unsigned* w_wait_count; //include the bthread who holding wlock yet + unsigned* lock_flag; //highest bit 1 for wlocked, low 31 bit for read lock holding count, 0 for unlocked } bthread_rwlock_t; typedef struct { diff --git a/test/bthread_brpc_rwlock_unittest.cpp b/test/bthread_brpc_rwlock_unittest.cpp new file mode 100644 index 0000000000..2a94d65820 --- /dev/null +++ b/test/bthread_brpc_rwlock_unittest.cpp @@ -0,0 +1,198 @@ +// Copyright (c) 2020 Bigo, Inc. +// Author: HeTao (hetao@bigo.sg) +// Date: Jan 06 2020 + +#include +#include "butil/compat.h" +#include "butil/time.h" +#include "butil/macros.h" +#include "butil/string_printf.h" +#include "butil/logging.h" +#include "bthread/bthread.h" +#include "bthread/butex.h" +#include "bthread/task_control.h" +#include "butil/gperftools_profiler.h" + +#include + +namespace { + +TEST(RwlockTest, sanity) { + bthread_rwlock_t m; + ASSERT_EQ(0, bthread_rwlock_init(&m, NULL)); + ASSERT_EQ(0, bthread_rwlock_rdlock(&m)); + ASSERT_EQ(0, bthread_rwlock_unlock(&m)); + ASSERT_EQ(0, bthread_rwlock_wrlock(&m)); + ASSERT_EQ(0, bthread_rwlock_unlock(&m)); + ASSERT_EQ(0, bthread_rwlock_destroy(&m)); +} + + + +bool g_started = false; +bool g_stopped = false; + +template +struct BAIDU_CACHELINE_ALIGNMENT PerfArgs { + Rwlock* rwlock; + int64_t counter; + int64_t elapse_ns; + bool ready; + int32_t op_type; /*0 for read,1 for write*/ + + PerfArgs() : rwlock(NULL), counter(0), elapse_ns(0), ready(false), op_type(0) {} +}; + +template +void* add_with_rwlock(void* void_arg) { + PerfArgs* args = (PerfArgs*)void_arg; + args->ready = true; + butil::Timer t; + while (!g_stopped) { + if (g_started) { + break; + } + bthread_usleep(1000); + } + t.start(); + while (!g_stopped) { + if(args->op_type == 0) { + // args->rwlock->Rlock(); + bthread_rwlock_rdlock(args->rwlock); + } + else { + // args->rwlock->Wlock(); + bthread_rwlock_wrlock(args->rwlock); + } + // args->rwlock->Unlock(); + bthread_rwlock_unlock(args->rwlock); + ++args->counter; + } + t.stop(); + args->elapse_ns = t.n_elapsed(); + return NULL; +} + +int g_prof_name_counter = 0; + +template + void PerfTest(Rwlock* rwlock, + ThreadId* /*dummy*/, + int thread_num, + const ThreadCreateFn& create_fn, + const ThreadJoinFn& join_fn, + int op_type=0 /*0 for read,1 for write*/) { + g_started = false; + g_stopped = false; + ThreadId threads[thread_num]; + std::vector > args(thread_num); + for (int i = 0; i < thread_num; ++i) { + args[i].rwlock = rwlock; + args[i].op_type = op_type; + create_fn(&threads[i], NULL, add_with_rwlock, &args[i]); + } + while (true) { + bool all_ready = true; + for (int i = 0; i < thread_num; ++i) { + if (!args[i].ready) { + all_ready = false; + break; + } + } + if (all_ready) { + break; + } + usleep(1000); + } + g_started = true; + char prof_name[32]; + snprintf(prof_name, sizeof(prof_name), "rdlock_perf_%d.prof", ++g_prof_name_counter); + ProfilerStart(prof_name); + usleep(500 * 1000); + ProfilerStop(); + g_stopped = true; + int64_t wait_time = 0; + int64_t count = 0; + for (int i = 0; i < thread_num; ++i) { + join_fn(threads[i], NULL); + wait_time += args[i].elapse_ns; + count += args[i].counter; + } + LOG(INFO) << butil::class_name() << (op_type==0?" readlock ":" writelock ") << " in " + << ((void*)create_fn == (void*)pthread_create ? "pthread" : "bthread") + << " thread_num=" << thread_num + << " count=" << count + << " average_time=" << wait_time / (double)count; +} + + +TEST(RWLockTest, performance) { + const int thread_num = 12; + bthread_rwlock_t brw; + bthread_rwlock_init(&brw, NULL); + //rlock + PerfTest(&brw, (pthread_t*)NULL, thread_num, pthread_create, pthread_join); + PerfTest(&brw, (bthread_t*)NULL, thread_num, bthread_start_background, bthread_join); + + //add test 1 rlock for compare + PerfTest(&brw, (pthread_t*)NULL, 1, pthread_create, pthread_join); + PerfTest(&brw, (bthread_t*)NULL, 1, bthread_start_background, bthread_join); + + //for wlock + PerfTest(&brw, (pthread_t*)NULL, thread_num, pthread_create, pthread_join, 1); + PerfTest(&brw, (bthread_t*)NULL, thread_num, bthread_start_background, bthread_join, 1); + + //add test 1 wlock for compare + PerfTest(&brw, (pthread_t*)NULL, 1, pthread_create, pthread_join, 1); + PerfTest(&brw, (bthread_t*)NULL, 1, bthread_start_background, bthread_join, 1); +} + +void* loop_until_stopped(void* arg) { + bthread_rwlock_t *m = (bthread_rwlock_t*)arg; + while (!g_stopped) { + int r = rand() % 100; + if((r&1)==0) + { + bthread::rlock_guard rg(*m); + } + else{ + bthread::wlock_guard wg(*m); + } + bthread_usleep(20); + } + return NULL; +} + +TEST(RwlockTest, mix_thread_types) { + g_stopped = false; + const int N = 16; + const int M = N * 2; + // bthread::Mutex m; + bthread_rwlock_t brw; + bthread_rwlock_init(&brw, NULL); + + pthread_t pthreads[N]; + bthread_t bthreads[M]; + // reserve enough workers for test. This is a must since we have + // BTHREAD_ATTR_PTHREAD bthreads which may cause deadlocks (the + // bhtread_usleep below can't be scheduled and g_stopped is never + // true, thus loop_until_stopped spins forever) + bthread_setconcurrency(M); + for (int i = 0; i < N; ++i) { + ASSERT_EQ(0, pthread_create(&pthreads[i], NULL, loop_until_stopped, &brw)); + } + for (int i = 0; i < M; ++i) { + const bthread_attr_t *attr = i % 2 ? NULL : &BTHREAD_ATTR_PTHREAD; + ASSERT_EQ(0, bthread_start_urgent(&bthreads[i], attr, loop_until_stopped, &brw)); + } + bthread_usleep(1000L * 1000); + g_stopped = true; + for (int i = 0; i < M; ++i) { + bthread_join(bthreads[i], NULL); + } + for (int i = 0; i < N; ++i) { + pthread_join(pthreads[i], NULL); + } +} +} // namespace From 0b89c30fa9a53c067f96f7f24fdc9cec6494242c Mon Sep 17 00:00:00 2001 From: zhanghao Date: Sun, 19 Jan 2020 17:35:06 +0800 Subject: [PATCH 2/5] add ut build for rwlock --- test/BUILD | 1 + 1 file changed, 1 insertion(+) diff --git a/test/BUILD b/test/BUILD index c4649ab123..6e55001248 100644 --- a/test/BUILD +++ b/test/BUILD @@ -235,6 +235,7 @@ cc_test( "bthread_dispatcher_unittest.cpp", "bthread_fd_unittest.cpp", "bthread_mutex_unittest.cpp", + "bthread_rwlock_unittest.cpp", "bthread_setconcurrency_unittest.cpp", # glog CHECK die with a fatal error "bthread_key_unittest.cpp" From 6db436900fd715041e3b5bbf39ffef048548ca85 Mon Sep 17 00:00:00 2001 From: zhanghao Date: Sun, 19 Jan 2020 18:23:36 +0800 Subject: [PATCH 3/5] rename perf name in rwlock ut --- test/bthread_brpc_rwlock_unittest.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/bthread_brpc_rwlock_unittest.cpp b/test/bthread_brpc_rwlock_unittest.cpp index 2a94d65820..f5ce3fb052 100644 --- a/test/bthread_brpc_rwlock_unittest.cpp +++ b/test/bthread_brpc_rwlock_unittest.cpp @@ -107,7 +107,7 @@ template Date: Sun, 19 Jan 2020 18:30:01 +0800 Subject: [PATCH 4/5] fix wrong ut name for rwlock --- test/BUILD | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/BUILD b/test/BUILD index 6e55001248..ed2c06c9ec 100644 --- a/test/BUILD +++ b/test/BUILD @@ -235,7 +235,7 @@ cc_test( "bthread_dispatcher_unittest.cpp", "bthread_fd_unittest.cpp", "bthread_mutex_unittest.cpp", - "bthread_rwlock_unittest.cpp", + "bthread_brpc_rwlock_unittest.cpp", "bthread_setconcurrency_unittest.cpp", # glog CHECK die with a fatal error "bthread_key_unittest.cpp" From 9d1a2b240160ccb487ee4fd8c6f7d6b8646ed15d Mon Sep 17 00:00:00 2001 From: zhanghao Date: Wed, 24 Aug 2022 14:59:05 +0800 Subject: [PATCH 5/5] fix wlock bug cases when butex_wait fail --- src/bthread/rwlock.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/bthread/rwlock.cpp b/src/bthread/rwlock.cpp index 726c5a4748..e0b090be7b 100644 --- a/src/bthread/rwlock.cpp +++ b/src/bthread/rwlock.cpp @@ -116,7 +116,8 @@ inline int rwlock_wlock(bthread_rwlock_t* rwlock) { if(r != 0) { if(bthread::butex_wait(whole, r, NULL) < 0 && errno != EWOULDBLOCK && errno != EINTR) { - whole->fetch_sub(1); + w_wait_count->fetch_sub(1); + bthread::butex_wake_all(w_wait_count); return errno; } continue;