From db5d25410ff9ec8b9229aee63db836b569d01861 Mon Sep 17 00:00:00 2001 From: liunyl Date: Fri, 21 Nov 2025 03:10:29 +0000 Subject: [PATCH 1/2] parallelize write requests in upsert ranges --- data_store_service_client.cpp | 498 ++++++++++++++++++++++++++-------- data_store_service_client.h | 71 +++++ 2 files changed, 463 insertions(+), 106 deletions(-) diff --git a/data_store_service_client.cpp b/data_store_service_client.cpp index 97e8c61..3a1daad 100644 --- a/data_store_service_client.cpp +++ b/data_store_service_client.cpp @@ -1289,45 +1289,27 @@ void DataStoreServiceClient::UpdateEncodedRangeSliceKey( sizeof(new_segment_id)); } -/** - * @brief Updates range slices for a table partition. - * - * Stores range slice information by segmenting the slices into manageable - * chunks and writing them to the KV storage system. Handles slice serialization - * with proper key encoding and batch size management. Also updates the range - * information with the new version and segment count. Uses both local and - * remote storage paths based on configuration. - * - * @param table_name The table name for the range slices. - * @param version The version number for the slices. - * @param range_start_key The starting key for the range. - * @param slices Vector of store slices to update. - * @param partition_id The partition ID for the range. - * @param range_version The version of the range. - * @return true if all slices are updated successfully, false if any operation - * fails. - */ -bool DataStoreServiceClient::UpdateRangeSlices( +RangeSliceBatchPlan DataStoreServiceClient::PrepareRangeSliceBatches( const txservice::TableName &table_name, uint64_t version, - txservice::TxKey range_start_key, - std::vector slices, - int32_t partition_id, - uint64_t range_version) + const std::vector &slices, + int32_t partition_id) { auto catalog_factory = GetCatalogFactory(table_name.Engine()); assert(catalog_factory != nullptr); - // 1- store range_slices info into {kv_range_slices_table_name} - std::vector segment_keys; - std::vector segment_records; - uint32_t segment_cnt = 0; + RangeSliceBatchPlan plan; + plan.segment_cnt = 0; + + // Estimate capacity based on slices size + plan.segment_keys.reserve(slices.size() / 10 + 1); // Rough estimate + plan.segment_records.reserve(slices.size() / 10 + 1); std::string segment_key = - EncodeRangeSliceKey(table_name, partition_id, segment_cnt); + EncodeRangeSliceKey(table_name, partition_id, plan.segment_cnt); std::string segment_record; size_t batch_size = segment_key.size() + sizeof(uint64_t); - size_t max_segment_size = 1024 * 1024; + size_t max_segment_size = 1024 * 1024; // 1 MB segment_record.reserve(max_segment_size - segment_key.size()); segment_record.append(reinterpret_cast(&version), sizeof(uint64_t)); @@ -1347,12 +1329,12 @@ bool DataStoreServiceClient::UpdateRangeSlices( if (batch_size >= max_segment_size) { - segment_keys.emplace_back(std::move(segment_key)); - segment_records.emplace_back(std::move(segment_record)); + plan.segment_keys.emplace_back(std::move(segment_key)); + plan.segment_records.emplace_back(std::move(segment_record)); - segment_cnt++; + plan.segment_cnt++; segment_key = - EncodeRangeSliceKey(table_name, partition_id, segment_cnt); + EncodeRangeSliceKey(table_name, partition_id, plan.segment_cnt); batch_size = segment_key.size(); segment_record.clear(); @@ -1369,40 +1351,54 @@ bool DataStoreServiceClient::UpdateRangeSlices( segment_record.append(reinterpret_cast(&slice_size), sizeof(uint32_t)); } + if (segment_record.size() > 0) { - segment_keys.emplace_back(std::move(segment_key)); - segment_records.emplace_back(std::move(segment_record)); - segment_cnt++; + plan.segment_keys.emplace_back(std::move(segment_key)); + plan.segment_records.emplace_back(std::move(segment_record)); + plan.segment_cnt++; } - assert(segment_keys.size() == segment_cnt); + assert(plan.segment_keys.size() == plan.segment_cnt); + return plan; +} - // 2- write the segments to storage - // Calculate kv_partition_id based on table_name. - int32_t kv_partition_id = KvPartitionIdOf(table_name); +void DataStoreServiceClient::DispatchRangeSliceBatches( + std::string_view kv_table_name, + int32_t kv_partition_id, + uint64_t version, + const RangeSliceBatchPlan &plan, + SyncConcurrentRequest *sync_concurrent) +{ uint32_t data_shard_id = GetShardIdByPartitionId(kv_partition_id, false); - std::vector keys; - std::vector records; - std::vector records_ts; - std::vector records_ttl; - std::vector op_types; - SyncCallbackData *callback_data = sync_callback_data_pool_.NextObject(); - PoolableGuard guard(callback_data); - callback_data->Reset(); - - for (size_t i = 0; i < segment_keys.size(); ++i) + for (uint32_t i = 0; i < plan.segment_cnt; ++i) { - keys.emplace_back(segment_keys[i]); - records.emplace_back(segment_records[i]); + // Concurrency control: wait if limit reached, then increment counter + { + std::unique_lock lk(sync_concurrent->mux_); + while (sync_concurrent->unfinished_request_cnt_ >= + SyncConcurrentRequest::max_flying_write_count) + { + sync_concurrent->cv_.wait(lk); + } + sync_concurrent->unfinished_request_cnt_++; + } + + // Build vectors for BatchWriteRecords + std::vector keys; + std::vector records; + std::vector records_ts; + std::vector records_ttl; + std::vector op_types; + + keys.emplace_back(plan.segment_keys[i]); + records.emplace_back(plan.segment_records[i]); records_ts.emplace_back(version); - records_ttl.emplace_back(0); // no ttl + records_ttl.emplace_back(0); // no TTL for range slices op_types.emplace_back(WriteOpType::PUT); - // For segments are splitted based on MAX_WRITE_BATCH_SIZE, execute - // one write request for each segment record. - callback_data->Reset(); - BatchWriteRecords(kv_range_slices_table_name, + // Dispatch the batch + BatchWriteRecords(kv_table_name, kv_partition_id, data_shard_id, std::move(keys), @@ -1411,53 +1407,262 @@ bool DataStoreServiceClient::UpdateRangeSlices( std::move(records_ttl), std::move(op_types), true, - callback_data, - &SyncCallback); - callback_data->Wait(); - keys.clear(); - records.clear(); - records_ts.clear(); - records_ttl.clear(); - op_types.clear(); + sync_concurrent, + SyncConcurrentRequestCallback, + 1, // parts_cnt_per_key + 1); // parts_cnt_per_record + } +} - if (callback_data->Result().error_code() != - EloqDS::remote::DataStoreError::NO_ERROR) +void DataStoreServiceClient::EnqueueRangeMetadataRecord( + const txservice::CatalogFactory *catalog_factory, + const txservice::TableName &table_name, + const txservice::TxKey &range_start_key, + int32_t partition_id, + uint64_t range_version, + uint64_t version, + uint32_t segment_cnt, + RangeMetadataAccumulator &accumulator) +{ + // Compute kv_table_name and kv_partition_id + std::string kv_table_name = std::string(table_name.StringView()); + int32_t kv_partition_id = KvPartitionIdOf(table_name); + + // Encode key and value + std::string key_str = EncodeRangeKey(catalog_factory, table_name, range_start_key); + std::string rec_str = EncodeRangeValue(partition_id, range_version, version, segment_cnt); + + // Get or create entry in accumulator + auto key = std::make_pair(kv_table_name, kv_partition_id); + auto &records_vec = accumulator.records_by_table_partition[key]; + + // Create and append record + RangeMetadataRecord record; + record.encoded_key = std::move(key_str); + record.encoded_value = std::move(rec_str); + record.version = version; + records_vec.emplace_back(std::move(record)); +} + +void DataStoreServiceClient::DispatchRangeMetadataBatches( + std::string_view kv_table_name, + const RangeMetadataAccumulator &accumulator, + SyncConcurrentRequest *sync_concurrent, + size_t max_batch_size) +{ + for (const auto &[table_partition, records_vec] : accumulator.records_by_table_partition) + { + const std::string &kv_table_name_str = table_partition.first; + int32_t kv_partition_id = table_partition.second; + uint32_t data_shard_id = GetShardIdByPartitionId(kv_partition_id, false); + + // Use kv_table_name parameter if provided, otherwise use kv_table_name_str + // For consistency, prefer the parameter + std::string_view target_table_name = kv_table_name.empty() ? kv_table_name_str : kv_table_name; + + // Initialize batch vectors + std::vector keys; + std::vector records; + std::vector records_ts; + std::vector records_ttl; + std::vector op_types; + + keys.reserve(records_vec.size()); + records.reserve(records_vec.size()); + records_ts.reserve(records_vec.size()); + records_ttl.reserve(records_vec.size()); + op_types.reserve(records_vec.size()); + + size_t write_batch_size = 0; + + for (const auto &record : records_vec) { - LOG(WARNING) << "UpdateRangeSlices: Failed to write segments."; - return false; + size_t key_size = record.encoded_key.size(); + size_t value_size = record.encoded_value.size(); + // Overhead: records_ts (8 bytes) + records_ttl (8 bytes) + op_types (4 bytes) ≈ 20 bytes + constexpr size_t overhead_per_record = 20; + size_t record_total_size = key_size + value_size + overhead_per_record; + + // If adding this record would exceed max_batch_size and batch is non-empty, dispatch current batch + if (write_batch_size + record_total_size >= max_batch_size && keys.size() > 0) + { + // Concurrency control: wait if limit reached, then increment counter + { + std::unique_lock lk(sync_concurrent->mux_); + while (sync_concurrent->unfinished_request_cnt_ >= + SyncConcurrentRequest::max_flying_write_count) + { + sync_concurrent->cv_.wait(lk); + } + sync_concurrent->unfinished_request_cnt_++; + } + + // Dispatch current batch + BatchWriteRecords(target_table_name, + kv_partition_id, + data_shard_id, + std::move(keys), + std::move(records), + std::move(records_ts), + std::move(records_ttl), + std::move(op_types), + true, + sync_concurrent, + SyncConcurrentRequestCallback, + 1, // parts_cnt_per_key + 1); // parts_cnt_per_record + + // Clear and re-reserve for next batch + keys.clear(); + records.clear(); + records_ts.clear(); + records_ttl.clear(); + op_types.clear(); + keys.reserve(records_vec.size()); + records.reserve(records_vec.size()); + records_ts.reserve(records_vec.size()); + records_ttl.reserve(records_vec.size()); + op_types.reserve(records_vec.size()); + write_batch_size = 0; + } + + // Append to batch vectors + keys.emplace_back(record.encoded_key); + records.emplace_back(record.encoded_value); + records_ts.emplace_back(record.version); + records_ttl.emplace_back(0); // no TTL for range metadata + op_types.emplace_back(WriteOpType::PUT); + write_batch_size += record_total_size; + } + + // Dispatch final batch for this table/partition if vectors are non-empty + if (keys.size() > 0) + { + // Concurrency control: wait if limit reached, then increment counter + { + std::unique_lock lk(sync_concurrent->mux_); + while (sync_concurrent->unfinished_request_cnt_ >= + SyncConcurrentRequest::max_flying_write_count) + { + sync_concurrent->cv_.wait(lk); + } + sync_concurrent->unfinished_request_cnt_++; + } + + BatchWriteRecords(target_table_name, + kv_partition_id, + data_shard_id, + std::move(keys), + std::move(records), + std::move(records_ts), + std::move(records_ttl), + std::move(op_types), + true, + sync_concurrent, + SyncConcurrentRequestCallback, + 1, // parts_cnt_per_key + 1); // parts_cnt_per_record } } +} - // 3- store range info into {kv_range_table_name} - callback_data->Reset(); +/** + * @brief Updates range slices for a table partition. + * + * Stores range slice information by segmenting the slices into manageable + * chunks and writing them to the KV storage system. Handles slice serialization + * with proper key encoding and batch size management. Also updates the range + * information with the new version and segment count. Uses both local and + * remote storage paths based on configuration. + * + * @param table_name The table name for the range slices. + * @param version The version number for the slices. + * @param range_start_key The starting key for the range. + * @param slices Vector of store slices to update. + * @param partition_id The partition ID for the range. + * @param range_version The version of the range. + * @return true if all slices are updated successfully, false if any operation + * fails. + */ +bool DataStoreServiceClient::UpdateRangeSlices( + const txservice::TableName &table_name, + uint64_t version, + txservice::TxKey range_start_key, + std::vector slices, + int32_t partition_id, + uint64_t range_version) +{ + auto catalog_factory = GetCatalogFactory(table_name.Engine()); + assert(catalog_factory != nullptr); - std::string key_str = - EncodeRangeKey(catalog_factory, table_name, range_start_key); - std::string rec_str = - EncodeRangeValue(partition_id, range_version, version, segment_cnt); + // 1- Prepare slice batches + auto slice_plan = + PrepareRangeSliceBatches(table_name, version, slices, partition_id); + + // 2- Dispatch slice batches concurrently + SyncConcurrentRequest *slice_sync_concurrent = + sync_concurrent_request_pool_.NextObject(); + PoolableGuard slice_guard(slice_sync_concurrent); + slice_sync_concurrent->Reset(); + DispatchRangeSliceBatches(kv_range_slices_table_name, + KvPartitionIdOf(table_name), + version, + slice_plan, + slice_sync_concurrent); + + // 3- Enqueue and dispatch metadata record concurrently + RangeMetadataAccumulator meta_acc; + EnqueueRangeMetadataRecord(catalog_factory, + table_name, + range_start_key, + partition_id, + range_version, + version, + slice_plan.segment_cnt, + meta_acc); + + SyncConcurrentRequest *meta_sync_concurrent = + sync_concurrent_request_pool_.NextObject(); + PoolableGuard meta_guard(meta_sync_concurrent); + meta_sync_concurrent->Reset(); + DispatchRangeMetadataBatches(kv_range_table_name, + meta_acc, + meta_sync_concurrent); + + // 4- Wait for slice requests to complete + { + std::unique_lock lk(slice_sync_concurrent->mux_); + slice_sync_concurrent->all_request_started_ = true; + while (slice_sync_concurrent->unfinished_request_cnt_ != 0) + { + slice_sync_concurrent->cv_.wait(lk); + } + } - keys.emplace_back(key_str); - records.emplace_back(rec_str); + // 5- Wait for metadata requests to complete + { + std::unique_lock lk(meta_sync_concurrent->mux_); + meta_sync_concurrent->all_request_started_ = true; + while (meta_sync_concurrent->unfinished_request_cnt_ != 0) + { + meta_sync_concurrent->cv_.wait(lk); + } + } - records_ts.emplace_back(version); - records_ttl.emplace_back(0); // no ttl - op_types.emplace_back(WriteOpType::PUT); - BatchWriteRecords(kv_range_table_name, - kv_partition_id, - data_shard_id, - std::move(keys), - std::move(records), - std::move(records_ts), - std::move(records_ttl), - std::move(op_types), - true, - callback_data, - &SyncCallback); - callback_data->Wait(); - if (callback_data->Result().error_code() != - EloqDS::remote::DataStoreError::NO_ERROR) + // 6- Check for errors + if (slice_sync_concurrent->result_.error_code() != + remote::DataStoreError::NO_ERROR) { - LOG(WARNING) << "UpdateRangeSlices: Failed to write range info."; + LOG(WARNING) << "UpdateRangeSlices: Failed to write segments. Error: " + << slice_sync_concurrent->result_.error_msg(); + return false; + } + + if (meta_sync_concurrent->result_.error_code() != + remote::DataStoreError::NO_ERROR) + { + LOG(WARNING) << "UpdateRangeSlices: Failed to write range info. Error: " + << meta_sync_concurrent->result_.error_msg(); return false; } @@ -1467,10 +1672,10 @@ bool DataStoreServiceClient::UpdateRangeSlices( /** * @brief Upserts range information for a table. * - * Updates range slices for multiple ranges by calling UpdateRangeSlices for - * each range in the provided vector. After updating all ranges, flushes the - * range table data to ensure persistence. Validates that the table name is not - * empty and handles errors from individual range updates. + * Updates range slices for multiple ranges by batching metadata across all + * ranges and parallelizing slice writes. After updating all ranges, flushes + * the range table data to ensure persistence. Validates that the table name + * is not empty and handles errors from individual range updates. * * @param table_name The table name for the ranges. * @param range_info Vector of split range information to upsert. @@ -1485,19 +1690,100 @@ bool DataStoreServiceClient::UpsertRanges( { assert(table_name.StringView() != txservice::empty_sv); + if (range_info.empty()) + { + return true; + } + + auto catalog_factory = GetCatalogFactory(table_name.Engine()); + assert(catalog_factory != nullptr); + + // 1- First pass: Prepare slice batches and accumulate metadata for all ranges + std::vector slice_plans; + slice_plans.reserve(range_info.size()); + RangeMetadataAccumulator meta_acc; + for (auto &range : range_info) { - if (!UpdateRangeSlices(table_name, - version, - std::move(range.start_key_), - std::move(range.slices_), - range.partition_id_, - version)) + // Prepare slice batches for this range + auto slice_plan = PrepareRangeSliceBatches( + table_name, version, range.slices_, range.partition_id_); + slice_plans.emplace_back(std::move(slice_plan)); + + // Enqueue metadata record for this range + EnqueueRangeMetadataRecord(catalog_factory, + table_name, + range.start_key_, + range.partition_id_, + version, // range_version (using version for now) + version, + slice_plans.back().segment_cnt, + meta_acc); + } + + // 2- Dispatch metadata batches concurrently (batched by table/partition) + SyncConcurrentRequest *meta_sync_concurrent = + sync_concurrent_request_pool_.NextObject(); + PoolableGuard meta_guard(meta_sync_concurrent); + meta_sync_concurrent->Reset(); + DispatchRangeMetadataBatches(kv_range_table_name, + meta_acc, + meta_sync_concurrent); + + // 3- Dispatch slice batches for all ranges concurrently (shared SyncConcurrentRequest) + SyncConcurrentRequest *slice_sync_concurrent = + sync_concurrent_request_pool_.NextObject(); + PoolableGuard slice_guard(slice_sync_concurrent); + slice_sync_concurrent->Reset(); + + int32_t kv_partition_id = KvPartitionIdOf(table_name); + for (size_t i = 0; i < slice_plans.size(); ++i) + { + DispatchRangeSliceBatches(kv_range_slices_table_name, + kv_partition_id, + version, + slice_plans[i], + slice_sync_concurrent); + } + + // 4- Wait for metadata requests to complete + { + std::unique_lock lk(meta_sync_concurrent->mux_); + meta_sync_concurrent->all_request_started_ = true; + while (meta_sync_concurrent->unfinished_request_cnt_ != 0) { - return false; + meta_sync_concurrent->cv_.wait(lk); } } + // 5- Wait for slice requests to complete + { + std::unique_lock lk(slice_sync_concurrent->mux_); + slice_sync_concurrent->all_request_started_ = true; + while (slice_sync_concurrent->unfinished_request_cnt_ != 0) + { + slice_sync_concurrent->cv_.wait(lk); + } + } + + // 6- Check for errors + if (meta_sync_concurrent->result_.error_code() != + remote::DataStoreError::NO_ERROR) + { + LOG(WARNING) << "UpsertRanges: Failed to write range metadata. Error: " + << meta_sync_concurrent->result_.error_msg(); + return false; + } + + if (slice_sync_concurrent->result_.error_code() != + remote::DataStoreError::NO_ERROR) + { + LOG(WARNING) << "UpsertRanges: Failed to write range slices. Error: " + << slice_sync_concurrent->result_.error_msg(); + return false; + } + + // 7- Flush data SyncCallbackData *callback_data = sync_callback_data_pool_.NextObject(); PoolableGuard guard(callback_data); callback_data->Reset(); diff --git a/data_store_service_client.h b/data_store_service_client.h index 1471d01..0d9a959 100644 --- a/data_store_service_client.h +++ b/data_store_service_client.h @@ -47,6 +47,7 @@ namespace EloqDS struct PartitionFlushState; struct PartitionBatchRequest; struct PartitionCallbackData; +struct SyncConcurrentRequest; class DataStoreServiceClient; class BatchWriteRecordsClosure; class ReadClosure; @@ -58,6 +59,42 @@ class ScanNextClosure; class CreateSnapshotForBackupClosure; class SinglePartitionScanner; +// Range batching helper structs +struct RangeSliceBatchPlan +{ + uint32_t segment_cnt; + std::vector segment_keys; // Owned string buffers + std::vector segment_records; // Owned string buffers + + // Clear method for reuse + void Clear() + { + segment_cnt = 0; + segment_keys.clear(); + segment_records.clear(); + } +}; + +struct RangeMetadataRecord +{ + std::string encoded_key; + std::string encoded_value; + uint64_t version; // Stored separately for records_ts in BatchWriteRecords +}; + +struct RangeMetadataAccumulator +{ + // Key: (kv_table_name, kv_partition_id) as string pair + // Value: vector of metadata records for that table/partition + std::map, + std::vector> records_by_table_partition; + + void Clear() + { + records_by_table_partition.clear(); + } +}; + class DssClusterConfig; typedef void (*DataStoreCallback)(void *data, @@ -547,6 +584,40 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler uint16_t parts_cnt_per_record, uint64_t now); + /** + * Helper methods for range slice batching + */ + RangeSliceBatchPlan PrepareRangeSliceBatches( + const txservice::TableName &table_name, + uint64_t version, + const std::vector &slices, + int32_t partition_id); + + void DispatchRangeSliceBatches(std::string_view kv_table_name, + int32_t kv_partition_id, + uint64_t version, + const RangeSliceBatchPlan &plan, + SyncConcurrentRequest *sync_concurrent); + + /** + * Helper methods for range metadata batching + */ + void EnqueueRangeMetadataRecord( + const txservice::CatalogFactory *catalog_factory, + const txservice::TableName &table_name, + const txservice::TxKey &range_start_key, + int32_t partition_id, + uint64_t range_version, + uint64_t version, + uint32_t segment_cnt, + RangeMetadataAccumulator &accumulator); + + void DispatchRangeMetadataBatches( + std::string_view kv_table_name, + const RangeMetadataAccumulator &accumulator, + SyncConcurrentRequest *sync_concurrent, + size_t max_batch_size = 64 * 1024 * 1024); // 64MB + /** * Delete range and flush data are not frequent calls, all calls are sent * with rpc. From 8cdbbaffc6bee375d61cfedd85e35381a688dc32 Mon Sep 17 00:00:00 2001 From: liunyl Date: Fri, 21 Nov 2025 04:21:14 +0000 Subject: [PATCH 2/2] fix comment --- data_store_service_client.cpp | 93 +++++++++++++++++------------------ 1 file changed, 46 insertions(+), 47 deletions(-) diff --git a/data_store_service_client.cpp b/data_store_service_client.cpp index 3a1daad..ee561ff 100644 --- a/data_store_service_client.cpp +++ b/data_store_service_client.cpp @@ -1609,8 +1609,26 @@ bool DataStoreServiceClient::UpdateRangeSlices( version, slice_plan, slice_sync_concurrent); + // 3- Wait for slice requests to complete. Make sure meta data is updated + // after all slice info is written. + { + std::unique_lock lk(slice_sync_concurrent->mux_); + slice_sync_concurrent->all_request_started_ = true; + while (slice_sync_concurrent->unfinished_request_cnt_ != 0) + { + slice_sync_concurrent->cv_.wait(lk); + } + } - // 3- Enqueue and dispatch metadata record concurrently + if (slice_sync_concurrent->result_.error_code() != + remote::DataStoreError::NO_ERROR) + { + LOG(WARNING) << "UpdateRangeSlices: Failed to write segments. Error: " + << slice_sync_concurrent->result_.error_msg(); + return false; + } + + // 4- Enqueue and dispatch metadata record concurrently RangeMetadataAccumulator meta_acc; EnqueueRangeMetadataRecord(catalog_factory, table_name, @@ -1629,16 +1647,6 @@ bool DataStoreServiceClient::UpdateRangeSlices( meta_acc, meta_sync_concurrent); - // 4- Wait for slice requests to complete - { - std::unique_lock lk(slice_sync_concurrent->mux_); - slice_sync_concurrent->all_request_started_ = true; - while (slice_sync_concurrent->unfinished_request_cnt_ != 0) - { - slice_sync_concurrent->cv_.wait(lk); - } - } - // 5- Wait for metadata requests to complete { std::unique_lock lk(meta_sync_concurrent->mux_); @@ -1650,14 +1658,6 @@ bool DataStoreServiceClient::UpdateRangeSlices( } // 6- Check for errors - if (slice_sync_concurrent->result_.error_code() != - remote::DataStoreError::NO_ERROR) - { - LOG(WARNING) << "UpdateRangeSlices: Failed to write segments. Error: " - << slice_sync_concurrent->result_.error_msg(); - return false; - } - if (meta_sync_concurrent->result_.error_code() != remote::DataStoreError::NO_ERROR) { @@ -1721,16 +1721,7 @@ bool DataStoreServiceClient::UpsertRanges( meta_acc); } - // 2- Dispatch metadata batches concurrently (batched by table/partition) - SyncConcurrentRequest *meta_sync_concurrent = - sync_concurrent_request_pool_.NextObject(); - PoolableGuard meta_guard(meta_sync_concurrent); - meta_sync_concurrent->Reset(); - DispatchRangeMetadataBatches(kv_range_table_name, - meta_acc, - meta_sync_concurrent); - - // 3- Dispatch slice batches for all ranges concurrently (shared SyncConcurrentRequest) + // 2- Dispatch slice batches for all ranges concurrently (shared SyncConcurrentRequest) SyncConcurrentRequest *slice_sync_concurrent = sync_concurrent_request_pool_.NextObject(); PoolableGuard slice_guard(slice_sync_concurrent); @@ -1746,23 +1737,39 @@ bool DataStoreServiceClient::UpsertRanges( slice_sync_concurrent); } - // 4- Wait for metadata requests to complete + // 3- Wait for slice requests to complete { - std::unique_lock lk(meta_sync_concurrent->mux_); - meta_sync_concurrent->all_request_started_ = true; - while (meta_sync_concurrent->unfinished_request_cnt_ != 0) + std::unique_lock lk(slice_sync_concurrent->mux_); + slice_sync_concurrent->all_request_started_ = true; + while (slice_sync_concurrent->unfinished_request_cnt_ != 0) { - meta_sync_concurrent->cv_.wait(lk); + slice_sync_concurrent->cv_.wait(lk); } } + if (slice_sync_concurrent->result_.error_code() != + remote::DataStoreError::NO_ERROR) + { + LOG(WARNING) << "UpsertRanges: Failed to write range slices. Error: " + << slice_sync_concurrent->result_.error_msg(); + return false; + } + + // 4- Dispatch metadata batches concurrently (batched by table/partition) + SyncConcurrentRequest *meta_sync_concurrent = + sync_concurrent_request_pool_.NextObject(); + PoolableGuard meta_guard(meta_sync_concurrent); + meta_sync_concurrent->Reset(); + DispatchRangeMetadataBatches(kv_range_table_name, + meta_acc, + meta_sync_concurrent); - // 5- Wait for slice requests to complete + // 5- Wait for metadata requests to complete { - std::unique_lock lk(slice_sync_concurrent->mux_); - slice_sync_concurrent->all_request_started_ = true; - while (slice_sync_concurrent->unfinished_request_cnt_ != 0) + std::unique_lock lk(meta_sync_concurrent->mux_); + meta_sync_concurrent->all_request_started_ = true; + while (meta_sync_concurrent->unfinished_request_cnt_ != 0) { - slice_sync_concurrent->cv_.wait(lk); + meta_sync_concurrent->cv_.wait(lk); } } @@ -1775,14 +1782,6 @@ bool DataStoreServiceClient::UpsertRanges( return false; } - if (slice_sync_concurrent->result_.error_code() != - remote::DataStoreError::NO_ERROR) - { - LOG(WARNING) << "UpsertRanges: Failed to write range slices. Error: " - << slice_sync_concurrent->result_.error_msg(); - return false; - } - // 7- Flush data SyncCallbackData *callback_data = sync_callback_data_pool_.NextObject(); PoolableGuard guard(callback_data);