Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 78 additions & 73 deletions src/brpc/rdma/block_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@
#include "butil/iobuf.h"
#include "butil/object_pool.h"
#include "butil/thread_local.h"
#include "bthread/bthread.h"
#include "butil/memory/scope_guard.h"
#include "brpc/rdma/block_pool.h"


namespace brpc {
namespace rdma {

Expand Down Expand Up @@ -98,6 +97,8 @@ struct GlobalInfo {
std::vector<size_t> idle_size[BLOCK_SIZE_COUNT];
int region_num[BLOCK_SIZE_COUNT];
butil::Mutex extend_lock;
std::vector<IdleNode*> expansion_list[BLOCK_SIZE_COUNT];
std::vector<size_t> expansion_size[BLOCK_SIZE_COUNT];
};
static GlobalInfo* g_info = NULL;

Expand Down Expand Up @@ -129,36 +130,20 @@ uint32_t GetRegionId(const void* buf) {
return r->id;
}

// When both rdma_memory_pool_max_regions and rdma_memory_pool_buckets are
// greater than 1, dynamic memory expansion may cause concurrent modification
// issues in the memory linked list due to lock contention problems. To address
// this, we increase the region_num count for each block_type. Dynamic memory
// expansion is only permitted when both of the following conditions are met:
// rdma_memory_pool_buckets equals 1
// g_info->region_num[block_type] is less than 1
static bool CanExtendBlockRuntime(int block_type) {
return FLAGS_rdma_memory_pool_buckets == 1 ||
g_info->region_num[block_type] < 1;
}

static void* ExtendBlockPoolImpl(void* region_base, size_t region_size,
int block_type) {
if (CanExtendBlockRuntime(block_type) == false) {
LOG(INFO) << "Runtime extend memory only support one bucket or region "
"num is zero for per block_type";
static void* ExtendBlockPoolImpl(void* region_base, size_t region_size, int block_type) {
auto region_base_guard = butil::MakeScopeGuard([region_base]() {
free(region_base);
errno = ENOMEM;
return NULL;
}
});
Comment thread
chenBright marked this conversation as resolved.

if (g_region_num == FLAGS_rdma_memory_pool_max_regions) {
LOG(INFO) << "Memory pool reaches max regions";
free(region_base);
LOG_EVERY_SECOND(ERROR) << "Memory pool reaches max regions";
errno = ENOMEM;
return NULL;
}

uint32_t id = g_cb(region_base, region_size);
if (id == 0) {
free(region_base);
errno = EINVAL;
return NULL;
}

Expand All @@ -170,7 +155,7 @@ static void* ExtendBlockPoolImpl(void* region_base, size_t region_size,
for (size_t j = 0; j < i; ++j) {
butil::return_object<IdleNode>(node[j]);
}
free(region_base);
errno = ENOMEM;
return NULL;
}
}
Expand All @@ -184,12 +169,15 @@ static void* ExtendBlockPoolImpl(void* region_base, size_t region_size,
for (size_t i = 0; i < g_buckets; ++i) {
node[i]->start = (void*)(region->start + i * (region_size / g_buckets));
node[i]->len = region_size / g_buckets;
node[i]->next = g_info->idle_list[block_type][i];
g_info->idle_list[block_type][i] = node[i];
g_info->idle_size[block_type][i] += node[i]->len;
node[i]->next = g_info->expansion_list[block_type][i];
g_info->expansion_list[block_type][i] = node[i];
g_info->expansion_size[block_type][i] += node[i]->len;
}
g_info->region_num[block_type]++;

// `region_base' is inuse, cannot be freed.
region_base_guard.dismiss();

return region_base;
}

Expand All @@ -203,7 +191,7 @@ static void* ExtendBlockPool(size_t region_size, int block_type) {
if (FLAGS_rdma_memory_pool_user_specified_memory) {
LOG_EVERY_SECOND(ERROR) << "Fail to extend new region, "
"rdma_memory_pool_user_specified_memory is "
"true, ExtendBlockPool is disabled";
"true, ExtendBlockPool is disabled";
return NULL;
}

Expand All @@ -222,24 +210,27 @@ static void* ExtendBlockPool(size_t region_size, int block_type) {
return ExtendBlockPoolImpl(region_base, region_size, block_type);
Comment thread
chenBright marked this conversation as resolved.
}

void* ExtendBlockPoolByUser(void* region_base, size_t region_size,
int block_type) {
if (FLAGS_rdma_memory_pool_user_specified_memory == false) {
void* ExtendBlockPoolByUser(void* region_base, size_t region_size, int block_type) {
Comment thread
chenBright marked this conversation as resolved.
Comment thread
chenBright marked this conversation as resolved.
auto region_base_guard = butil::MakeScopeGuard([region_base]() {
free(region_base);
});
Comment thread
chenBright marked this conversation as resolved.

if (!FLAGS_rdma_memory_pool_user_specified_memory) {
LOG_EVERY_SECOND(ERROR) << "User extend memory is disabled";
return NULL;
}
if (reinterpret_cast<uintptr_t>(region_base) % 4096 != 0) {
LOG_EVERY_SECOND(ERROR) << "region_base must be 4096 aligned";
errno = EINVAL;
return NULL;
}

uint64_t index = butil::fast_rand() % g_buckets;
BAIDU_SCOPED_LOCK(*g_info->lock[block_type][index]);
BAIDU_SCOPED_LOCK(g_info->extend_lock);
region_size =
region_size * BYTES_IN_MB / g_block_size[block_type] / g_buckets;
region_size *= g_block_size[block_type] * g_buckets;
Comment thread
chenBright marked this conversation as resolved.

region_base_guard.dismiss();
Comment thread
chenBright marked this conversation as resolved.
BAIDU_SCOPED_LOCK(g_info->extend_lock);
return ExtendBlockPoolImpl(region_base, region_size, block_type);
}
Comment thread
chenBright marked this conversation as resolved.

Expand Down Expand Up @@ -316,6 +307,14 @@ bool InitBlockPool(RegisterCallback cb) {
return false;
}
}
g_info->expansion_list[i].resize(g_buckets, NULL);
if (g_info->expansion_list[i].size() != g_buckets) {
return false;
}
g_info->expansion_size[i].resize(g_buckets, 0);
if (g_info->expansion_size[i].size() != g_buckets) {
return false;
}
}

g_dump_mutex = new butil::Mutex;
Expand All @@ -332,66 +331,74 @@ bool InitBlockPool(RegisterCallback cb) {
return false;
}

static void MoveExpansionList2EmptyIdleList(int block_type, size_t index) {
CHECK(NULL == g_info->idle_list[block_type][index]);
Comment thread
chenBright marked this conversation as resolved.

g_info->idle_list[block_type][index] = g_info->expansion_list[block_type][index];
g_info->idle_size[block_type][index] += g_info->expansion_size[block_type][index];
g_info->expansion_list[block_type][index] = NULL;
g_info->expansion_size[block_type][index] = 0;
}

static void* AllocBlockFrom(int block_type) {
bool locked = false;
if (BAIDU_UNLIKELY(g_dump_enable)) {
g_dump_mutex->lock();
locked = true;
}
BUTIL_SCOPE_EXIT {
if (locked) {
g_dump_mutex->unlock();
}
};

void* ptr = NULL;
if (block_type == 0 && tls_idle_list != NULL){
if (0 == block_type && NULL != tls_idle_list) {
CHECK(tls_idle_num > 0);
IdleNode* n = tls_idle_list;
tls_idle_list = n->next;
ptr = n->start;
butil::return_object<IdleNode>(n);
tls_idle_num--;
if (locked) {
g_dump_mutex->unlock();
}
return ptr;
}

uint64_t index = butil::fast_rand() % g_buckets;
size_t index = butil::fast_rand() % g_buckets;
BAIDU_SCOPED_LOCK(*g_info->lock[block_type][index]);
IdleNode* node = g_info->idle_list[block_type][index];
if (!node) {
if (NULL == node) {
BAIDU_SCOPED_LOCK(g_info->extend_lock);
node = g_info->idle_list[block_type][index];
if (!node) {
// There is no block left, extend a new region
if (!ExtendBlockPool(FLAGS_rdma_memory_pool_increase_size_mb,
block_type)) {
if (NULL == node && NULL != g_info->expansion_list[block_type][index]) {
MoveExpansionList2EmptyIdleList(block_type, index);
node = g_info->idle_list[block_type][index];
}
if (NULL == node) {
// There is no block left, extend a new region.
if (!ExtendBlockPool(FLAGS_rdma_memory_pool_increase_size_mb, block_type)) {
LOG_EVERY_SECOND(ERROR) << "Fail to extend new region. "
<< "You can set the size of memory pool larger. "
<< "Refer to the help message of these flags: "
<< "rdma_memory_pool_initial_size_mb, "
<< "rdma_memory_pool_increase_size_mb, "
<< "rdma_memory_pool_max_regions.";
if (locked) {
g_dump_mutex->unlock();
}
return NULL;
}
MoveExpansionList2EmptyIdleList(block_type, index);
Comment thread
chenBright marked this conversation as resolved.
node = g_info->idle_list[block_type][index];
}
}
if (node) {
ptr = node->start;
if (node->len > g_block_size[block_type]) {
node->start = (char*)node->start + g_block_size[block_type];
node->len -= g_block_size[block_type];
} else {
g_info->idle_list[block_type][index] = node->next;
butil::return_object<IdleNode>(node);
}
g_info->idle_size[block_type][index] -= g_block_size[block_type];
CHECK(NULL != node);

ptr = node->start;
if (node->len > g_block_size[block_type]) {
node->start = (char*)node->start + g_block_size[block_type];
node->len -= g_block_size[block_type];
} else {
if (locked) {
g_dump_mutex->unlock();
}
return NULL;
g_info->idle_list[block_type][index] = node->next;
butil::return_object<IdleNode>(node);
}
g_info->idle_size[block_type][index] -= g_block_size[block_type];

// Move more blocks from global list to tls list
if (block_type == 0) {
Expand All @@ -417,9 +424,6 @@ static void* AllocBlockFrom(int block_type) {
}
}

if (locked) {
g_dump_mutex->unlock();
}
return ptr;
}

Expand Down Expand Up @@ -482,6 +486,12 @@ int DeallocBlock(void* buf) {
g_dump_mutex->lock();
locked = true;
}
BUTIL_SCOPE_EXIT {
if (locked) {
g_dump_mutex->unlock();
}
};

if (block_type == 0 && tls_idle_num < (uint32_t)FLAGS_rdma_memory_pool_tls_cache_num) {
if (!tls_inited) {
tls_inited = true;
Expand All @@ -494,9 +504,6 @@ int DeallocBlock(void* buf) {
tls_idle_num++;
node->next = tls_idle_list;
tls_idle_list = node;
if (locked) {
g_dump_mutex->unlock();
}
return 0;
}

Expand Down Expand Up @@ -527,9 +534,6 @@ int DeallocBlock(void* buf) {
g_info->idle_list[block_type][index] = node;
g_info->idle_size[block_type][index] += node->len;
}
if (locked) {
g_dump_mutex->unlock();
}
return 0;
}

Expand Down Expand Up @@ -557,7 +561,8 @@ void DumpMemoryPoolInfo(std::ostream& os) {
for (int i = 0; i < BLOCK_SIZE_COUNT; ++i) {
os << "\tFor block size " << GetBlockSize(i) << ":\n";
for (size_t j = 0; j < g_buckets; ++j) {
os << "\t\tBucket " << j << ": " << g_info->idle_size[i][j] << "\n";
os << "\t\tBucket " << j << ": {" << g_info->idle_size[i][j]
<< ", " << g_info->expansion_size[i][j] << "}\n";
Comment thread
chenBright marked this conversation as resolved.
}
}
os << "Thread Local Cache Info:\n";
Expand Down
3 changes: 1 addition & 2 deletions src/brpc/rdma/block_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ bool InitBlockPool(RegisterCallback cb);
// FLAGS_rdma_memory_pool_user_specified_memory is true, user is responsibility
// of extending memory blocks , this ensuring flexibility for advanced use
// cases.
void* ExtendBlockPoolByUser(void* region_base, size_t region_size,
int block_type);
void* ExtendBlockPoolByUser(void* region_base, size_t region_size, int block_type);

// Allocate a buf with length at least @a size (require: size>0)
// Return the address allocated, NULL if failed and errno is set.
Expand Down
2 changes: 2 additions & 0 deletions src/butil/memory/scope_guard.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,6 @@ operator+(ScopeExitHelper, Callback&& callback) {
auto BRPC_ANONYMOUS_VARIABLE(SCOPE_EXIT) = \
::butil::internal::ScopeExitHelper() + [&]() noexcept

#define BUTIL_SCOPE_EXIT BRPC_SCOPE_EXIT

#endif // BUTIL_SCOPED_GUARD_H
Loading