From 5f1b7293f50dd08a3f9ac288d45ce2db25819967 Mon Sep 17 00:00:00 2001 From: liunyl Date: Mon, 24 Nov 2025 08:04:12 +0000 Subject: [PATCH] put slices in different ranges into a single batch write --- data_store_service_client.cpp | 142 +++++++++++++++++++++++++++------- data_store_service_client.h | 2 +- 2 files changed, 116 insertions(+), 28 deletions(-) diff --git a/data_store_service_client.cpp b/data_store_service_client.cpp index 8726611..a2bf0bc 100644 --- a/data_store_service_client.cpp +++ b/data_store_service_client.cpp @@ -1371,15 +1371,116 @@ RangeSliceBatchPlan DataStoreServiceClient::PrepareRangeSliceBatches( return plan; } +/** + * @brief Dispatches range slice batches from multiple plans, batching segments + * together up to MAX_WRITE_BATCH_SIZE. + * + * All plans must share the same kv_table_name, kv_partition_id, and version. + * Segments from all plans are merged into batches and dispatched via + * BatchWriteRecords calls, reducing the number of RPC calls. + * + * @param kv_table_name The KV table name (must be same for all plans) + * @param kv_partition_id The partition ID (must be same for all plans) + * @param version The version (must be same for all plans) + * @param plans Vector of RangeSliceBatchPlan to dispatch + * @param sync_concurrent SyncConcurrentRequest for concurrency control + */ void DataStoreServiceClient::DispatchRangeSliceBatches( std::string_view kv_table_name, int32_t kv_partition_id, uint64_t version, - const RangeSliceBatchPlan &plan, + const std::vector &plans, SyncConcurrentRequest *sync_concurrent) { uint32_t data_shard_id = GetShardIdByPartitionId(kv_partition_id, false); - for (uint32_t i = 0; i < plan.segment_cnt; ++i) + + // Initialize batch vectors + std::vector keys; + std::vector records; + std::vector records_ts; + std::vector records_ttl; + std::vector op_types; + + // Estimate total segments across all plans + size_t total_segments = 0; + for (const auto &plan : plans) + { + total_segments += plan.segment_cnt; + } + + keys.reserve(total_segments); + records.reserve(total_segments); + records_ts.reserve(total_segments); + records_ttl.reserve(total_segments); + op_types.reserve(total_segments); + + size_t write_batch_size = 0; + constexpr size_t overhead_per_segment = 20; // records_ts (8) + records_ttl (8) + op_types (4) + + // Iterate through all plans and collect segments + for (const auto &plan : plans) + { + for (uint32_t i = 0; i < plan.segment_cnt; ++i) + { + size_t key_size = plan.segment_keys[i].size(); + size_t record_size = plan.segment_records[i].size(); + size_t segment_total_size = key_size + record_size + overhead_per_segment; + + // If adding this segment would exceed MAX_WRITE_BATCH_SIZE and batch is non-empty, dispatch current batch + if (write_batch_size + segment_total_size >= MAX_WRITE_BATCH_SIZE && keys.size() > 0) + { + // Concurrency control: wait if limit reached, then increment counter + { + std::unique_lock lk(sync_concurrent->mux_); + while (sync_concurrent->unfinished_request_cnt_ >= + SyncConcurrentRequest::max_flying_write_count) + { + sync_concurrent->cv_.wait(lk); + } + sync_concurrent->unfinished_request_cnt_++; + } + + // Dispatch current batch + BatchWriteRecords(kv_table_name, + kv_partition_id, + data_shard_id, + std::move(keys), + std::move(records), + std::move(records_ts), + std::move(records_ttl), + std::move(op_types), + true, + sync_concurrent, + SyncConcurrentRequestCallback, + 1, // parts_cnt_per_key + 1); // parts_cnt_per_record + + // Clear and re-reserve for next batch + keys.clear(); + records.clear(); + records_ts.clear(); + records_ttl.clear(); + op_types.clear(); + keys.reserve(total_segments); + records.reserve(total_segments); + records_ts.reserve(total_segments); + records_ttl.reserve(total_segments); + op_types.reserve(total_segments); + write_batch_size = 0; + } + + // Append to batch vectors + keys.emplace_back(plan.segment_keys[i]); + records.emplace_back(plan.segment_records[i]); + records_ts.emplace_back(version); + records_ttl.emplace_back(0); // no TTL for range slices + op_types.emplace_back(WriteOpType::PUT); + write_batch_size += segment_total_size; + } + } + + // Dispatch final batch if vectors are non-empty + if (keys.size() > 0) { // Concurrency control: wait if limit reached, then increment counter { @@ -1391,21 +1492,7 @@ void DataStoreServiceClient::DispatchRangeSliceBatches( } sync_concurrent->unfinished_request_cnt_++; } - - // Build vectors for BatchWriteRecords - std::vector keys; - std::vector records; - std::vector records_ts; - std::vector records_ttl; - std::vector op_types; - - keys.emplace_back(plan.segment_keys[i]); - records.emplace_back(plan.segment_records[i]); - records_ts.emplace_back(version); - records_ttl.emplace_back(0); // no TTL for range slices - op_types.emplace_back(WriteOpType::PUT); - - // Dispatch the batch + BatchWriteRecords(kv_table_name, kv_partition_id, data_shard_id, @@ -1612,10 +1699,13 @@ bool DataStoreServiceClient::UpdateRangeSlices( sync_concurrent_request_pool_.NextObject(); PoolableGuard slice_guard(slice_sync_concurrent); slice_sync_concurrent->Reset(); + std::vector slice_plans; + slice_plans.emplace_back(std::move(slice_plan)); + const uint32_t segment_cnt = slice_plans[0].segment_cnt; DispatchRangeSliceBatches(kv_range_slices_table_name, KvPartitionIdOf(table_name), version, - slice_plan, + slice_plans, slice_sync_concurrent); // 3- Wait for slice requests to complete. Make sure meta data is updated // after all slice info is written. @@ -1644,7 +1734,7 @@ bool DataStoreServiceClient::UpdateRangeSlices( partition_id, range_version, version, - slice_plan.segment_cnt, + segment_cnt, meta_acc); SyncConcurrentRequest *meta_sync_concurrent = @@ -1736,14 +1826,12 @@ bool DataStoreServiceClient::UpsertRanges( slice_sync_concurrent->Reset(); int32_t kv_partition_id = KvPartitionIdOf(table_name); - for (size_t i = 0; i < slice_plans.size(); ++i) - { - DispatchRangeSliceBatches(kv_range_slices_table_name, - kv_partition_id, - version, - slice_plans[i], - slice_sync_concurrent); - } + // Call DispatchRangeSliceBatches once with all plans + DispatchRangeSliceBatches(kv_range_slices_table_name, + kv_partition_id, + version, + slice_plans, + slice_sync_concurrent); // 3- Wait for slice requests to complete { diff --git a/data_store_service_client.h b/data_store_service_client.h index 0d9a959..2f82a93 100644 --- a/data_store_service_client.h +++ b/data_store_service_client.h @@ -596,7 +596,7 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler void DispatchRangeSliceBatches(std::string_view kv_table_name, int32_t kv_partition_id, uint64_t version, - const RangeSliceBatchPlan &plan, + const std::vector &plans, SyncConcurrentRequest *sync_concurrent); /**