From 0917015f5fff480bba57abc91222e2648624e0e7 Mon Sep 17 00:00:00 2001 From: yi-xmu Date: Thu, 5 Mar 2026 18:47:08 +0800 Subject: [PATCH] Update FillStoreSliceCc --- tx_service/include/cc/cc_req_misc.h | 37 +++--- tx_service/include/cc/template_cc_map.h | 9 +- tx_service/src/cc/cc_req_misc.cpp | 149 +++++++++--------------- tx_service/src/cc/range_slice.cpp | 7 +- 4 files changed, 79 insertions(+), 123 deletions(-) diff --git a/tx_service/include/cc/cc_req_misc.h b/tx_service/include/cc/cc_req_misc.h index f19e7a95..d6de980c 100644 --- a/tx_service/include/cc/cc_req_misc.h +++ b/tx_service/include/cc/cc_req_misc.h @@ -417,10 +417,9 @@ struct FillStoreSliceCc : public CcRequestBase bool Execute(CcShard &ccs) override; - std::deque &SliceData(uint16_t core_id) + std::deque &SliceData() { - assert(core_id < partitioned_slice_data_.size()); - return partitioned_slice_data_[core_id]; + return slice_data_; } void AddDataItem(TxKey key, @@ -428,8 +427,8 @@ struct FillStoreSliceCc : public CcRequestBase uint64_t version_ts, bool is_deleted); - bool SetFinish(CcShard *cc_shard); - bool SetError(CcErrorCode err_code); + void SetFinish(CcShard *cc_shard); + void SetError(CcErrorCode err_code); void SetKvFinish(bool success); @@ -438,12 +437,9 @@ struct FillStoreSliceCc : public CcRequestBase assert(err_code != CcErrorCode::NO_ERROR); DLOG(ERROR) << "Abort this FillStoreSliceCc request with error: " << CcErrorMessage(err_code); - bool finish_all = SetError(err_code); + SetError(err_code); // Recycle request - if (finish_all) - { - Free(); - } + Free(); } const TableName &TblName() const @@ -476,17 +472,16 @@ struct FillStoreSliceCc : public CcRequestBase force_load_ = force_load; } - size_t NextIndex(size_t core_idx) const + size_t NextIndex() const { - size_t next_idx = next_idxs_[core_idx]; - assert(next_idx <= partitioned_slice_data_[core_idx].size()); - return next_idx; + assert(next_idx_ <= slice_data_.size()); + return next_idx_; } - void SetNextIndex(size_t core_idx, size_t index) + void SetNextIndex(size_t index) { - assert(index <= partitioned_slice_data_[core_idx].size()); - next_idxs_[core_idx] = index; + assert(index <= slice_data_.size()); + next_idx_ = index; } NodeGroupId NodeGroup() const @@ -524,6 +519,8 @@ struct FillStoreSliceCc : public CcRequestBase return true; } + int32_t PartitionId() const; + metrics::TimePoint start_; private: @@ -531,13 +528,11 @@ struct FillStoreSliceCc : public CcRequestBase NodeGroupId cc_ng_id_; int64_t cc_ng_term_; bool force_load_; - uint16_t finish_cnt_; - uint16_t core_cnt_; std::mutex mux_; CcErrorCode err_code_{CcErrorCode::NO_ERROR}; - std::vector next_idxs_; - std::vector> partitioned_slice_data_; + size_t next_idx_; + std::deque slice_data_; StoreSlice *range_slice_ = nullptr; StoreRange *range_ = nullptr; diff --git a/tx_service/include/cc/template_cc_map.h b/tx_service/include/cc/template_cc_map.h index f5dded3e..7178e3d3 100644 --- a/tx_service/include/cc/template_cc_map.h +++ b/tx_service/include/cc/template_cc_map.h @@ -6663,9 +6663,9 @@ class TemplateCcMap : public CcMap bool Execute(FillStoreSliceCc &req) override { - std::deque &slice_vec = req.SliceData(shard_->core_id_); + std::deque &slice_vec = req.SliceData(); - size_t index = req.NextIndex(shard_->core_id_); + size_t index = req.NextIndex(); size_t last_index = std::min(index + FillStoreSliceCc::MaxScanBatchSize, slice_vec.size()); @@ -6682,11 +6682,12 @@ class TemplateCcMap : public CcMap if (index == slice_vec.size()) { slice_vec.clear(); - return req.SetFinish(shard_); + req.SetFinish(shard_); + return true; } else { - req.SetNextIndex(shard_->core_id_, index); + req.SetNextIndex(index); shard_->Enqueue(shard_->LocalCoreId(), &req); return false; } diff --git a/tx_service/src/cc/cc_req_misc.cpp b/tx_service/src/cc/cc_req_misc.cpp index 26f7bc57..ba67e1f7 100644 --- a/tx_service/src/cc/cc_req_misc.cpp +++ b/tx_service/src/cc/cc_req_misc.cpp @@ -591,14 +591,9 @@ void FillStoreSliceCc::Reset(const TableName &table_name, cc_ng_id_ = cc_ng_id; cc_ng_term_ = cc_ng_term; force_load_ = force_load; - finish_cnt_ = 0; - core_cnt_ = cc_shards.Count(); - next_idxs_.clear(); - next_idxs_.resize(cc_shards.Count(), 0); - - partitioned_slice_data_.clear(); - partitioned_slice_data_.resize(cc_shards.Count()); + next_idx_ = 0; + slice_data_.clear(); range_slice_ = slice; range_ = range; @@ -620,7 +615,7 @@ void FillStoreSliceCc::SetKvFinish(bool success) { CODE_FAULT_INJECTOR("LoadRangeSliceRequest_SetFinish_Error", { success = false; - partitioned_slice_data_.clear(); + slice_data_.clear(); slice_size_ = 0; snapshot_ts_ = 0; }); @@ -657,7 +652,8 @@ bool FillStoreSliceCc::Execute(CcShard &ccs) int64_t cc_ng_term = Sharder::Instance().LeaderTerm(cc_ng_id_); if (std::max(cc_ng_candid_term, cc_ng_term) != cc_ng_term_) { - return SetError(CcErrorCode::NG_TERM_CHANGED); + SetError(CcErrorCode::NG_TERM_CHANGED); + return true; } CcMap *ccm = ccs.GetCcm(*table_name_, cc_ng_id_); @@ -706,106 +702,65 @@ void FillStoreSliceCc::AddDataItem( rec_cnt_++; } - size_t hash = key.Hash(); - // Uses the lower 10 bits of the hash code to shard the key across - // CPU cores at this node. - uint16_t core_code = hash & 0x3FF; - uint16_t core_id = core_code % core_cnt_; - - partitioned_slice_data_[core_id].emplace_back( + slice_data_.emplace_back( std::move(key), std::move(record), version_ts, is_deleted); } -bool FillStoreSliceCc::SetFinish(CcShard *cc_shard) +void FillStoreSliceCc::SetFinish(CcShard *cc_shard) { - bool finish_all = false; - CcErrorCode err_code; + if (err_code_ == CcErrorCode::NO_ERROR) { - std::lock_guard lk(mux_); - ++finish_cnt_; - - if (finish_cnt_ == core_cnt_) + bool init_key_cache = + txservice_enable_key_cache && table_name_->IsBase(); + // Cache the pointer since FillStoreSliceCc will be freed after + // CommitLoading. + + const TableName *tbl_name = table_name_; + auto cc_ng_id = cc_ng_id_; + auto cc_ng_term = cc_ng_term_; + if (init_key_cache && rec_cnt_ > 0) { - finish_all = true; - err_code = err_code_; - } - } + LocalCcShards *shards = Sharder::Instance().GetLocalCcShards(); + size_t estimate_rec_size = UINT64_MAX; - if (finish_all) - { - if (err_code == CcErrorCode::NO_ERROR) - { - bool init_key_cache = - txservice_enable_key_cache && table_name_->IsBase(); - // Cache the pointer since FillStoreSliceCc will be freed after - // CommitLoading. - - const TableName *tbl_name = table_name_; - auto cc_ng_id = cc_ng_id_; - auto cc_ng_term = cc_ng_term_; - if (init_key_cache && rec_cnt_ > 0) - { - LocalCcShards *shards = Sharder::Instance().GetLocalCcShards(); - size_t estimate_rec_size = UINT64_MAX; - - // Get estiamte record size for key cache - auto schema = shards->GetSharedTableSchema( - TableName(table_name_->GetBaseTableNameSV(), - TableType::Primary, - table_name_->Engine()), - cc_ng_id_); - auto stats = schema->StatisticsObject(); - assert(slice_size_ > 0); - estimate_rec_size = slice_size_ / rec_cnt_; - if (stats) - { - // Update estimate size in table stats with the loaded - // slice. - stats->SetEstimateRecordSize(estimate_rec_size); - } - } - range_slice_->CommitLoading(*range_, slice_size_); - if (init_key_cache) + // Get estiamte record size for key cache + auto schema = shards->GetSharedTableSchema( + TableName(table_name_->GetBaseTableNameSV(), + TableType::Primary, + table_name_->Engine()), + cc_ng_id_); + auto stats = schema->StatisticsObject(); + assert(slice_size_ > 0); + estimate_rec_size = slice_size_ / rec_cnt_; + if (stats) { - range_slice_->InitKeyCache( - cc_shard, range_, tbl_name, cc_ng_id, cc_ng_term); + // Update estimate size in table stats with the loaded + // slice. + stats->SetEstimateRecordSize(estimate_rec_size); } } - else - { - range_slice_->SetLoadingError(*range_, err_code); - } - - next_idxs_.clear(); - partitioned_slice_data_.clear(); - } - - return finish_all; -} - -bool FillStoreSliceCc::SetError(CcErrorCode err_code) -{ - bool finish_all = false; - { - std::lock_guard lk(mux_); - ++finish_cnt_; - err_code_ = err_code; - - if (finish_cnt_ == core_cnt_) + range_slice_->CommitLoading(*range_, slice_size_); + if (init_key_cache) { - finish_all = true; + range_slice_->InitKeyCache( + cc_shard, range_, tbl_name, cc_ng_id, cc_ng_term); } } - - if (finish_all) + else { range_slice_->SetLoadingError(*range_, err_code_); - - next_idxs_.clear(); - partitioned_slice_data_.clear(); } - return finish_all; + next_idx_ = 0; + slice_data_.clear(); +} + +void FillStoreSliceCc::SetError(CcErrorCode err_code) +{ + err_code_ = err_code; + range_slice_->SetLoadingError(*range_, err_code_); + next_idx_ = 0; + slice_data_.clear(); } void FillStoreSliceCc::StartFilling() @@ -819,8 +774,14 @@ void FillStoreSliceCc::TerminateFilling() // The slice has not been filled into memory. So, the out-of-memory flag is // false. range_slice_->SetLoadingError(*range_, CcErrorCode::DATA_STORE_ERR); - next_idxs_.clear(); - partitioned_slice_data_.clear(); + next_idx_ = 0; + slice_data_.clear(); +} + +int32_t FillStoreSliceCc::PartitionId() const +{ + assert(range_ != nullptr); + return range_->PartitionId(); } FetchRecordCc::FetchRecordCc(const TableName *tbl_name, diff --git a/tx_service/src/cc/range_slice.cpp b/tx_service/src/cc/range_slice.cpp index 91b1973b..db45452e 100644 --- a/tx_service/src/cc/range_slice.cpp +++ b/tx_service/src/cc/range_slice.cpp @@ -70,10 +70,9 @@ void StoreSlice::StartLoading(FillStoreSliceCc *fill_req, assert(pins_ == 0); status_ = SliceStatus::BeingLoaded; - for (uint16_t core_id = 0; core_id < cc_shards.Count(); ++core_id) - { - cc_shards.EnqueueCcRequest(core_id, fill_req); - } + uint16_t dest_core = + static_cast(fill_req->PartitionId() % cc_shards.Count()); + cc_shards.EnqueueToCcShard(dest_core, fill_req); } void StoreSlice::CommitLoading(StoreRange &range, uint32_t slice_size)