Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/bthread/butex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ struct BAIDU_CACHELINE_ALIGNMENT Butex {

butil::atomic<int> value;
ButexWaiterList waiters;
internal::FastPthreadMutex waiter_lock;
FastPthreadMutex waiter_lock;
};

BAIDU_CASSERT(offsetof(Butex, value) == 0, offsetof_value_must_0);
Expand Down Expand Up @@ -460,8 +460,8 @@ int butex_requeue(void* arg, void* arg2) {

ButexWaiter* front = NULL;
{
std::unique_lock<internal::FastPthreadMutex> lck1(b->waiter_lock, std::defer_lock);
std::unique_lock<internal::FastPthreadMutex> lck2(m->waiter_lock, std::defer_lock);
std::unique_lock<FastPthreadMutex> lck1(b->waiter_lock, std::defer_lock);
std::unique_lock<FastPthreadMutex> lck2(m->waiter_lock, std::defer_lock);
butil::double_lock(lck1, lck2);
if (b->waiters.empty()) {
return 0;
Expand Down
2 changes: 1 addition & 1 deletion src/bthread/id.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ struct BAIDU_CACHELINE_ALIGNMENT Id {
// contended_ver: locked and contended
uint32_t first_ver;
uint32_t locked_ver;
internal::FastPthreadMutex mutex;
FastPthreadMutex mutex;
void* data;
int (*on_error)(bthread_id_t, void*, int);
int (*on_error2)(bthread_id_t, void*, int, const std::string&);
Expand Down
80 changes: 65 additions & 15 deletions src/bthread/mutex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,8 @@ int first_sys_pthread_mutex_unlock(pthread_mutex_t* mutex) {
return sys_pthread_mutex_unlock(mutex);
}

// Hashes the address of a mutex into a 64-bit value by feeding the pointer,
// converted to uint64_t, through butil::fmix64. Only the address is used; the
// mutex object itself is never dereferenced. Callers in this file mask the
// result with (MUTEX_MAP_SIZE - 1) to pick a slot in g_mutex_map.
inline uint64_t hash_mutex_ptr(const pthread_mutex_t* m) {
template <typename Mutex>
inline uint64_t hash_mutex_ptr(const Mutex* m) {
return butil::fmix64((uint64_t)m);
}

Expand All @@ -468,7 +469,7 @@ static __thread bool tls_inside_lock = false;
#ifndef DONT_SPEEDUP_PTHREAD_CONTENTION_PROFILER_WITH_TLS
const int TLS_MAX_COUNT = 3;
// Pairs a mutex address with a bthread_contention_site_t record.
// NOTE(review): presumably the element type of the per-thread cache in
// TLSPthreadContentionSites -- confirm against that struct's definition.
struct MutexAndContentionSite {
// Address of the mutex this entry belongs to.
pthread_mutex_t* mutex;
void* mutex;
// Contention site stored alongside the mutex address.
bthread_contention_site_t csite;
};
struct TLSPthreadContentionSites {
Expand All @@ -482,8 +483,9 @@ static __thread TLSPthreadContentionSites tls_csites = {0,0,{}};
// Guaranteed in linux/win.
const int PTR_BITS = 48;

template <typename Mutex>
inline bthread_contention_site_t*
add_pthread_contention_site(pthread_mutex_t* mutex) {
add_pthread_contention_site(const Mutex* mutex) {
MutexMapEntry& entry = g_mutex_map[hash_mutex_ptr(mutex) & (MUTEX_MAP_SIZE - 1)];
butil::static_atomic<uint64_t>& m = entry.versioned_mutex;
uint64_t expected = m.load(butil::memory_order_relaxed);
Expand All @@ -500,8 +502,9 @@ add_pthread_contention_site(pthread_mutex_t* mutex) {
return NULL;
}

inline bool remove_pthread_contention_site(
pthread_mutex_t* mutex, bthread_contention_site_t* saved_csite) {
template <typename Mutex>
inline bool remove_pthread_contention_site(const Mutex* mutex,
bthread_contention_site_t* saved_csite) {
MutexMapEntry& entry = g_mutex_map[hash_mutex_ptr(mutex) & (MUTEX_MAP_SIZE - 1)];
butil::static_atomic<uint64_t>& m = entry.versioned_mutex;
if ((m.load(butil::memory_order_relaxed) & ((((uint64_t)1) << PTR_BITS) - 1))
Expand Down Expand Up @@ -538,16 +541,44 @@ void submit_contention(const bthread_contention_site_t& csite, int64_t now_ns) {
tls_inside_lock = false;
}

BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(pthread_mutex_t* mutex) {
namespace internal {
// Blocking-lock primitive for a raw pthread_mutex_t: forwards to
// sys_pthread_mutex_lock and passes its return code through untouched.
BUTIL_FORCE_INLINE int pthread_mutex_lock_internal(pthread_mutex_t* mutex) {
    const int rc = sys_pthread_mutex_lock(mutex);
    return rc;
}

// Non-blocking lock attempt for a raw pthread_mutex_t: calls the
// global-scope pthread_mutex_trylock and returns whatever it reports.
BUTIL_FORCE_INLINE int pthread_mutex_trylock_internal(pthread_mutex_t* mutex) {
    const int rc = ::pthread_mutex_trylock(mutex);
    return rc;
}

// Unlock primitive for a raw pthread_mutex_t: forwards to
// sys_pthread_mutex_unlock and passes its return code through untouched.
BUTIL_FORCE_INLINE int pthread_mutex_unlock_internal(pthread_mutex_t* mutex) {
    const int rc = sys_pthread_mutex_unlock(mutex);
    return rc;
}

// FastPthreadMutex overload of the blocking-lock primitive. It gives the
// templated lock code the same int-returning shape as the pthread_mutex_t
// overload; nothing returned by lock() is consulted, so 0 is always reported.
BUTIL_FORCE_INLINE int pthread_mutex_lock_internal(FastPthreadMutex* mutex) {
    FastPthreadMutex& target = *mutex;
    target.lock();
    return 0;
}

// FastPthreadMutex overload of the try-lock primitive. Translates the
// result of try_lock() into pthread-style codes: 0 when the lock was taken,
// EBUSY when it was not.
BUTIL_FORCE_INLINE int pthread_mutex_trylock_internal(FastPthreadMutex* mutex) {
    if (mutex->try_lock()) {
        return 0;
    }
    return EBUSY;
}

// FastPthreadMutex overload of the unlock primitive. Mirrors the
// pthread_mutex_t overload's int-returning signature; nothing returned by
// unlock() is consulted, so 0 is always reported.
BUTIL_FORCE_INLINE int pthread_mutex_unlock_internal(FastPthreadMutex* mutex) {
    FastPthreadMutex& target = *mutex;
    target.unlock();
    return 0;
}

template <typename Mutex>
BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(Mutex* mutex) {
// Don't change behavior of lock when profiler is off.
if (!g_cp ||
// collecting code including backtrace() and submit() may call
// pthread_mutex_lock and cause deadlock. Don't sample.
tls_inside_lock) {
return sys_pthread_mutex_lock(mutex);
return pthread_mutex_lock_internal(mutex);
}
// Don't slow down non-contended locks.
int rc = pthread_mutex_trylock(mutex);
int rc = pthread_mutex_trylock_internal(mutex);
if (rc != EBUSY) {
return rc;
}
Expand All @@ -567,16 +598,16 @@ BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(pthread_mutex_t* mutex) {
csite = &entry.csite;
if (!sampling_range) {
make_contention_site_invalid(&entry.csite);
return sys_pthread_mutex_lock(mutex);
return pthread_mutex_lock_internal(mutex);
}
}
#endif
if (!sampling_range) { // don't sample
return sys_pthread_mutex_lock(mutex);
return pthread_mutex_lock_internal(mutex);
}
// Lock and monitor the waiting time.
const int64_t start_ns = butil::cpuwide_time_ns();
rc = sys_pthread_mutex_lock(mutex);
rc = pthread_mutex_lock_internal(mutex);
if (!rc) { // Inside lock
if (!csite) {
csite = add_pthread_contention_site(mutex);
Expand All @@ -590,13 +621,14 @@ BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(pthread_mutex_t* mutex) {
return rc;
}

BUTIL_FORCE_INLINE int pthread_mutex_unlock_impl(pthread_mutex_t* mutex) {
template <typename Mutex>
BUTIL_FORCE_INLINE int pthread_mutex_unlock_impl(Mutex* mutex) {
// Don't change behavior of unlock when profiler is off.
if (!g_cp || tls_inside_lock) {
// This branch brings an issue that an entry created by
// add_pthread_contention_site may not be cleared. Thus we add a
// add_pthread_contention_site may not be cleared. Thus we add a
// 16-bit rolling version in the entry to find out such entry.
return sys_pthread_mutex_unlock(mutex);
return pthread_mutex_unlock_internal(mutex);
}
int64_t unlock_start_ns = 0;
bool miss_in_tls = true;
Expand All @@ -622,7 +654,7 @@ BUTIL_FORCE_INLINE int pthread_mutex_unlock_impl(pthread_mutex_t* mutex) {
unlock_start_ns = butil::cpuwide_time_ns();
}
}
const int rc = sys_pthread_mutex_unlock(mutex);
const int rc = pthread_mutex_unlock_internal(mutex);
// [Outside lock]
if (unlock_start_ns) {
const int64_t unlock_end_ns = butil::cpuwide_time_ns();
Expand All @@ -632,6 +664,16 @@ BUTIL_FORCE_INLINE int pthread_mutex_unlock_impl(pthread_mutex_t* mutex) {
return rc;
}

}

// Non-template entry point for pthread_mutex_t. Delegates to the templated
// internal::pthread_mutex_lock_impl and returns its result unchanged.
BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(pthread_mutex_t* mutex) {
    const int rc = internal::pthread_mutex_lock_impl(mutex);
    return rc;
}

// Non-template entry point for pthread_mutex_t. Delegates to the templated
// internal::pthread_mutex_unlock_impl and returns its result unchanged.
BUTIL_FORCE_INLINE int pthread_mutex_unlock_impl(pthread_mutex_t* mutex) {
    const int rc = internal::pthread_mutex_unlock_impl(mutex);
    return rc;
}

// Implement bthread_mutex_t related functions
struct MutexInternal {
butil::static_atomic<unsigned char> locked;
Expand Down Expand Up @@ -714,6 +756,14 @@ void FastPthreadMutex::unlock() {
} // namespace internal
#endif // BTHREAD_USE_FAST_PTHREAD_MUTEX

// Acquires the wrapped internal mutex through
// internal::pthread_mutex_lock_impl, i.e. the same templated lock path the
// pthread_mutex_t entry point uses. The int result is deliberately dropped
// because this method returns void.
void FastPthreadMutex::lock() {
    static_cast<void>(internal::pthread_mutex_lock_impl(&_mutex));
}

// Releases the wrapped internal mutex through
// internal::pthread_mutex_unlock_impl, i.e. the same templated unlock path
// the pthread_mutex_t entry point uses. The int result is deliberately
// dropped because this method returns void.
void FastPthreadMutex::unlock() {
    static_cast<void>(internal::pthread_mutex_unlock_impl(&_mutex));
}

} // namespace bthread

extern "C" {
Expand Down
15 changes: 14 additions & 1 deletion src/bthread/mutex.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ namespace internal {
class FastPthreadMutex {
public:
FastPthreadMutex() : _futex(0) {}
~FastPthreadMutex() {}
~FastPthreadMutex() = default;
void lock();
void unlock();
bool try_lock();
Expand All @@ -86,6 +86,19 @@ typedef butil::Mutex FastPthreadMutex;
#endif
}

// Mutex type that wraps an internal::FastPthreadMutex.
//
// It exposes lock() / try_lock() / unlock(). lock() and unlock() are defined
// out of line; try_lock() forwards straight to the wrapped mutex. Copying and
// assignment are disabled through DISALLOW_COPY_AND_ASSIGN.
class FastPthreadMutex {
public:
    FastPthreadMutex() = default;
    ~FastPthreadMutex() = default;
    DISALLOW_COPY_AND_ASSIGN(FastPthreadMutex);

    // Blocks until the mutex is held by the caller.
    void lock();

    // Attempts to take the mutex without blocking; the result is whatever
    // the wrapped mutex's try_lock() returns.
    bool try_lock() { return _mutex.try_lock(); }

    // Releases the mutex.
    void unlock();

private:
    // The underlying lock that does the real work.
    internal::FastPthreadMutex _mutex;
};

} // namespace bthread

// Specialize std::lock_guard and std::unique_lock for bthread_mutex_t
Expand Down
2 changes: 1 addition & 1 deletion src/bthread/timer_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class BAIDU_CACHELINE_ALIGNMENT TimerThread::Bucket {
Task* consume_tasks();

private:
internal::FastPthreadMutex _mutex;
FastPthreadMutex _mutex;
int64_t _nearest_run_time;
Task* _task_head;
};
Expand Down
2 changes: 1 addition & 1 deletion src/bthread/timer_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class TimerThread {

TimerThreadOptions _options;
Bucket* _buckets; // list of tasks to be run
internal::FastPthreadMutex _mutex; // protect _nearest_run_time
FastPthreadMutex _mutex; // protect _nearest_run_time
int64_t _nearest_run_time;
// the futex for wake up timer thread. can't use _nearest_run_time because
// it's 64-bit.
Expand Down
40 changes: 37 additions & 3 deletions test/bthread_mutex_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,9 @@ TEST(MutexTest, performance) {
PerfTest(&bth_mutex, (bthread_t*)NULL, thread_num, bthread_start_background, bthread_join);
}

template <typename Mutex>
void* loop_until_stopped(void* arg) {
bthread::Mutex *m = (bthread::Mutex*)arg;
auto m = (Mutex*)arg;
while (!g_stopped) {
BAIDU_SCOPED_LOCK(*m);
bthread_usleep(20);
Expand All @@ -251,11 +252,11 @@ TEST(MutexTest, mix_thread_types) {
// true, thus loop_until_stopped spins forever)
bthread_setconcurrency(M);
for (int i = 0; i < N; ++i) {
ASSERT_EQ(0, pthread_create(&pthreads[i], NULL, loop_until_stopped, &m));
ASSERT_EQ(0, pthread_create(&pthreads[i], NULL, loop_until_stopped<bthread::Mutex>, &m));
}
for (int i = 0; i < M; ++i) {
const bthread_attr_t *attr = i % 2 ? NULL : &BTHREAD_ATTR_PTHREAD;
ASSERT_EQ(0, bthread_start_urgent(&bthreads[i], attr, loop_until_stopped, &m));
ASSERT_EQ(0, bthread_start_urgent(&bthreads[i], attr, loop_until_stopped<bthread::Mutex>, &m));
}
bthread_usleep(1000L * 1000);
g_stopped = true;
Expand All @@ -266,4 +267,37 @@ TEST(MutexTest, mix_thread_types) {
pthread_join(pthreads[i], NULL);
}
}

// Exercises bthread::FastPthreadMutex in three phases:
//   1. uncontended try_lock / lock / unlock on a single thread;
//   2. use through BAIDU_SCOPED_LOCK and std::unique_lock (swap, unlock,
//      re-lock), then a check that the mutex is free again;
//   3. contention from N pthreads running loop_until_stopped for about one
//      second.
TEST(MutexTest, fast_pthread_mutex) {
    bthread::FastPthreadMutex mutex;
    ASSERT_TRUE(mutex.try_lock());
    mutex.unlock();
    mutex.lock();
    mutex.unlock();
    {
        BAIDU_SCOPED_LOCK(mutex);
    }
    {
        std::unique_lock<bthread::FastPthreadMutex> lck1;
        std::unique_lock<bthread::FastPthreadMutex> lck2(mutex);
        // After the swap lck1 owns the lock and lck2 owns nothing.
        lck1.swap(lck2);
        lck1.unlock();
        lck1.lock();
    }
    // Every guard above has released the mutex, so it must be free here.
    ASSERT_TRUE(mutex.try_lock());
    mutex.unlock();

    // g_stopped is a global shared with other tests in this file;
    // mix_thread_types leaves it set to true. Reset it here, otherwise every
    // loop_until_stopped thread returns immediately and the contended phase
    // below exercises nothing.
    g_stopped = false;

    const int N = 16;
    pthread_t pthreads[N];
    for (int i = 0; i < N; ++i) {
        ASSERT_EQ(0, pthread_create(&pthreads[i], NULL,
                                    loop_until_stopped<bthread::FastPthreadMutex>, &mutex));
    }
    // Let the workers fight over the mutex for a while before stopping them.
    bthread_usleep(1000L * 1000);
    g_stopped = true;
    for (int i = 0; i < N; ++i) {
        pthread_join(pthreads[i], NULL);
    }
}

} // namespace