Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 180 additions & 39 deletions src/bthread/key.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,35 @@
// Date: Sun Aug 3 12:46:15 CST 2014

#include <pthread.h>
#include "butil/macros.h"
#include <gflags/gflags.h>

#include "bthread/errno.h" // EAGAIN
#include "bthread/task_group.h" // TaskGroup
#include "butil/atomicops.h"
#include "butil/macros.h"
#include "butil/thread_key.h"
#include "butil/thread_local.h"
#include "bvar/passive_status.h"
#include "bthread/errno.h" // EAGAIN
#include "bthread/task_group.h" // TaskGroup

// Implement bthread_key_t related functions

namespace bthread {

DEFINE_uint32(key_table_list_size, 5000,
"The maximum length of the KeyTableList. Once this value is "
"exceeded, a portion of the KeyTables will be moved to the "
"global free_keytables list.");

DEFINE_uint32(borrow_from_globle_size, 100,
"The maximum number of KeyTables retrieved in a single operation "
"from the global free_keytables when no KeyTable exists in the "
"current thread's keytable_list.");

EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(TaskGroup*, tls_task_group);

class KeyTable;

// defined in task_group.cpp
extern __thread TaskGroup* tls_task_group;
extern __thread LocalStorage tls_bls;
static __thread bool tls_ever_created_keytable = false;

Expand All @@ -52,7 +66,7 @@ static const uint32_t KEY_2NDLEVEL_SIZE = 32;
static const uint32_t KEY_1STLEVEL_SIZE = 31;

// Max tls in one thread, currently the value is 992 which should be enough
// for most projects throughout baidu.
// for most projects throughout baidu.
static const uint32_t KEYS_MAX = KEY_2NDLEVEL_SIZE * KEY_1STLEVEL_SIZE;

// destructors/version of TLS.
Expand Down Expand Up @@ -94,7 +108,7 @@ class BAIDU_CACHELINE_ALIGNMENT SubKeyTable {
// Set the position to NULL before calling dtor which may set
// the position again.
_data[i].ptr = NULL;

KeyInfo info = bthread::s_key_info[offset + i];
if (info.dtor && _data[i].version == info.version) {
info.dtor(p, info.dtor_args);
Expand Down Expand Up @@ -205,53 +219,148 @@ class BAIDU_CACHELINE_ALIGNMENT KeyTable {
SubKeyTable* _subs[KEY_1STLEVEL_SIZE];
};

struct KeyTableList {
KeyTableList() {
keytable = NULL;
class BAIDU_CACHELINE_ALIGNMENT KeyTableList {
public:
KeyTableList() :
_head(NULL), _tail(NULL), _length(0) {
}

~KeyTableList() {
bthread::TaskGroup* g = bthread::tls_task_group;
bthread::KeyTable* old_kt = bthread::tls_bls.keytable;
TaskGroup* g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
KeyTable* old_kt = tls_bls.keytable;
KeyTable* keytable = _head;
while (keytable) {
bthread::KeyTable* kt = keytable;
KeyTable* kt = keytable;
keytable = kt->next;
bthread::tls_bls.keytable = kt;
tls_bls.keytable = kt;
if (g) {
g->current_task()->local_storage.keytable = kt;
}
delete kt;
if (old_kt == kt) {
old_kt = NULL;
}
g = bthread::tls_task_group;
g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
}
bthread::tls_bls.keytable = old_kt;
if(g) {
tls_bls.keytable = old_kt;
if (g) {
g->current_task()->local_storage.keytable = old_kt;
}
}
KeyTable* keytable;

void append(KeyTable* keytable) {
if (keytable == NULL) {
return;
}
if (_head == NULL) {
_head = _tail = keytable;
} else {
_tail->next = keytable;
_tail = keytable;
}
keytable->next = NULL;
_length++;
}

KeyTable* remove_front() {
if (_head == NULL) {
return NULL;
}
KeyTable* temp = _head;
_head = _head->next;
_length--;
if (_head == NULL) {
_tail = NULL;
}
return temp;
}

int move_first_n_to_target(KeyTable** target, uint32_t size) {
if (size > _length || _head == NULL) {
return 0;
}

KeyTable* current = _head;
KeyTable* prev = NULL;
uint32_t count = 0;
while (current != NULL && count < size) {
prev = current;
current = current->next;
count++;
}
if (prev != NULL) {
prev->next = NULL;
if (*target == NULL) {
*target = _head;
} else {
(*target)->next = _head;
}
_head = current;
_length -= count;
if (_head == NULL) {
_tail = NULL;
}
}
return count;
}

inline uint32_t get_length() {
return _length;
}

// Only for test
inline bool check_length() {
KeyTable* current = _head;
uint32_t count = 0;
while (current != NULL) {
current = current->next;
count++;
}
return count == _length;
}

private:
KeyTable* _head;
KeyTable* _tail;
uint32_t _length;
};

static KeyTable* borrow_keytable(bthread_keytable_pool_t* pool) {
KeyTable* borrow_keytable(bthread_keytable_pool_t* pool) {
if (pool != NULL && (pool->list || pool->free_keytables)) {
KeyTable* p;
pthread_rwlock_rdlock(&pool->rwlock);
auto list = (butil::ThreadLocal<bthread::KeyTableList>*)pool->list;
if (list && list->get()->keytable) {
p = list->get()->keytable;
list->get()->keytable = p->next;
pthread_rwlock_unlock(&pool->rwlock);
return p;
if (list) {
p = list->get()->remove_front();
if (p) {
pthread_rwlock_unlock(&pool->rwlock);
return p;
}
}
pthread_rwlock_unlock(&pool->rwlock);
if (pool->free_keytables) {
pthread_rwlock_wrlock(&pool->rwlock);
p = (KeyTable*)pool->free_keytables;
if (p) {
pool->free_keytables = p->next;
if (list) {
for (uint32_t i = 0; i < FLAGS_borrow_from_globle_size; ++i) {
if (p) {
pool->free_keytables = p->next;
list->get()->append(p);
p = (KeyTable*)pool->free_keytables;
--pool->size;
} else {
break;
}
}
KeyTable* result = list->get()->remove_front();
pthread_rwlock_unlock(&pool->rwlock);
return p;
return result;
} else {
if (p) {
pool->free_keytables = p->next;
pthread_rwlock_unlock(&pool->rwlock);
return p;
}
}
pthread_rwlock_unlock(&pool->rwlock);
}
Expand All @@ -276,8 +385,17 @@ void return_keytable(bthread_keytable_pool_t* pool, KeyTable* kt) {
return;
}
auto list = (butil::ThreadLocal<bthread::KeyTableList>*)pool->list;
kt->next = list->get()->keytable;
list->get()->keytable = kt;
list->get()->append(kt);
if (list->get()->get_length() > FLAGS_key_table_list_size) {
pthread_rwlock_unlock(&pool->rwlock);
pthread_rwlock_wrlock(&pool->rwlock);
if (!pool->destroyed) {
int out = list->get()->move_first_n_to_target(
(KeyTable**)(&pool->free_keytables),
FLAGS_key_table_list_size / 2);
pool->size += out;
}
}
pthread_rwlock_unlock(&pool->rwlock);
}

Expand Down Expand Up @@ -327,6 +445,7 @@ int bthread_keytable_pool_init(bthread_keytable_pool_t* pool) {
pthread_rwlock_init(&pool->rwlock, NULL);
pool->list = new butil::ThreadLocal<bthread::KeyTableList>();
pool->free_keytables = NULL;
pool->size = 0;
pool->destroyed = 0;
return 0;
}
Expand All @@ -339,14 +458,16 @@ int bthread_keytable_pool_destroy(bthread_keytable_pool_t* pool) {
bthread::KeyTable* saved_free_keytables = NULL;
pthread_rwlock_wrlock(&pool->rwlock);
pool->destroyed = 1;
pool->size = 0;
delete (butil::ThreadLocal<bthread::KeyTableList>*)pool->list;
saved_free_keytables = (bthread::KeyTable*)pool->free_keytables;
pool->list = NULL;
pool->free_keytables = NULL;
pthread_rwlock_unlock(&pool->rwlock);

// Cheat get/setspecific and destroy the keytables.
bthread::TaskGroup* g = bthread::tls_task_group;
bthread::TaskGroup* g =
bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
bthread::KeyTable* old_kt = bthread::tls_bls.keytable;
while (saved_free_keytables) {
bthread::KeyTable* kt = saved_free_keytables;
Expand All @@ -356,7 +477,7 @@ int bthread_keytable_pool_destroy(bthread_keytable_pool_t* pool) {
g->current_task()->local_storage.keytable = kt;
}
delete kt;
g = bthread::tls_task_group;
g = bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
}
bthread::tls_bls.keytable = old_kt;
if (g) {
Expand All @@ -374,15 +495,34 @@ int bthread_keytable_pool_getstat(bthread_keytable_pool_t* pool,
LOG(ERROR) << "Param[pool] or Param[stat] is NULL";
return EINVAL;
}
pthread_rwlock_rdlock(&pool->rwlock);
size_t count = 0;
bthread::KeyTable* p = (bthread::KeyTable*)pool->free_keytables;
for (; p; p = p->next, ++count) {}
stat->nfree = count;
pthread_rwlock_wrlock(&pool->rwlock);
stat->nfree = pool->size;
pthread_rwlock_unlock(&pool->rwlock);
return 0;
}

int get_thread_local_keytable_list_length(bthread_keytable_pool_t* pool) {
if (pool == NULL) {
LOG(ERROR) << "Param[pool] is NULL";
return EINVAL;
}
int length = 0;
pthread_rwlock_rdlock(&pool->rwlock);
if (pool->destroyed) {
pthread_rwlock_unlock(&pool->rwlock);
return length;
}
auto list = (butil::ThreadLocal<bthread::KeyTableList>*)pool->list;
if (list) {
length = (int)(list->get()->get_length());
if (!list->get()->check_length()) {
LOG(ERROR) << "Length is not equal";
}
}
pthread_rwlock_unlock(&pool->rwlock);
return length;
}

// TODO: this is not strict `reserve' because we only check #free.
// Currently there's no way to track KeyTables that may be returned
// to the pool in future.
Expand Down Expand Up @@ -418,6 +558,7 @@ void bthread_keytable_pool_reserve(bthread_keytable_pool_t* pool,
}
kt->next = (bthread::KeyTable*)pool->free_keytables;
pool->free_keytables = kt;
++pool->size;
pthread_rwlock_unlock(&pool->rwlock);
if (data == NULL) {
break;
Expand Down Expand Up @@ -467,10 +608,10 @@ int bthread_key_delete(bthread_key_t key) {
++bthread::s_key_info[key.index].version;
}
bthread::s_key_info[key.index].dtor = NULL;
bthread::s_key_info[key.index].dtor_args = NULL;
bthread::s_key_info[key.index].dtor_args = NULL;
bthread::s_free_keys[bthread::nfreekey++] = key.index;
return 0;
}
}
}
CHECK(false) << "bthread_key_delete is called on invalid " << key;
return EINVAL;
Expand All @@ -489,7 +630,7 @@ int bthread_setspecific(bthread_key_t key, void* data) {
return ENOMEM;
}
bthread::tls_bls.keytable = kt;
bthread::TaskGroup* const g = bthread::tls_task_group;
bthread::TaskGroup* const g = bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
if (g) {
g->current_task()->local_storage.keytable = kt;
} else {
Expand All @@ -510,7 +651,7 @@ void* bthread_getspecific(bthread_key_t key) {
if (kt) {
return kt->get_data(key);
}
bthread::TaskGroup* const g = bthread::tls_task_group;
bthread::TaskGroup* const g = bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
if (g) {
bthread::TaskMeta* const task = g->current_task();
kt = bthread::borrow_keytable(task->attr.keytable_pool);
Expand Down
1 change: 1 addition & 0 deletions src/bthread/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ typedef struct {
pthread_rwlock_t rwlock;
void* list;
void* free_keytables;
size_t size;
int destroyed;
} bthread_keytable_pool_t;

Expand Down
4 changes: 4 additions & 0 deletions src/bthread/unstable.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ extern int bthread_keytable_pool_destroy(bthread_keytable_pool_t*);
extern int bthread_keytable_pool_getstat(bthread_keytable_pool_t* pool,
bthread_keytable_pool_stat_t* stat);

// [RPC INTERNAL]
// Return thread local keytable list length if exist.
extern int get_thread_local_keytable_list_length(bthread_keytable_pool_t* pool);

// [RPC INTERNAL]
// Reserve at most `nfree' keytables with `key' pointing to data created by
// ctor(args).
Expand Down
Loading