Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 115 additions & 27 deletions data_store_service_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1371,15 +1371,116 @@ RangeSliceBatchPlan DataStoreServiceClient::PrepareRangeSliceBatches(
return plan;
}

/**
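* @brief Dispatches range slice segments from multiple plans, packing them into
* write batches. A batch is flushed before appending a segment that would bring
* its estimated size (key + record bytes plus a fixed per-segment overhead) to
* MAX_WRITE_BATCH_SIZE or beyond.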
*
* All plans must share the same kv_table_name, kv_partition_id, and version.
* Segments from all plans are merged into batches and dispatched via
* BatchWriteRecords calls, reducing the number of RPC calls.
*
* @param kv_table_name The KV table name (must be same for all plans)
* @param kv_partition_id The partition ID (must be same for all plans)
* @param version The version (must be same for all plans)
* @param plans Vector of RangeSliceBatchPlan to dispatch
* @param sync_concurrent SyncConcurrentRequest for concurrency control
*/
void DataStoreServiceClient::DispatchRangeSliceBatches(
std::string_view kv_table_name,
int32_t kv_partition_id,
uint64_t version,
const RangeSliceBatchPlan &plan,
const std::vector<RangeSliceBatchPlan> &plans,
SyncConcurrentRequest *sync_concurrent)
{
uint32_t data_shard_id = GetShardIdByPartitionId(kv_partition_id, false);
for (uint32_t i = 0; i < plan.segment_cnt; ++i)

// Initialize batch vectors
std::vector<std::string_view> keys;
std::vector<std::string_view> records;
std::vector<uint64_t> records_ts;
std::vector<uint64_t> records_ttl;
std::vector<WriteOpType> op_types;

// Estimate total segments across all plans
size_t total_segments = 0;
for (const auto &plan : plans)
{
total_segments += plan.segment_cnt;
}

keys.reserve(total_segments);
records.reserve(total_segments);
records_ts.reserve(total_segments);
records_ttl.reserve(total_segments);
op_types.reserve(total_segments);

size_t write_batch_size = 0;
constexpr size_t overhead_per_segment = 20; // records_ts (8) + records_ttl (8) + op_types (4)

// Iterate through all plans and collect segments
for (const auto &plan : plans)
{
for (uint32_t i = 0; i < plan.segment_cnt; ++i)
{
size_t key_size = plan.segment_keys[i].size();
size_t record_size = plan.segment_records[i].size();
size_t segment_total_size = key_size + record_size + overhead_per_segment;

// If adding this segment would reach or exceed MAX_WRITE_BATCH_SIZE and the batch is non-empty, dispatch the current batch first
if (write_batch_size + segment_total_size >= MAX_WRITE_BATCH_SIZE && keys.size() > 0)
{
// Concurrency control: wait if limit reached, then increment counter
{
std::unique_lock<bthread::Mutex> lk(sync_concurrent->mux_);
while (sync_concurrent->unfinished_request_cnt_ >=
SyncConcurrentRequest::max_flying_write_count)
{
sync_concurrent->cv_.wait(lk);
}
sync_concurrent->unfinished_request_cnt_++;
}

// Dispatch current batch
BatchWriteRecords(kv_table_name,
kv_partition_id,
data_shard_id,
std::move(keys),
std::move(records),
std::move(records_ts),
std::move(records_ttl),
std::move(op_types),
true,
sync_concurrent,
SyncConcurrentRequestCallback,
1, // parts_cnt_per_key
1); // parts_cnt_per_record

// Clear and re-reserve for next batch
keys.clear();
records.clear();
records_ts.clear();
records_ttl.clear();
op_types.clear();
keys.reserve(total_segments);
records.reserve(total_segments);
records_ts.reserve(total_segments);
records_ttl.reserve(total_segments);
op_types.reserve(total_segments);
write_batch_size = 0;
}

// Append to batch vectors
keys.emplace_back(plan.segment_keys[i]);
records.emplace_back(plan.segment_records[i]);
records_ts.emplace_back(version);
records_ttl.emplace_back(0); // no TTL for range slices
op_types.emplace_back(WriteOpType::PUT);
write_batch_size += segment_total_size;
}
}

// Dispatch final batch if vectors are non-empty
if (keys.size() > 0)
{
// Concurrency control: wait if limit reached, then increment counter
{
Expand All @@ -1391,21 +1492,7 @@ void DataStoreServiceClient::DispatchRangeSliceBatches(
}
sync_concurrent->unfinished_request_cnt_++;
}

// Build vectors for BatchWriteRecords
std::vector<std::string_view> keys;
std::vector<std::string_view> records;
std::vector<uint64_t> records_ts;
std::vector<uint64_t> records_ttl;
std::vector<WriteOpType> op_types;

keys.emplace_back(plan.segment_keys[i]);
records.emplace_back(plan.segment_records[i]);
records_ts.emplace_back(version);
records_ttl.emplace_back(0); // no TTL for range slices
op_types.emplace_back(WriteOpType::PUT);

// Dispatch the batch

BatchWriteRecords(kv_table_name,
kv_partition_id,
data_shard_id,
Expand Down Expand Up @@ -1612,10 +1699,13 @@ bool DataStoreServiceClient::UpdateRangeSlices(
sync_concurrent_request_pool_.NextObject();
PoolableGuard slice_guard(slice_sync_concurrent);
slice_sync_concurrent->Reset();
std::vector<RangeSliceBatchPlan> slice_plans;
slice_plans.emplace_back(std::move(slice_plan));
const uint32_t segment_cnt = slice_plans[0].segment_cnt;
DispatchRangeSliceBatches(kv_range_slices_table_name,
KvPartitionIdOf(table_name),
version,
slice_plan,
slice_plans,
slice_sync_concurrent);
// 3- Wait for slice requests to complete. Make sure metadata is updated
// after all slice info is written.
Expand Down Expand Up @@ -1644,7 +1734,7 @@ bool DataStoreServiceClient::UpdateRangeSlices(
partition_id,
range_version,
version,
slice_plan.segment_cnt,
segment_cnt,
meta_acc);

SyncConcurrentRequest *meta_sync_concurrent =
Expand Down Expand Up @@ -1736,14 +1826,12 @@ bool DataStoreServiceClient::UpsertRanges(
slice_sync_concurrent->Reset();

int32_t kv_partition_id = KvPartitionIdOf(table_name);
for (size_t i = 0; i < slice_plans.size(); ++i)
{
DispatchRangeSliceBatches(kv_range_slices_table_name,
kv_partition_id,
version,
slice_plans[i],
slice_sync_concurrent);
}
// Call DispatchRangeSliceBatches once with all plans
DispatchRangeSliceBatches(kv_range_slices_table_name,
kv_partition_id,
version,
slice_plans,
slice_sync_concurrent);

// 3- Wait for slice requests to complete
{
Expand Down
2 changes: 1 addition & 1 deletion data_store_service_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler
void DispatchRangeSliceBatches(std::string_view kv_table_name,
int32_t kv_partition_id,
uint64_t version,
const RangeSliceBatchPlan &plan,
const std::vector<RangeSliceBatchPlan> &plans,
SyncConcurrentRequest *sync_concurrent);

/**
Expand Down