From 2482a4a3a142f89763cc05874debefdf0fab9582 Mon Sep 17 00:00:00 2001 From: Bright Chen Date: Mon, 1 Apr 2024 22:59:03 +0800 Subject: [PATCH] Support FastPthreadMutex contention profiler && expose FastPthreadMutex to user --- src/bthread/butex.cpp | 6 +-- src/bthread/id.cpp | 2 +- src/bthread/mutex.cpp | 80 ++++++++++++++++++++++++++------- src/bthread/mutex.h | 15 ++++++- src/bthread/timer_thread.cpp | 2 +- src/bthread/timer_thread.h | 2 +- test/bthread_mutex_unittest.cpp | 40 +++++++++++++++-- 7 files changed, 122 insertions(+), 25 deletions(-) diff --git a/src/bthread/butex.cpp b/src/bthread/butex.cpp index 1dbd89304c..2b7c78b8c4 100644 --- a/src/bthread/butex.cpp +++ b/src/bthread/butex.cpp @@ -121,7 +121,7 @@ struct BAIDU_CACHELINE_ALIGNMENT Butex { butil::atomic value; ButexWaiterList waiters; - internal::FastPthreadMutex waiter_lock; + FastPthreadMutex waiter_lock; }; BAIDU_CASSERT(offsetof(Butex, value) == 0, offsetof_value_must_0); @@ -460,8 +460,8 @@ int butex_requeue(void* arg, void* arg2) { ButexWaiter* front = NULL; { - std::unique_lock lck1(b->waiter_lock, std::defer_lock); - std::unique_lock lck2(m->waiter_lock, std::defer_lock); + std::unique_lock lck1(b->waiter_lock, std::defer_lock); + std::unique_lock lck2(m->waiter_lock, std::defer_lock); butil::double_lock(lck1, lck2); if (b->waiters.empty()) { return 0; diff --git a/src/bthread/id.cpp b/src/bthread/id.cpp index ba77580a04..7aabed6837 100644 --- a/src/bthread/id.cpp +++ b/src/bthread/id.cpp @@ -114,7 +114,7 @@ struct BAIDU_CACHELINE_ALIGNMENT Id { // contended_ver: locked and contended uint32_t first_ver; uint32_t locked_ver; - internal::FastPthreadMutex mutex; + FastPthreadMutex mutex; void* data; int (*on_error)(bthread_id_t, void*, int); int (*on_error2)(bthread_id_t, void*, int, const std::string&); diff --git a/src/bthread/mutex.cpp b/src/bthread/mutex.cpp index b4008fd796..d1abb50f3e 100644 --- a/src/bthread/mutex.cpp +++ b/src/bthread/mutex.cpp @@ -448,7 +448,8 @@ int first_sys_pthread_mutex_unlock(pthread_mutex_t* mutex) { return sys_pthread_mutex_unlock(mutex); } -inline uint64_t hash_mutex_ptr(const pthread_mutex_t* m) { +template +inline uint64_t hash_mutex_ptr(const Mutex* m) { return butil::fmix64((uint64_t)m); } @@ -468,7 +469,7 @@ static __thread bool tls_inside_lock = false; #ifndef DONT_SPEEDUP_PTHREAD_CONTENTION_PROFILER_WITH_TLS const int TLS_MAX_COUNT = 3; struct MutexAndContentionSite { - pthread_mutex_t* mutex; + void* mutex; bthread_contention_site_t csite; }; struct TLSPthreadContentionSites { @@ -482,8 +483,9 @@ static __thread TLSPthreadContentionSites tls_csites = {0,0,{}}; // Guaranteed in linux/win. const int PTR_BITS = 48; +template inline bthread_contention_site_t* -add_pthread_contention_site(pthread_mutex_t* mutex) { +add_pthread_contention_site(const Mutex* mutex) { MutexMapEntry& entry = g_mutex_map[hash_mutex_ptr(mutex) & (MUTEX_MAP_SIZE - 1)]; butil::static_atomic& m = entry.versioned_mutex; uint64_t expected = m.load(butil::memory_order_relaxed); @@ -500,8 +502,9 @@ add_pthread_contention_site(pthread_mutex_t* mutex) { return NULL; } -inline bool remove_pthread_contention_site( - pthread_mutex_t* mutex, bthread_contention_site_t* saved_csite) { +template +inline bool remove_pthread_contention_site(const Mutex* mutex, + bthread_contention_site_t* saved_csite) { MutexMapEntry& entry = g_mutex_map[hash_mutex_ptr(mutex) & (MUTEX_MAP_SIZE - 1)]; butil::static_atomic& m = entry.versioned_mutex; if ((m.load(butil::memory_order_relaxed) & ((((uint64_t)1) << PTR_BITS) - 1)) @@ -538,16 +541,44 @@ void submit_contention(const bthread_contention_site_t& csite, int64_t now_ns) { tls_inside_lock = false; } -BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(pthread_mutex_t* mutex) { +namespace internal { +BUTIL_FORCE_INLINE int pthread_mutex_lock_internal(pthread_mutex_t* mutex) { + return sys_pthread_mutex_lock(mutex); +} + +BUTIL_FORCE_INLINE int pthread_mutex_trylock_internal(pthread_mutex_t* mutex) { + return ::pthread_mutex_trylock(mutex); +} + +BUTIL_FORCE_INLINE int pthread_mutex_unlock_internal(pthread_mutex_t* mutex) { + return sys_pthread_mutex_unlock(mutex); +} + +BUTIL_FORCE_INLINE int pthread_mutex_lock_internal(FastPthreadMutex* mutex) { + mutex->lock(); + return 0; +} + +BUTIL_FORCE_INLINE int pthread_mutex_trylock_internal(FastPthreadMutex* mutex) { + return mutex->try_lock() ? 0 : EBUSY; +} + +BUTIL_FORCE_INLINE int pthread_mutex_unlock_internal(FastPthreadMutex* mutex) { + mutex->unlock(); + return 0; +} + +template +BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(Mutex* mutex) { // Don't change behavior of lock when profiler is off. if (!g_cp || // collecting code including backtrace() and submit() may call // pthread_mutex_lock and cause deadlock. Don't sample. tls_inside_lock) { - return sys_pthread_mutex_lock(mutex); + return pthread_mutex_lock_internal(mutex); } // Don't slow down non-contended locks. - int rc = pthread_mutex_trylock(mutex); + int rc = pthread_mutex_trylock_internal(mutex); if (rc != EBUSY) { return rc; } @@ -567,16 +598,16 @@ BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(pthread_mutex_t* mutex) { csite = &entry.csite; if (!sampling_range) { make_contention_site_invalid(&entry.csite); - return sys_pthread_mutex_lock(mutex); + return pthread_mutex_lock_internal(mutex); } } #endif if (!sampling_range) { // don't sample - return sys_pthread_mutex_lock(mutex); + return pthread_mutex_lock_internal(mutex); } // Lock and monitor the waiting time. const int64_t start_ns = butil::cpuwide_time_ns(); - rc = sys_pthread_mutex_lock(mutex); + rc = pthread_mutex_lock_internal(mutex); if (!rc) { // Inside lock if (!csite) { csite = add_pthread_contention_site(mutex); @@ -590,13 +621,14 @@ BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(pthread_mutex_t* mutex) { return rc; } -BUTIL_FORCE_INLINE int pthread_mutex_unlock_impl(pthread_mutex_t* mutex) { +template +BUTIL_FORCE_INLINE int pthread_mutex_unlock_impl(Mutex* mutex) { // Don't change behavior of unlock when profiler is off. if (!g_cp || tls_inside_lock) { // This branch brings an issue that an entry created by - // add_pthread_contention_site may not be cleared. Thus we add a + // add_pthread_contention_site may not be cleared. Thus we add a // 16-bit rolling version in the entry to find out such entry. - return sys_pthread_mutex_unlock(mutex); + return pthread_mutex_unlock_internal(mutex); } int64_t unlock_start_ns = 0; bool miss_in_tls = true; @@ -622,7 +654,7 @@ BUTIL_FORCE_INLINE int pthread_mutex_unlock_impl(pthread_mutex_t* mutex) { unlock_start_ns = butil::cpuwide_time_ns(); } } - const int rc = sys_pthread_mutex_unlock(mutex); + const int rc = pthread_mutex_unlock_internal(mutex); // [Outside lock] if (unlock_start_ns) { const int64_t unlock_end_ns = butil::cpuwide_time_ns(); @@ -632,6 +664,16 @@ BUTIL_FORCE_INLINE int pthread_mutex_unlock_impl(pthread_mutex_t* mutex) { return rc; } +} + +BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(pthread_mutex_t* mutex) { + return internal::pthread_mutex_lock_impl(mutex); +} + +BUTIL_FORCE_INLINE int pthread_mutex_unlock_impl(pthread_mutex_t* mutex) { + return internal::pthread_mutex_unlock_impl(mutex); +} + // Implement bthread_mutex_t related functions struct MutexInternal { butil::static_atomic locked; @@ -714,6 +756,14 @@ void FastPthreadMutex::unlock() { } // namespace internal #endif // BTHREAD_USE_FAST_PTHREAD_MUTEX +void FastPthreadMutex::lock() { + internal::pthread_mutex_lock_impl(&_mutex); +} + +void FastPthreadMutex::unlock() { + internal::pthread_mutex_unlock_impl(&_mutex); +} + } // namespace bthread extern "C" { diff --git a/src/bthread/mutex.h b/src/bthread/mutex.h index 242a620fcd..ad6d2e5cbd 100644 --- a/src/bthread/mutex.h +++ b/src/bthread/mutex.h @@ -72,7 +72,7 @@ namespace internal { class FastPthreadMutex { public: FastPthreadMutex() : _futex(0) {} - ~FastPthreadMutex() {} + ~FastPthreadMutex() = default; void lock(); void unlock(); bool try_lock(); @@ -86,6 +86,19 @@ typedef butil::Mutex FastPthreadMutex; #endif } +class FastPthreadMutex { +public: + FastPthreadMutex() = default; + ~FastPthreadMutex() = default; + DISALLOW_COPY_AND_ASSIGN(FastPthreadMutex); + + void lock(); + void unlock(); + bool try_lock() { return _mutex.try_lock(); } +private: + internal::FastPthreadMutex _mutex; +}; + } // namespace bthread // Specialize std::lock_guard and std::unique_lock for bthread_mutex_t diff --git a/src/bthread/timer_thread.cpp b/src/bthread/timer_thread.cpp index 3b2f8a7698..ee80568c93 100644 --- a/src/bthread/timer_thread.cpp +++ b/src/bthread/timer_thread.cpp @@ -92,7 +92,7 @@ class BAIDU_CACHELINE_ALIGNMENT TimerThread::Bucket { Task* consume_tasks(); private: - internal::FastPthreadMutex _mutex; + FastPthreadMutex _mutex; int64_t _nearest_run_time; Task* _task_head; }; diff --git a/src/bthread/timer_thread.h b/src/bthread/timer_thread.h index 139c2e983a..1be061cc4f 100644 --- a/src/bthread/timer_thread.h +++ b/src/bthread/timer_thread.h @@ -95,7 +95,7 @@ class TimerThread { TimerThreadOptions _options; Bucket* _buckets; // list of tasks to be run - internal::FastPthreadMutex _mutex; // protect _nearest_run_time + FastPthreadMutex _mutex; // protect _nearest_run_time int64_t _nearest_run_time; // the futex for wake up timer thread. can't use _nearest_run_time because // it's 64-bit. diff --git a/test/bthread_mutex_unittest.cpp b/test/bthread_mutex_unittest.cpp index 38c43eed0a..21bd60446f 100644 --- a/test/bthread_mutex_unittest.cpp +++ b/test/bthread_mutex_unittest.cpp @@ -229,8 +229,9 @@ TEST(MutexTest, performance) { PerfTest(&bth_mutex, (bthread_t*)NULL, thread_num, bthread_start_background, bthread_join); } +template void* loop_until_stopped(void* arg) { - bthread::Mutex *m = (bthread::Mutex*)arg; + auto m = (Mutex*)arg; while (!g_stopped) { BAIDU_SCOPED_LOCK(*m); bthread_usleep(20); @@ -251,11 +252,11 @@ TEST(MutexTest, mix_thread_types) { // true, thus loop_until_stopped spins forever) bthread_setconcurrency(M); for (int i = 0; i < N; ++i) { - ASSERT_EQ(0, pthread_create(&pthreads[i], NULL, loop_until_stopped, &m)); + ASSERT_EQ(0, pthread_create(&pthreads[i], NULL, loop_until_stopped, &m)); } for (int i = 0; i < M; ++i) { const bthread_attr_t *attr = i % 2 ? NULL : &BTHREAD_ATTR_PTHREAD; - ASSERT_EQ(0, bthread_start_urgent(&bthreads[i], attr, loop_until_stopped, &m)); + ASSERT_EQ(0, bthread_start_urgent(&bthreads[i], attr, loop_until_stopped, &m)); } bthread_usleep(1000L * 1000); g_stopped = true; @@ -266,4 +267,37 @@ TEST(MutexTest, mix_thread_types) { pthread_join(pthreads[i], NULL); } } + +TEST(MutexTest, fast_pthread_mutex) { + bthread::FastPthreadMutex mutex; + ASSERT_TRUE(mutex.try_lock()); + mutex.unlock(); + mutex.lock(); + mutex.unlock(); + { + BAIDU_SCOPED_LOCK(mutex); + } + { + std::unique_lock lck1; + std::unique_lock lck2(mutex); + lck1.swap(lck2); + lck1.unlock(); + lck1.lock(); + } + ASSERT_TRUE(mutex.try_lock()); + mutex.unlock(); + + const int N = 16; + pthread_t pthreads[N]; + for (int i = 0; i < N; ++i) { + ASSERT_EQ(0, pthread_create(&pthreads[i], NULL, + loop_until_stopped, &mutex)); + } + bthread_usleep(1000L * 1000); + g_stopped = true; + for (int i = 0; i < N; ++i) { + pthread_join(pthreads[i], NULL); + } +} + } // namespace