Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 16 additions & 21 deletions tx_service/include/cc/cc_req_misc.h
Original file line number Diff line number Diff line change
Expand Up @@ -417,19 +417,18 @@ struct FillStoreSliceCc : public CcRequestBase

bool Execute(CcShard &ccs) override;

std::deque<SliceDataItem> &SliceData(uint16_t core_id)
std::deque<SliceDataItem> &SliceData()
{
assert(core_id < partitioned_slice_data_.size());
return partitioned_slice_data_[core_id];
return slice_data_;
}

void AddDataItem(TxKey key,
std::unique_ptr<txservice::TxRecord> &&record,
uint64_t version_ts,
bool is_deleted);

bool SetFinish(CcShard *cc_shard);
bool SetError(CcErrorCode err_code);
void SetFinish(CcShard *cc_shard);
void SetError(CcErrorCode err_code);

void SetKvFinish(bool success);

Expand All @@ -438,12 +437,9 @@ struct FillStoreSliceCc : public CcRequestBase
assert(err_code != CcErrorCode::NO_ERROR);
DLOG(ERROR) << "Abort this FillStoreSliceCc request with error: "
<< CcErrorMessage(err_code);
bool finish_all = SetError(err_code);
SetError(err_code);
// Recycle request
if (finish_all)
{
Free();
}
Free();
}

const TableName &TblName() const
Expand Down Expand Up @@ -476,17 +472,16 @@ struct FillStoreSliceCc : public CcRequestBase
force_load_ = force_load;
}

size_t NextIndex(size_t core_idx) const
size_t NextIndex() const
{
size_t next_idx = next_idxs_[core_idx];
assert(next_idx <= partitioned_slice_data_[core_idx].size());
return next_idx;
assert(next_idx_ <= slice_data_.size());
return next_idx_;
}

void SetNextIndex(size_t core_idx, size_t index)
void SetNextIndex(size_t index)
{
assert(index <= partitioned_slice_data_[core_idx].size());
next_idxs_[core_idx] = index;
assert(index <= slice_data_.size());
next_idx_ = index;
}

NodeGroupId NodeGroup() const
Expand Down Expand Up @@ -524,20 +519,20 @@ struct FillStoreSliceCc : public CcRequestBase
return true;
}

int32_t PartitionId() const;

metrics::TimePoint start_;

private:
const TableName *table_name_;
NodeGroupId cc_ng_id_;
int64_t cc_ng_term_;
bool force_load_;
uint16_t finish_cnt_;
uint16_t core_cnt_;
std::mutex mux_;
CcErrorCode err_code_{CcErrorCode::NO_ERROR};

std::vector<size_t> next_idxs_;
std::vector<std::deque<SliceDataItem>> partitioned_slice_data_;
size_t next_idx_;
std::deque<SliceDataItem> slice_data_;

StoreSlice *range_slice_ = nullptr;
StoreRange *range_ = nullptr;
Expand Down
9 changes: 5 additions & 4 deletions tx_service/include/cc/template_cc_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -6663,9 +6663,9 @@ class TemplateCcMap : public CcMap

bool Execute(FillStoreSliceCc &req) override
{
std::deque<SliceDataItem> &slice_vec = req.SliceData(shard_->core_id_);
std::deque<SliceDataItem> &slice_vec = req.SliceData();

size_t index = req.NextIndex(shard_->core_id_);
size_t index = req.NextIndex();
size_t last_index = std::min(index + FillStoreSliceCc::MaxScanBatchSize,
slice_vec.size());

Expand All @@ -6682,11 +6682,12 @@ class TemplateCcMap : public CcMap
if (index == slice_vec.size())
{
slice_vec.clear();
return req.SetFinish(shard_);
req.SetFinish(shard_);
return true;
}
else
{
req.SetNextIndex(shard_->core_id_, index);
req.SetNextIndex(index);
shard_->Enqueue(shard_->LocalCoreId(), &req);
return false;
}
Expand Down
149 changes: 55 additions & 94 deletions tx_service/src/cc/cc_req_misc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -591,14 +591,9 @@ void FillStoreSliceCc::Reset(const TableName &table_name,
cc_ng_id_ = cc_ng_id;
cc_ng_term_ = cc_ng_term;
force_load_ = force_load;
finish_cnt_ = 0;
core_cnt_ = cc_shards.Count();

next_idxs_.clear();
next_idxs_.resize(cc_shards.Count(), 0);

partitioned_slice_data_.clear();
partitioned_slice_data_.resize(cc_shards.Count());
next_idx_ = 0;
slice_data_.clear();

range_slice_ = slice;
range_ = range;
Expand All @@ -620,7 +615,7 @@ void FillStoreSliceCc::SetKvFinish(bool success)
{
CODE_FAULT_INJECTOR("LoadRangeSliceRequest_SetFinish_Error", {
success = false;
partitioned_slice_data_.clear();
slice_data_.clear();
slice_size_ = 0;
snapshot_ts_ = 0;
});
Expand Down Expand Up @@ -657,7 +652,8 @@ bool FillStoreSliceCc::Execute(CcShard &ccs)
int64_t cc_ng_term = Sharder::Instance().LeaderTerm(cc_ng_id_);
if (std::max(cc_ng_candid_term, cc_ng_term) != cc_ng_term_)
{
return SetError(CcErrorCode::NG_TERM_CHANGED);
SetError(CcErrorCode::NG_TERM_CHANGED);
return true;
}

CcMap *ccm = ccs.GetCcm(*table_name_, cc_ng_id_);
Expand Down Expand Up @@ -706,106 +702,65 @@ void FillStoreSliceCc::AddDataItem(
rec_cnt_++;
}

size_t hash = key.Hash();
// Uses the lower 10 bits of the hash code to shard the key across
// CPU cores at this node.
uint16_t core_code = hash & 0x3FF;
uint16_t core_id = core_code % core_cnt_;

partitioned_slice_data_[core_id].emplace_back(
slice_data_.emplace_back(
std::move(key), std::move(record), version_ts, is_deleted);
}

bool FillStoreSliceCc::SetFinish(CcShard *cc_shard)
void FillStoreSliceCc::SetFinish(CcShard *cc_shard)
{
bool finish_all = false;
CcErrorCode err_code;
if (err_code_ == CcErrorCode::NO_ERROR)
{
std::lock_guard<std::mutex> lk(mux_);
++finish_cnt_;

if (finish_cnt_ == core_cnt_)
bool init_key_cache =
txservice_enable_key_cache && table_name_->IsBase();
// Cache the pointer since FillStoreSliceCc will be freed after
// CommitLoading.

const TableName *tbl_name = table_name_;
auto cc_ng_id = cc_ng_id_;
auto cc_ng_term = cc_ng_term_;
if (init_key_cache && rec_cnt_ > 0)
{
finish_all = true;
err_code = err_code_;
}
}
LocalCcShards *shards = Sharder::Instance().GetLocalCcShards();
size_t estimate_rec_size = UINT64_MAX;

if (finish_all)
{
if (err_code == CcErrorCode::NO_ERROR)
{
bool init_key_cache =
txservice_enable_key_cache && table_name_->IsBase();
// Cache the pointer since FillStoreSliceCc will be freed after
// CommitLoading.

const TableName *tbl_name = table_name_;
auto cc_ng_id = cc_ng_id_;
auto cc_ng_term = cc_ng_term_;
if (init_key_cache && rec_cnt_ > 0)
{
LocalCcShards *shards = Sharder::Instance().GetLocalCcShards();
size_t estimate_rec_size = UINT64_MAX;

// Get estiamte record size for key cache
auto schema = shards->GetSharedTableSchema(
TableName(table_name_->GetBaseTableNameSV(),
TableType::Primary,
table_name_->Engine()),
cc_ng_id_);
auto stats = schema->StatisticsObject();
assert(slice_size_ > 0);
estimate_rec_size = slice_size_ / rec_cnt_;
if (stats)
{
// Update estimate size in table stats with the loaded
// slice.
stats->SetEstimateRecordSize(estimate_rec_size);
}
}
range_slice_->CommitLoading(*range_, slice_size_);
if (init_key_cache)
// Get estiamte record size for key cache
auto schema = shards->GetSharedTableSchema(
TableName(table_name_->GetBaseTableNameSV(),
TableType::Primary,
table_name_->Engine()),
cc_ng_id_);
auto stats = schema->StatisticsObject();
assert(slice_size_ > 0);
estimate_rec_size = slice_size_ / rec_cnt_;
if (stats)
{
range_slice_->InitKeyCache(
cc_shard, range_, tbl_name, cc_ng_id, cc_ng_term);
// Update estimate size in table stats with the loaded
// slice.
stats->SetEstimateRecordSize(estimate_rec_size);
}
}
else
{
range_slice_->SetLoadingError(*range_, err_code);
}

next_idxs_.clear();
partitioned_slice_data_.clear();
}

return finish_all;
}

bool FillStoreSliceCc::SetError(CcErrorCode err_code)
{
bool finish_all = false;
{
std::lock_guard<std::mutex> lk(mux_);
++finish_cnt_;
err_code_ = err_code;

if (finish_cnt_ == core_cnt_)
range_slice_->CommitLoading(*range_, slice_size_);
if (init_key_cache)
{
finish_all = true;
range_slice_->InitKeyCache(
cc_shard, range_, tbl_name, cc_ng_id, cc_ng_term);
}
}

if (finish_all)
else
{
range_slice_->SetLoadingError(*range_, err_code_);

next_idxs_.clear();
partitioned_slice_data_.clear();
}

return finish_all;
next_idx_ = 0;
slice_data_.clear();
}

void FillStoreSliceCc::SetError(CcErrorCode err_code)
{
err_code_ = err_code;
range_slice_->SetLoadingError(*range_, err_code_);
next_idx_ = 0;
slice_data_.clear();
}

void FillStoreSliceCc::StartFilling()
Expand All @@ -819,8 +774,14 @@ void FillStoreSliceCc::TerminateFilling()
// The slice has not been filled into memory. So, the out-of-memory flag is
// false.
range_slice_->SetLoadingError(*range_, CcErrorCode::DATA_STORE_ERR);
next_idxs_.clear();
partitioned_slice_data_.clear();
next_idx_ = 0;
slice_data_.clear();
}

int32_t FillStoreSliceCc::PartitionId() const
{
assert(range_ != nullptr);
return range_->PartitionId();
}

FetchRecordCc::FetchRecordCc(const TableName *tbl_name,
Expand Down
7 changes: 3 additions & 4 deletions tx_service/src/cc/range_slice.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,9 @@ void StoreSlice::StartLoading(FillStoreSliceCc *fill_req,
assert(pins_ == 0);
status_ = SliceStatus::BeingLoaded;

for (uint16_t core_id = 0; core_id < cc_shards.Count(); ++core_id)
{
cc_shards.EnqueueCcRequest(core_id, fill_req);
}
uint16_t dest_core =
static_cast<uint16_t>(fill_req->PartitionId() % cc_shards.Count());
cc_shards.EnqueueToCcShard(dest_core, fill_req);
}

void StoreSlice::CommitLoading(StoreRange &range, uint32_t slice_size)
Expand Down