diff --git a/src/brpc/rdma/block_pool.cpp b/src/brpc/rdma/block_pool.cpp index 826fc5d181..24907a194a 100644 --- a/src/brpc/rdma/block_pool.cpp +++ b/src/brpc/rdma/block_pool.cpp @@ -25,10 +25,9 @@ #include "butil/iobuf.h" #include "butil/object_pool.h" #include "butil/thread_local.h" -#include "bthread/bthread.h" +#include "butil/memory/scope_guard.h" #include "brpc/rdma/block_pool.h" - namespace brpc { namespace rdma { @@ -98,6 +97,8 @@ struct GlobalInfo { std::vector idle_size[BLOCK_SIZE_COUNT]; int region_num[BLOCK_SIZE_COUNT]; butil::Mutex extend_lock; + std::vector expansion_list[BLOCK_SIZE_COUNT]; + std::vector expansion_size[BLOCK_SIZE_COUNT]; }; static GlobalInfo* g_info = NULL; @@ -129,36 +130,20 @@ uint32_t GetRegionId(const void* buf) { return r->id; } -// When both rdma_memory_pool_max_regions and rdma_memory_pool_buckets are -// greater than 1, dynamic memory expansion may cause concurrent modification -// issues in the memory linked list due to lock contention problems. To address -// this, we increase the region_num count for each block_type. Dynamic memory -// expansion is only permitted when both of the following conditions are met: -// rdma_memory_pool_buckets equals 1 -// g_info->region_num[block_type] is less than 1 -static bool CanExtendBlockRuntime(int block_type) { - return FLAGS_rdma_memory_pool_buckets == 1 || - g_info->region_num[block_type] < 1; -} - -static void* ExtendBlockPoolImpl(void* region_base, size_t region_size, - int block_type) { - if (CanExtendBlockRuntime(block_type) == false) { - LOG(INFO) << "Runtime extend memory only support one bucket or region " - "num is zero for per block_type"; +static void* ExtendBlockPoolImpl(void* region_base, size_t region_size, int block_type) { + auto region_base_guard = butil::MakeScopeGuard([region_base]() { free(region_base); - errno = ENOMEM; - return NULL; - } + }); + if (g_region_num == FLAGS_rdma_memory_pool_max_regions) { - LOG(INFO) << "Memory pool reaches max regions"; - free(region_base); + LOG_EVERY_SECOND(ERROR) << "Memory pool reaches max regions"; errno = ENOMEM; return NULL; } + uint32_t id = g_cb(region_base, region_size); if (id == 0) { - free(region_base); + errno = EINVAL; return NULL; } @@ -170,7 +155,7 @@ static void* ExtendBlockPoolImpl(void* region_base, size_t region_size, for (size_t j = 0; j < i; ++j) { butil::return_object(node[j]); } - free(region_base); + errno = ENOMEM; return NULL; } } @@ -184,12 +169,15 @@ static void* ExtendBlockPoolImpl(void* region_base, size_t region_size, for (size_t i = 0; i < g_buckets; ++i) { node[i]->start = (void*)(region->start + i * (region_size / g_buckets)); node[i]->len = region_size / g_buckets; - node[i]->next = g_info->idle_list[block_type][i]; - g_info->idle_list[block_type][i] = node[i]; - g_info->idle_size[block_type][i] += node[i]->len; + node[i]->next = g_info->expansion_list[block_type][i]; + g_info->expansion_list[block_type][i] = node[i]; + g_info->expansion_size[block_type][i] += node[i]->len; } g_info->region_num[block_type]++; + // `region_base' is inuse, cannot be freed. + region_base_guard.dismiss(); + return region_base; } @@ -203,7 +191,7 @@ static void* ExtendBlockPool(size_t region_size, int block_type) { if (FLAGS_rdma_memory_pool_user_specified_memory) { LOG_EVERY_SECOND(ERROR) << "Fail to extend new region, " "rdma_memory_pool_user_specified_memory is " - "true, ExtendBlockPool is disabled"; + "true, ExtendBlockPool is disabled"; return NULL; } @@ -222,24 +210,27 @@ static void* ExtendBlockPool(size_t region_size, int block_type) { return ExtendBlockPoolImpl(region_base, region_size, block_type); } -void* ExtendBlockPoolByUser(void* region_base, size_t region_size, - int block_type) { - if (FLAGS_rdma_memory_pool_user_specified_memory == false) { +void* ExtendBlockPoolByUser(void* region_base, size_t region_size, int block_type) { + auto region_base_guard = butil::MakeScopeGuard([region_base]() { + free(region_base); + }); + + if (!FLAGS_rdma_memory_pool_user_specified_memory) { LOG_EVERY_SECOND(ERROR) << "User extend memory is disabled"; return NULL; } if (reinterpret_cast(region_base) % 4096 != 0) { LOG_EVERY_SECOND(ERROR) << "region_base must be 4096 aligned"; + errno = EINVAL; return NULL; } - uint64_t index = butil::fast_rand() % g_buckets; - BAIDU_SCOPED_LOCK(*g_info->lock[block_type][index]); - BAIDU_SCOPED_LOCK(g_info->extend_lock); region_size = region_size * BYTES_IN_MB / g_block_size[block_type] / g_buckets; region_size *= g_block_size[block_type] * g_buckets; + region_base_guard.dismiss(); + BAIDU_SCOPED_LOCK(g_info->extend_lock); return ExtendBlockPoolImpl(region_base, region_size, block_type); } @@ -316,6 +307,14 @@ bool InitBlockPool(RegisterCallback cb) { return false; } } + g_info->expansion_list[i].resize(g_buckets, NULL); + if (g_info->expansion_list[i].size() != g_buckets) { + return false; + } + g_info->expansion_size[i].resize(g_buckets, 0); + if (g_info->expansion_size[i].size() != g_buckets) { + return false; + } } g_dump_mutex = new butil::Mutex; @@ -332,66 +331,74 @@ bool InitBlockPool(RegisterCallback cb) { return false; } +static void MoveExpansionList2EmptyIdleList(int block_type, size_t index) { + CHECK(NULL == g_info->idle_list[block_type][index]); + + g_info->idle_list[block_type][index] = g_info->expansion_list[block_type][index]; + g_info->idle_size[block_type][index] += g_info->expansion_size[block_type][index]; + g_info->expansion_list[block_type][index] = NULL; + g_info->expansion_size[block_type][index] = 0; +} + static void* AllocBlockFrom(int block_type) { bool locked = false; if (BAIDU_UNLIKELY(g_dump_enable)) { g_dump_mutex->lock(); locked = true; } + BUTIL_SCOPE_EXIT { + if (locked) { + g_dump_mutex->unlock(); + } + }; + void* ptr = NULL; - if (block_type == 0 && tls_idle_list != NULL){ + if (0 == block_type && NULL != tls_idle_list) { CHECK(tls_idle_num > 0); IdleNode* n = tls_idle_list; tls_idle_list = n->next; ptr = n->start; butil::return_object(n); tls_idle_num--; - if (locked) { - g_dump_mutex->unlock(); - } return ptr; } - uint64_t index = butil::fast_rand() % g_buckets; + size_t index = butil::fast_rand() % g_buckets; BAIDU_SCOPED_LOCK(*g_info->lock[block_type][index]); IdleNode* node = g_info->idle_list[block_type][index]; - if (!node) { + if (NULL == node) { BAIDU_SCOPED_LOCK(g_info->extend_lock); node = g_info->idle_list[block_type][index]; - if (!node) { - // There is no block left, extend a new region - if (!ExtendBlockPool(FLAGS_rdma_memory_pool_increase_size_mb, - block_type)) { + if (NULL == node && NULL != g_info->expansion_list[block_type][index]) { + MoveExpansionList2EmptyIdleList(block_type, index); + node = g_info->idle_list[block_type][index]; + } + if (NULL == node) { + // There is no block left, extend a new region. + if (!ExtendBlockPool(FLAGS_rdma_memory_pool_increase_size_mb, block_type)) { LOG_EVERY_SECOND(ERROR) << "Fail to extend new region. " << "You can set the size of memory pool larger. " << "Refer to the help message of these flags: " << "rdma_memory_pool_initial_size_mb, " << "rdma_memory_pool_increase_size_mb, " << "rdma_memory_pool_max_regions."; - if (locked) { - g_dump_mutex->unlock(); - } return NULL; } + MoveExpansionList2EmptyIdleList(block_type, index); node = g_info->idle_list[block_type][index]; } } - if (node) { - ptr = node->start; - if (node->len > g_block_size[block_type]) { - node->start = (char*)node->start + g_block_size[block_type]; - node->len -= g_block_size[block_type]; - } else { - g_info->idle_list[block_type][index] = node->next; - butil::return_object(node); - } - g_info->idle_size[block_type][index] -= g_block_size[block_type]; + CHECK(NULL != node); + + ptr = node->start; + if (node->len > g_block_size[block_type]) { + node->start = (char*)node->start + g_block_size[block_type]; + node->len -= g_block_size[block_type]; } else { - if (locked) { - g_dump_mutex->unlock(); - } - return NULL; + g_info->idle_list[block_type][index] = node->next; + butil::return_object(node); } + g_info->idle_size[block_type][index] -= g_block_size[block_type]; // Move more blocks from global list to tls list if (block_type == 0) { @@ -417,9 +424,6 @@ static void* AllocBlockFrom(int block_type) { } } - if (locked) { - g_dump_mutex->unlock(); - } return ptr; } @@ -482,6 +486,12 @@ int DeallocBlock(void* buf) { g_dump_mutex->lock(); locked = true; } + BUTIL_SCOPE_EXIT { + if (locked) { + g_dump_mutex->unlock(); + } + }; + if (block_type == 0 && tls_idle_num < (uint32_t)FLAGS_rdma_memory_pool_tls_cache_num) { if (!tls_inited) { tls_inited = true; @@ -494,9 +504,6 @@ int DeallocBlock(void* buf) { tls_idle_num++; node->next = tls_idle_list; tls_idle_list = node; - if (locked) { - g_dump_mutex->unlock(); - } return 0; } @@ -527,9 +534,6 @@ int DeallocBlock(void* buf) { g_info->idle_list[block_type][index] = node; g_info->idle_size[block_type][index] += node->len; } - if (locked) { - g_dump_mutex->unlock(); - } return 0; } @@ -557,7 +561,8 @@ void DumpMemoryPoolInfo(std::ostream& os) { for (int i = 0; i < BLOCK_SIZE_COUNT; ++i) { os << "\tFor block size " << GetBlockSize(i) << ":\n"; for (size_t j = 0; j < g_buckets; ++j) { - os << "\t\tBucket " << j << ": " << g_info->idle_size[i][j] << "\n"; + os << "\t\tBucket " << j << ": {" << g_info->idle_size[i][j] + << ", " << g_info->expansion_size[i][j] << "}\n"; } } os << "Thread Local Cache Info:\n"; diff --git a/src/brpc/rdma/block_pool.h b/src/brpc/rdma/block_pool.h index 00a310824f..f9018e5ecc 100644 --- a/src/brpc/rdma/block_pool.h +++ b/src/brpc/rdma/block_pool.h @@ -80,8 +80,7 @@ bool InitBlockPool(RegisterCallback cb); // FLAGS_rdma_memory_pool_user_specified_memory is true, user is responsibility // of extending memory blocks , this ensuring flexibility for advanced use // cases. -void* ExtendBlockPoolByUser(void* region_base, size_t region_size, - int block_type); +void* ExtendBlockPoolByUser(void* region_base, size_t region_size, int block_type); // Allocate a buf with length at least @a size (require: size>0) // Return the address allocated, NULL if failed and errno is set. diff --git a/src/butil/memory/scope_guard.h b/src/butil/memory/scope_guard.h index 7d72a560d2..377819b5db 100644 --- a/src/butil/memory/scope_guard.h +++ b/src/butil/memory/scope_guard.h @@ -104,4 +104,6 @@ operator+(ScopeExitHelper, Callback&& callback) { auto BRPC_ANONYMOUS_VARIABLE(SCOPE_EXIT) = \ ::butil::internal::ScopeExitHelper() + [&]() noexcept +#define BUTIL_SCOPE_EXIT BRPC_SCOPE_EXIT + #endif // BUTIL_SCOPED_GUARD_H