From 6d2d424e2271693593a3dfb07f9bea57bd00f4b2 Mon Sep 17 00:00:00 2001 From: Yang Liming Date: Wed, 27 Aug 2025 14:53:29 +0800 Subject: [PATCH 1/2] rdma rumtime extend block restriction --- src/brpc/rdma/block_pool.cpp | 38 +++++++++++++++++++++++------------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/src/brpc/rdma/block_pool.cpp b/src/brpc/rdma/block_pool.cpp index 543cdb6dda..826fc5d181 100644 --- a/src/brpc/rdma/block_pool.cpp +++ b/src/brpc/rdma/block_pool.cpp @@ -36,7 +36,7 @@ DEFINE_int32(rdma_memory_pool_initial_size_mb, 1024, "Initial size of memory pool for RDMA (MB)"); DEFINE_int32(rdma_memory_pool_increase_size_mb, 1024, "Increased size of memory pool for RDMA (MB)"); -DEFINE_int32(rdma_memory_pool_max_regions, 1, "Max number of regions"); +DEFINE_int32(rdma_memory_pool_max_regions, 3, "Max number of regions"); DEFINE_int32(rdma_memory_pool_buckets, 4, "Number of buckets to reduce race"); DEFINE_int32(rdma_memory_pool_tls_cache_num, 128, "Number of cached block in tls"); DEFINE_bool(rdma_memory_pool_user_specified_memory, false, @@ -96,6 +96,7 @@ struct GlobalInfo { std::vector idle_list[BLOCK_SIZE_COUNT]; std::vector lock[BLOCK_SIZE_COUNT]; std::vector idle_size[BLOCK_SIZE_COUNT]; + int region_num[BLOCK_SIZE_COUNT]; butil::Mutex extend_lock; }; static GlobalInfo* g_info = NULL; @@ -128,10 +129,30 @@ uint32_t GetRegionId(const void* buf) { return r->id; } +// When both rdma_memory_pool_max_regions and rdma_memory_pool_buckets are +// greater than 1, dynamic memory expansion may cause concurrent modification +// issues in the memory linked list due to lock contention problems. To address +// this, we increase the region_num count for each block_type. Dynamic memory +// expansion is only permitted when both of the following conditions are met: +// rdma_memory_pool_buckets equals 1 +// g_info->region_num[block_type] is less than 1 +static bool CanExtendBlockRuntime(int block_type) { + return FLAGS_rdma_memory_pool_buckets == 1 || + g_info->region_num[block_type] < 1; +} + static void* ExtendBlockPoolImpl(void* region_base, size_t region_size, int block_type) { + if (CanExtendBlockRuntime(block_type) == false) { + LOG(INFO) << "Runtime extend memory only support one bucket or region " + "num is zero for per block_type"; + free(region_base); + errno = ENOMEM; + return NULL; + } if (g_region_num == FLAGS_rdma_memory_pool_max_regions) { LOG(INFO) << "Memory pool reaches max regions"; + free(region_base); errno = ENOMEM; return NULL; } @@ -167,6 +188,7 @@ static void* ExtendBlockPoolImpl(void* region_base, size_t region_size, g_info->idle_list[block_type][i] = node[i]; g_info->idle_size[block_type][i] += node[i]->len; } + g_info->region_num[block_type]++; return region_base; } @@ -214,12 +236,6 @@ void* ExtendBlockPoolByUser(void* region_base, size_t region_size, uint64_t index = butil::fast_rand() % g_buckets; BAIDU_SCOPED_LOCK(*g_info->lock[block_type][index]); BAIDU_SCOPED_LOCK(g_info->extend_lock); - - if (g_region_num > 1 && FLAGS_rdma_memory_pool_buckets > 1) { - LOG_EVERY_SECOND(ERROR) - << "Runtime extend memory only support single bucket"; - return NULL; - } region_size = region_size * BYTES_IN_MB / g_block_size[block_type] / g_buckets; region_size *= g_block_size[block_type] * g_buckets; @@ -274,14 +290,7 @@ bool InitBlockPool(RegisterCallback cb) { errno = EINVAL; return false; } - // runtime extend block pool only support 1 bucket - if (FLAGS_rdma_memory_pool_max_regions > 1 && - FLAGS_rdma_memory_pool_buckets > 1) { - LOG(WARNING) << "rdma runtime extend block pool only support 1 bucket"; - return false; - } g_buckets = FLAGS_rdma_memory_pool_buckets; - g_info = new (std::nothrow) GlobalInfo; if (!g_info) { return false; @@ -300,6 +309,7 @@ bool InitBlockPool(RegisterCallback cb) { if (g_info->idle_size[i].size() != g_buckets) { return false; } + g_info->region_num[i] = 0; for (size_t j = 0; j < g_buckets; ++j) { g_info->lock[i][j] = new (std::nothrow) butil::Mutex; if (!g_info->lock[i][j]) { From 4533167615370a4277486e103ffbdba9b7fc27c8 Mon Sep 17 00:00:00 2001 From: Yang Liming Date: Fri, 5 Sep 2025 09:20:23 +0800 Subject: [PATCH 2/2] rdma polling mode process message in new bthread --- src/brpc/input_messenger.cpp | 7 ++++++- src/brpc/rdma/rdma_endpoint.h | 1 + 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/src/brpc/input_messenger.cpp b/src/brpc/input_messenger.cpp index b1e8d1e008..1feb287d65 100644 --- a/src/brpc/input_messenger.cpp +++ b/src/brpc/input_messenger.cpp @@ -331,7 +331,12 @@ int InputMessenger::ProcessNewMessage( "destroyed when authentication failed"; } } - if (!m->is_read_progressive()) { +#if BRPC_WITH_RDMA + if (!m->is_read_progressive() && !rdma::FLAGS_rdma_use_polling) +#else + if (!m->is_read_progressive()) +#endif + { // Transfer ownership to last_msg last_msg.reset(msg.release()); } else { diff --git a/src/brpc/rdma/rdma_endpoint.h b/src/brpc/rdma/rdma_endpoint.h index 75730fddf2..de7cd5f6d8 100644 --- a/src/brpc/rdma/rdma_endpoint.h +++ b/src/brpc/rdma/rdma_endpoint.h @@ -36,6 +36,7 @@ namespace brpc { class Socket; namespace rdma { +DECLARE_bool(rdma_use_polling); DECLARE_int32(rdma_poller_num); DECLARE_bool(rdma_edisp_unsched); DECLARE_bool(rdma_disable_bthread);