Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 91 additions & 37 deletions src/brpc/rdma/block_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,12 @@ DEFINE_int32(rdma_memory_pool_initial_size_mb, 1024,
"Initial size of memory pool for RDMA (MB)");
DEFINE_int32(rdma_memory_pool_increase_size_mb, 1024,
"Increased size of memory pool for RDMA (MB)");
DEFINE_int32(rdma_memory_pool_max_regions, 4, "Max number of regions");
DEFINE_int32(rdma_memory_pool_max_regions, 1, "Max number of regions");
Comment thread
chenBright marked this conversation as resolved.
DEFINE_int32(rdma_memory_pool_buckets, 4, "Number of buckets to reduce race");
DEFINE_int32(rdma_memory_pool_tls_cache_num, 128, "Number of cached block in tls");
DEFINE_bool(rdma_memory_pool_user_specified_memory, false,
"If true, the user must call UserExtendBlockPool() to extend "
"memory. bRPC will not handle memory extension.");

static RegisterCallback g_cb = NULL;

Expand Down Expand Up @@ -125,31 +128,13 @@ uint32_t GetRegionId(const void* buf) {
return r->id;
}

// Extend the block pool with a new region (with different region ID)
static void* ExtendBlockPool(size_t region_size, int block_type) {
if (region_size < 1) {
errno = EINVAL;
return NULL;
}

static void* ExtendBlockPoolImpl(void* region_base, size_t region_size,
int block_type) {
if (g_region_num == FLAGS_rdma_memory_pool_max_regions) {
LOG(INFO) << "Memory pool reaches max regions";
errno = ENOMEM;
return NULL;
}

// Regularize region size
region_size = region_size * BYTES_IN_MB / g_block_size[block_type] / g_buckets;
region_size *= g_block_size[block_type] * g_buckets;

LOG(INFO) << "Start extend rdma memory " << region_size / BYTES_IN_MB << "MB";

void* region_base = NULL;
if (posix_memalign(&region_base, 4096, region_size) != 0) {
PLOG_EVERY_SECOND(ERROR) << "Memory not enough";
return NULL;
}

uint32_t id = g_cb(region_base, region_size);
if (id == 0) {
free(region_base);
Expand All @@ -168,7 +153,7 @@ static void* ExtendBlockPool(size_t region_size, int block_type) {
return NULL;
}
}

Region* region = &g_regions[g_region_num++];
region->start = (uintptr_t)region_base;
region->size = region_size;
Expand All @@ -178,23 +163,79 @@ static void* ExtendBlockPool(size_t region_size, int block_type) {
for (size_t i = 0; i < g_buckets; ++i) {
node[i]->start = (void*)(region->start + i * (region_size / g_buckets));
node[i]->len = region_size / g_buckets;
node[i]->next = NULL;
node[i]->next = g_info->idle_list[block_type][i];
g_info->idle_list[block_type][i] = node[i];
g_info->idle_size[block_type][i] += node[i]->len;
}

return region_base;
}

void* InitBlockPool(RegisterCallback cb) {
if (!cb) {
// Extend the block pool with a new region (with different region ID)
static void* ExtendBlockPool(size_t region_size, int block_type) {
if (region_size < 1) {
errno = EINVAL;
return NULL;
}

if (FLAGS_rdma_memory_pool_user_specified_memory) {
LOG_EVERY_SECOND(ERROR) << "Fail to extend new region, "
"rdma_memory_pool_user_specified_memory is "
"true, ExtendBlockPool is disabled";
return NULL;
}

// Regularize region size
region_size = region_size * BYTES_IN_MB / g_block_size[block_type] / g_buckets;
region_size *= g_block_size[block_type] * g_buckets;

LOG(INFO) << "Start extend rdma memory " << region_size / BYTES_IN_MB << "MB";

void* region_base = NULL;
if (posix_memalign(&region_base, 4096, region_size) != 0) {
PLOG_EVERY_SECOND(ERROR) << "Memory not enough";
return NULL;
}

return ExtendBlockPoolImpl(region_base, region_size, block_type);
}

void* ExtendBlockPoolByUser(void* region_base, size_t region_size,
int block_type) {
if (FLAGS_rdma_memory_pool_user_specified_memory == false) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

!FLAGS_rdma_memory_pool_user_specified_memory

@yanglimingcn yanglimingcn Jul 21, 2025

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FLAGS_rdma_memory_pool_user_specified_memory == false, means not allowed to ExtentBlockPoolByUser

LOG_EVERY_SECOND(ERROR) << "User extend memory is disabled";
return NULL;
}
if (reinterpret_cast<uintptr_t>(region_base) % 4096 != 0) {
LOG_EVERY_SECOND(ERROR) << "region_base must be 4096 aligned";
return NULL;
}

uint64_t index = butil::fast_rand() % g_buckets;
BAIDU_SCOPED_LOCK(*g_info->lock[block_type][index]);
Comment thread
chenBright marked this conversation as resolved.
BAIDU_SCOPED_LOCK(g_info->extend_lock);

if (g_region_num > 1 && FLAGS_rdma_memory_pool_buckets > 1) {
Comment thread
chenBright marked this conversation as resolved.
LOG_EVERY_SECOND(ERROR)
<< "Runtime extend memory only support single bucket";
return NULL;
}
region_size =
region_size * BYTES_IN_MB / g_block_size[block_type] / g_buckets;
region_size *= g_block_size[block_type] * g_buckets;

return ExtendBlockPoolImpl(region_base, region_size, block_type);
}

bool InitBlockPool(RegisterCallback cb) {
if (!cb) {
errno = EINVAL;
return false;
}
if (g_cb) {
LOG(WARNING) << "Do not initialize block pool repeatedly";
errno = EINVAL;
return NULL;
return false;
}
g_cb = cb;
if (FLAGS_rdma_memory_pool_max_regions < RDMA_MEMORY_POOL_MIN_REGIONS ||
Expand All @@ -204,7 +245,7 @@ void* InitBlockPool(RegisterCallback cb) {
<< RDMA_MEMORY_POOL_MIN_REGIONS << ","
<< RDMA_MEMORY_POOL_MAX_REGIONS << "]!";
errno = EINVAL;
return NULL;
return false;
}
if (FLAGS_rdma_memory_pool_initial_size_mb < RDMA_MEMORY_POOL_MIN_SIZE ||
FLAGS_rdma_memory_pool_initial_size_mb > RDMA_MEMORY_POOL_MAX_SIZE) {
Expand All @@ -213,7 +254,7 @@ void* InitBlockPool(RegisterCallback cb) {
<< RDMA_MEMORY_POOL_MIN_SIZE << ","
<< RDMA_MEMORY_POOL_MAX_SIZE << "]!";
errno = EINVAL;
return NULL;
return false;
}
if (FLAGS_rdma_memory_pool_increase_size_mb < RDMA_MEMORY_POOL_MIN_SIZE ||
FLAGS_rdma_memory_pool_increase_size_mb > RDMA_MEMORY_POOL_MAX_SIZE) {
Expand All @@ -222,7 +263,7 @@ void* InitBlockPool(RegisterCallback cb) {
<< RDMA_MEMORY_POOL_MIN_SIZE << ","
<< RDMA_MEMORY_POOL_MAX_SIZE << "]!";
errno = EINVAL;
return NULL;
return false;
}
if (FLAGS_rdma_memory_pool_buckets < RDMA_MEMORY_POOL_MIN_BUCKETS ||
FLAGS_rdma_memory_pool_buckets > RDMA_MEMORY_POOL_MAX_BUCKETS) {
Expand All @@ -231,41 +272,54 @@ void* InitBlockPool(RegisterCallback cb) {
<< RDMA_MEMORY_POOL_MIN_BUCKETS << ","
<< RDMA_MEMORY_POOL_MAX_BUCKETS << "]!";
errno = EINVAL;
return NULL;
return false;
}
// runtime extend block pool only support 1 bucket
if (FLAGS_rdma_memory_pool_max_regions > 1 &&
FLAGS_rdma_memory_pool_buckets > 1) {
LOG(WARNING) << "rdma runtime extend block pool only support 1 bucket";
return false;
}
g_buckets = FLAGS_rdma_memory_pool_buckets;

g_info = new (std::nothrow) GlobalInfo;
if (!g_info) {
return NULL;
return false;
}

for (int i = 0; i < BLOCK_SIZE_COUNT; ++i) {
g_info->idle_list[i].resize(g_buckets, NULL);
if (g_info->idle_list[i].size() != g_buckets) {
return NULL;
return false;
}
g_info->lock[i].resize(g_buckets, NULL);
if (g_info->lock[i].size() != g_buckets) {
return NULL;
return false;
}
g_info->idle_size[i].resize(g_buckets, 0);
if (g_info->idle_size[i].size() != g_buckets) {
return NULL;
return false;
}
for (size_t j = 0; j < g_buckets; ++j) {
g_info->lock[i][j] = new (std::nothrow) butil::Mutex;
if (!g_info->lock[i][j]) {
return NULL;
return false;
}
}
}

g_dump_mutex = new butil::Mutex;
g_tls_info_mutex = new butil::Mutex;

return ExtendBlockPool(FLAGS_rdma_memory_pool_initial_size_mb,
BLOCK_DEFAULT);
if (FLAGS_rdma_memory_pool_user_specified_memory) {
return true;
}

if (ExtendBlockPool(FLAGS_rdma_memory_pool_initial_size_mb,
BLOCK_DEFAULT) != NULL) {
return true;
}
return false;
}

static void* AllocBlockFrom(int block_type) {
Expand Down
10 changes: 9 additions & 1 deletion src/brpc/rdma/block_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,15 @@ typedef uint32_t (*RegisterCallback)(void*, size_t);
// region. It should be the memory registration in brpc. However,
// in block_pool, we just abstract it into a function to get region id.
// Return the first region's address, NULL if failed and errno is set.
void* InitBlockPool(RegisterCallback cb);
bool InitBlockPool(RegisterCallback cb);

// In scenarios where users need to manually specify memory regions (e.g., using
// hugepages or custom memory pools), when
// FLAGS_rdma_memory_pool_user_specified_memory is true, user is responsibility
// of extending memory blocks , this ensuring flexibility for advanced use
Comment thread
chenBright marked this conversation as resolved.
// cases.
void* ExtendBlockPoolByUser(void* region_base, size_t region_size,
int block_type);

// Allocate a buf with length at least @a size (require: size>0)
// Return the address allocated, NULL if failed and errno is set.
Expand Down
5 changes: 5 additions & 0 deletions src/brpc/rdma/rdma_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,11 @@ static void GlobalRelease() {
}
}

void* UserExtendBlockPool(void* region_base, size_t region_size,
int block_type) {
return ExtendBlockPoolByUser(region_base, region_size, block_type);
}

uint32_t RdmaRegisterMemory(void* buf, size_t size) {
// Register the memory as callback in block_pool
// The thread-safety should be guaranteed by the caller
Expand Down
12 changes: 6 additions & 6 deletions test/brpc_block_pool_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ TEST_F(BlockPoolTest, single_thread) {
FLAGS_rdma_memory_pool_increase_size_mb = 1024;
FLAGS_rdma_memory_pool_max_regions = 16;
FLAGS_rdma_memory_pool_buckets = 4;
EXPECT_TRUE(InitBlockPool(DummyCallback) != NULL);
EXPECT_TRUE(InitBlockPool(DummyCallback));

size_t num = 1024;
void* buf[num];
Expand Down Expand Up @@ -108,7 +108,7 @@ TEST_F(BlockPoolTest, multiple_thread) {
FLAGS_rdma_memory_pool_increase_size_mb = 1024;
FLAGS_rdma_memory_pool_max_regions = 16;
FLAGS_rdma_memory_pool_buckets = 4;
EXPECT_TRUE(InitBlockPool(DummyCallback) != NULL);
EXPECT_TRUE(InitBlockPool(DummyCallback));

uintptr_t thread_num = 32;
bthread_t tid[thread_num];
Expand All @@ -130,7 +130,7 @@ TEST_F(BlockPoolTest, extend) {
FLAGS_rdma_memory_pool_increase_size_mb = 64;
FLAGS_rdma_memory_pool_max_regions = 16;
FLAGS_rdma_memory_pool_buckets = 1;
EXPECT_TRUE(InitBlockPool(DummyCallback) != NULL);
EXPECT_TRUE(InitBlockPool(DummyCallback));

EXPECT_EQ(1, GetRegionNum());
size_t num = 15 * 64 * 1024 * 1024 / GetBlockSize(2);
Expand All @@ -153,7 +153,7 @@ TEST_F(BlockPoolTest, memory_not_enough) {
FLAGS_rdma_memory_pool_increase_size_mb = 64;
FLAGS_rdma_memory_pool_max_regions = 2;
FLAGS_rdma_memory_pool_buckets = 1;
EXPECT_TRUE(InitBlockPool(DummyCallback) != NULL);
EXPECT_TRUE(InitBlockPool(DummyCallback));

EXPECT_EQ(1, GetRegionNum());
size_t num = 64 * 1024 * 1024 / GetBlockSize(2);
Expand All @@ -179,7 +179,7 @@ TEST_F(BlockPoolTest, invalid_use) {
FLAGS_rdma_memory_pool_increase_size_mb = 64;
FLAGS_rdma_memory_pool_max_regions = 2;
FLAGS_rdma_memory_pool_buckets = 1;
EXPECT_TRUE(InitBlockPool(DummyCallback) != NULL);
EXPECT_TRUE(InitBlockPool(DummyCallback));

void* buf = AllocBlock(0);
EXPECT_EQ(NULL, buf);
Expand All @@ -201,7 +201,7 @@ TEST_F(BlockPoolTest, dump_info) {
FLAGS_rdma_memory_pool_increase_size_mb = 64;
FLAGS_rdma_memory_pool_max_regions = 2;
FLAGS_rdma_memory_pool_buckets = 4;
EXPECT_TRUE(InitBlockPool(DummyCallback) != NULL);
EXPECT_TRUE(InitBlockPool(DummyCallback));
DumpMemoryPoolInfo(std::cout);
void* buf = AllocBlock(8192);
DumpMemoryPoolInfo(std::cout);
Expand Down
Loading