diff --git a/data_store_service_client.cpp b/data_store_service_client.cpp index f86ebdb..858ad37 100644 --- a/data_store_service_client.cpp +++ b/data_store_service_client.cpp @@ -27,18 +27,23 @@ #include #include #include +#include #include #include +#include #include #include #include +#include "cc_req_misc.h" #include "data_store_service_client_closure.h" #include "data_store_service_scanner.h" #include "eloq_data_store_service/object_pool.h" // ObjectPool #include "eloq_data_store_service/thread_worker_pool.h" #include "metrics.h" +#include "sharder.h" #include "store_util.h" // host_to_big_endian +#include "tx_key.h" #include "tx_service/include/cc/local_cc_shards.h" #include "tx_service/include/error_messages.h" #include "tx_service/include/sequences/sequences.h" @@ -186,7 +191,7 @@ bool DataStoreServiceClient::PutAll( } SyncCallbackData sync_putall; - uint16_t parts_cnt_per_key = 1; + uint16_t parts_cnt_per_key = table_name.IsHashPartitioned() ? 2 : 1; uint16_t parts_cnt_per_record = table_name.IsHashPartitioned() ? 1 : 5; // Write data for hash_partitioned table @@ -204,6 +209,7 @@ bool DataStoreServiceClient::PutAll( op_types.reserve(recs_cnt); for (auto idx : flush_recs) { + // TODO(lokax): encode bucket id txservice::FlushRecord &ckpt_rec = entries.at(idx.first)->data_sync_vec_->at(idx.second); txservice::TxKey tx_key = ckpt_rec.Key(); @@ -265,9 +271,13 @@ bool DataStoreServiceClient::PutAll( txservice::RecordStatus::Normal && (!ckpt_rec.Payload()->HasTTL() || ttl > now)) { + key_parts.emplace_back( + EncodeBucketId(txservice::Sharder::MapKeyHashToBucketId( + tx_key.Hash()))); + key_parts.emplace_back( std::string_view(tx_key.Data(), tx_key.Size())); - write_batch_size += tx_key.Size(); + write_batch_size += tx_key.Size() + sizeof(uint16_t); const txservice::TxRecord *rec = ckpt_rec.Payload(); // Upserts a key to the k-v store @@ -286,9 +296,13 @@ bool DataStoreServiceClient::PutAll( } else { + key_parts.emplace_back( + EncodeBucketId(txservice::Sharder::MapKeyHashToBucketId( + tx_key.Hash()))); + key_parts.emplace_back( std::string_view(tx_key.Data(), tx_key.Size())); - write_batch_size += tx_key.Size(); + write_batch_size += tx_key.Size() + sizeof(uint16_t); record_parts.emplace_back(std::string_view()); @@ -546,6 +560,7 @@ void DataStoreServiceClient::FetchTableCatalog( std::string_view key = fetch_cc->CatalogName().StringView(); Read(kv_table_catalogs_name, kv_partition_id, + "", key, fetch_cc, &FetchTableCatalogCallback); @@ -561,6 +576,7 @@ void DataStoreServiceClient::FetchCurrentTableStatistics( fetch_cc->SetStoreHandler(this); Read(kv_table_statistics_version_name, kv_partition_id, + "", sv, fetch_cc, &FetchCurrentTableStatsCallback); @@ -594,6 +610,7 @@ void DataStoreServiceClient::FetchTableStatistics( callback_data->start_key_, callback_data->end_key_, callback_data->session_id_, + true, false, false, true, @@ -839,6 +856,7 @@ void DataStoreServiceClient::FetchTableRanges( callback_data->end_key_, callback_data->session_id_, true, + true, false, true, callback_data->batch_size_, @@ -873,6 +891,7 @@ void DataStoreServiceClient::FetchRangeSlices( Read(callback_data->kv_range_table_name_, callback_data->kv_partition_id_, + "", callback_data->key_, callback_data, &FetchRangeSlicesCallback); @@ -932,6 +951,7 @@ bool DataStoreServiceClient::Read(const txservice::TableName &table_name, return true; } +// TODO(lokax): delete std::unique_ptr DataStoreServiceClient::ScanForward( const txservice::TableName &table_name, @@ -1046,7 +1066,8 @@ DataStoreServiceClient::LoadRangeSlice( kv_partition_id, callback_data->last_key_, callback_data->end_key_, - "", // session_id + "", // session_id + true, true, // include start_key false, // include end_key true, // scan forward @@ -1327,6 +1348,7 @@ bool DataStoreServiceClient::FetchTable(const txservice::TableName &table_name, FetchTableCallbackData callback_data(schema_image, found, version_ts); Read(kv_table_catalogs_name, 0, + "", table_name.StringView(), &callback_data, &FetchTableCallback); @@ -1360,6 +1382,7 @@ bool DataStoreServiceClient::DiscoverAllTableNames( callback_data.start_key_, callback_data.end_key_, callback_data.session_id_, + true, false, false, true, @@ -1471,6 +1494,7 @@ bool DataStoreServiceClient::FetchDatabase( Read(kv_database_catalogs_name, 0, + "", db, &callback_data, &FetchDatabaseCallback); @@ -1492,6 +1516,7 @@ bool DataStoreServiceClient::FetchAllDatabase( callback_data.start_key_, callback_data.end_key_, callback_data.session_id_, + true, false, false, true, @@ -1687,6 +1712,39 @@ uint32_t DataStoreServiceClient::HashArchiveKey( return partition_id; } +std::string DataStoreServiceClient::EncodeKvKeyForHashPart(uint16_t bucket_id) +{ + std::string kv_key; + uint16_t be_bucket_id = EloqShare::host_to_big_endian(bucket_id); + kv_key.append(reinterpret_cast(&be_bucket_id), + sizeof(be_bucket_id)); + return kv_key; +} + +std::string DataStoreServiceClient::EncodeKvKeyForHashPart( + uint16_t bucket_id, const txservice::TxKey &tx_key) +{ + std::string kv_key; + uint16_t be_bucket_id = EloqShare::host_to_big_endian(bucket_id); + kv_key.reserve(sizeof(uint16_t) + tx_key.Size()); + kv_key.append(reinterpret_cast(&be_bucket_id), + sizeof(be_bucket_id)); + if (tx_key.Type() == txservice::KeyType::Normal) + { + kv_key.append(tx_key.Data(), tx_key.Size()); + } + return kv_key; +} + +std::string_view DataStoreServiceClient::DecodeKvKeyForHashPart( + const char *data, size_t size) +{ + assert(size >= sizeof(uint16_t)); + const char *tx_key_start = data + sizeof(uint16_t); + size_t tx_key_len = size - sizeof(uint16_t); + return std::string_view(tx_key_start, tx_key_len); +} + std::string DataStoreServiceClient::EncodeArchiveKey( std::string_view table_name, std::string_view key, uint64_t be_commit_ts) { @@ -2071,15 +2129,26 @@ bool DataStoreServiceClient::CopyBaseToArchive( { txservice::TxKey &tx_key = base_vec[base_idx].first; assert(tx_key.Data() != nullptr && tx_key.Size() > 0); + uint32_t partition_id = base_vec[base_idx].second; auto *callback_data = &callback_datas[base_idx]; callback_data->ResetResult(); size_t flying_cnt = callback_data->AddFlyingReadCount(); + + std::string_view be_bucket_id = + table_name.IsHashPartitioned() + ? EncodeBucketId( + txservice::Sharder::MapKeyHashToBucketId( + tx_key.Hash())) + : std::string_view(); + Read(base_kv_table_name, KvPartitionIdOf(partition_id, true), + be_bucket_id, std::string_view(tx_key.Data(), tx_key.Size()), callback_data, &SyncBatchReadForArchiveCallback); + if (flying_cnt >= MAX_FLYING_READ_COUNT) { callback_data->Wait(); @@ -2104,10 +2173,17 @@ bool DataStoreServiceClient::CopyBaseToArchive( for (size_t i = 0; i < base_vec.size(); i++) { auto &callback_data = callback_datas[i]; + std::string_view tx_key_view = callback_data.key_str_; + if (table_name.IsHashPartitioned()) + { + tx_key_view = DecodeKvKeyForHashPart(tx_key_view.data(), + tx_key_view.size()); + } + txservice::TxKey tx_key = txservice::TxKeyFactory::CreateTxKey( - callback_data.key_str_.data(), - callback_data.key_str_.size()); - batch_size += callback_data.key_str_.size(); + tx_key_view.data(), tx_key_view.size()); + + batch_size += tx_key_view.size(); batch_size += callback_data.value_str_.size(); std::string_view val = callback_data.value_str_; size_t offset = 0; @@ -2223,6 +2299,7 @@ bool DataStoreServiceClient::FetchArchives( lower_bound_key, upper_bound_key, callback_data.session_id_, + true, true, // include start key false, // include end key callback_data.scan_forward_, // scan forward: true @@ -2307,6 +2384,7 @@ bool DataStoreServiceClient::FetchVisibleArchive( lower_bound_key, upper_bound_key, callback_data.session_id_, + true, true, // include start key false, // include end key callback_data.scan_forward_, // scan forward: false @@ -2386,6 +2464,7 @@ DataStoreServiceClient::FetchArchives(txservice::FetchRecordCc *fetch_cc) callback_data->start_key_, callback_data->end_key_, callback_data->session_id_, + true, true, // include start key false, // include end key false, // scan forward: false @@ -2424,6 +2503,7 @@ DataStoreServiceClient::FetchVisibleArchive( callback_data->start_key_, callback_data->end_key_, callback_data->session_id_, + true, true, // include start key false, // include end key false, // scan forward: false @@ -2497,9 +2577,16 @@ DataStoreServiceClient::FetchRecord( return FetchArchives(fetch_cc); } + std::string_view be_bucket_id = + fetch_cc->table_name_.IsHashPartitioned() + ? EncodeBucketId(txservice::Sharder::MapKeyHashToBucketId( + fetch_cc->tx_key_.Hash())) + : std::string_view(); + Read(fetch_cc->kv_table_name_, KvPartitionIdOf(fetch_cc->partition_id_, !fetch_cc->table_name_.IsHashPartitioned()), + be_bucket_id, std::string_view(fetch_cc->tx_key_.Data(), fetch_cc->tx_key_.Size()), fetch_cc, &FetchRecordCallback); @@ -2507,6 +2594,41 @@ DataStoreServiceClient::FetchRecord( return txservice::store::DataStoreHandler::DataStoreOpStatus::Success; } +txservice::store::DataStoreHandler::DataStoreOpStatus +DataStoreServiceClient::FetchBucketData( + txservice::FetchBucketDataCc *fetch_bucket_data_cc) +{ + assert(fetch_bucket_data_cc != nullptr); + assert(fetch_bucket_data_cc->table_name_.IsHashPartitioned()); + + int32_t kv_partition_id = + KvPartitionIdOf(txservice::Sharder::MapBucketIdToKvPartitionId( + fetch_bucket_data_cc->bucket_id_), + false); + + auto *callback_data = new FetchBucketDataCallbackData(fetch_bucket_data_cc); + callback_data->bucket_kv_start_key_ = EncodeKvKeyForHashPart( + fetch_bucket_data_cc->bucket_id_, fetch_bucket_data_cc->start_key_); + callback_data->bucket_kv_end_key_ = + EncodeKvKeyForHashPart(fetch_bucket_data_cc->bucket_id_ + 1); + + ScanNext(fetch_bucket_data_cc->kv_table_name_, + kv_partition_id, + callback_data->bucket_kv_start_key_, + callback_data->bucket_kv_end_key_, + callback_data->session_id_, + false, + fetch_bucket_data_cc->start_key_inclusive_, + false, + true, + fetch_bucket_data_cc->batch_size_, + &callback_data->search_cond_, + callback_data, + &FetchBucketDataCallback); + + return txservice::store::DataStoreHandler::DataStoreOpStatus::Success; +} + txservice::store::DataStoreHandler::DataStoreOpStatus DataStoreServiceClient::FetchSnapshot(txservice::FetchSnapshotCc *fetch_cc) { @@ -2525,9 +2647,16 @@ DataStoreServiceClient::FetchSnapshot(txservice::FetchSnapshotCc *fetch_cc) return FetchVisibleArchive(fetch_cc); } + std::string_view be_bucket_id = + fetch_cc->table_name_.IsHashPartitioned() + ? EncodeBucketId(txservice::Sharder::MapKeyHashToBucketId( + fetch_cc->tx_key_.Hash())) + : std::string_view(); + Read(fetch_cc->kv_table_name_, KvPartitionIdOf(fetch_cc->partition_id_, !fetch_cc->table_name_.IsHashPartitioned()), + be_bucket_id, std::string_view(fetch_cc->tx_key_.Data(), fetch_cc->tx_key_.Size()), fetch_cc, &FetchSnapshotCallback); @@ -2537,13 +2666,19 @@ DataStoreServiceClient::FetchSnapshot(txservice::FetchSnapshotCc *fetch_cc) void DataStoreServiceClient::Read(const std::string_view kv_table_name, const uint32_t partition_id, + std::string_view be_bucket_id, const std::string_view key, void *callback_data, DataStoreCallback callback) { ReadClosure *read_clouse = read_closure_pool_.NextObject(); - read_clouse->Reset( - this, kv_table_name, partition_id, key, callback_data, callback); + read_clouse->Reset(this, + kv_table_name, + partition_id, + be_bucket_id, + key, + callback_data, + callback); ReadInternal(read_clouse); } @@ -2768,6 +2903,7 @@ void DataStoreServiceClient::ScanNext( const std::string_view start_key, const std::string_view end_key, const std::string_view session_id, + bool generate_session_id, bool inclusive_start, bool inclusive_end, bool scan_forward, @@ -2786,6 +2922,7 @@ void DataStoreServiceClient::ScanNext( inclusive_end, scan_forward, session_id, + generate_session_id, batch_size, search_conditions, callback_data, @@ -2811,6 +2948,7 @@ void DataStoreServiceClient::ScanNextInternal( scan_next_closure->LocalSearchConditionsPtr(), &scan_next_closure->LocalItemsRef(), &scan_next_closure->LocalSessionIdRef(), + scan_next_closure->GenerateSessionId(), &scan_next_closure->Result(), scan_next_closure); } @@ -2855,6 +2993,7 @@ void DataStoreServiceClient::ScanClose(const std::string_view table_name, false, // inclusive_end true, // scan_forward session_id, + false, 0, // batch_size 0 for close nullptr, callback_data, diff --git a/data_store_service_client.h b/data_store_service_client.h index 09bc8e3..79f8add 100644 --- a/data_store_service_client.h +++ b/data_store_service_client.h @@ -21,6 +21,7 @@ */ #pragma once +#include #include #include #include @@ -32,6 +33,8 @@ #include "eloq_data_store_service/data_store_service.h" #include "eloq_data_store_service/ds_request.pb.h" #include "eloq_data_store_service/thread_worker_pool.h" +#include "store_util.h" +#include "tx_key.h" #include "tx_service/include/cc/cc_shard.h" #include "tx_service/include/sequences/sequences.h" #include "tx_service/include/sharder.h" @@ -80,6 +83,15 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler << txservice::Sequences::table_name_sv_; AppendPreBuiltTable(txservice::Sequences::table_name_); + + be_bucket_ids_.reserve(txservice::Sharder::ToTalRangeBuckets()); + for (size_t bucket_id = 0; + bucket_id < txservice::Sharder::ToTalRangeBuckets(); + ++bucket_id) + { + uint16_t be_bucket_id = EloqShare::host_to_big_endian(bucket_id); + be_bucket_ids_.push_back(be_bucket_id); + } } // The maximum number of retries for RPC requests. @@ -187,6 +199,9 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler uint64_t &version_ts, const txservice::TableSchema *table_schema) override; + txservice::store::DataStoreHandler::DataStoreOpStatus FetchBucketData( + txservice::FetchBucketDataCc *fetch_bucket_data_cc) override; + DataStoreOpStatus FetchRecord( txservice::FetchRecordCc *fetch_cc, txservice::FetchSnapshotCc *fetch_snapshot_cc = nullptr) override; @@ -204,8 +219,7 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler * @brief Only Fetch visible archive asynchronously. (This is called in * FetchSnapshot) */ - DataStoreOpStatus - FetchVisibleArchive(txservice::FetchSnapshotCc *fetch_cc); + DataStoreOpStatus FetchVisibleArchive(txservice::FetchSnapshotCc *fetch_cc); std::unique_ptr ScanForward( const txservice::TableName &table_name, @@ -393,6 +407,13 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler static uint32_t HashArchiveKey(const std::string &kv_table_name, const txservice::TxKey &tx_key); + static std::string EncodeKvKeyForHashPart(uint16_t bucket_id); + static std::string EncodeKvKeyForHashPart(uint16_t bucket_id, + const txservice::TxKey &tx_key); + + static std::string_view DecodeKvKeyForHashPart(const char *data, + size_t size); + // NOTICE: be_commit_ts is the big endian encode value of commit_ts static std::string EncodeArchiveKey(std::string_view table_name, std::string_view key, @@ -440,11 +461,6 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler uint64_t write_time); private: - int32_t MapKeyHashToPartitionId(const txservice::TxKey &key) const - { - return (key.Hash() >> 10) & 0x3FF; - } - // ===================================================== // Group: KV Interface // Functions that decide if the request is local or remote @@ -452,6 +468,7 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler void Read(const std::string_view kv_table_name, const uint32_t partition_id, + const std::string_view be_bucket_id, const std::string_view key, void *callback_data, DataStoreCallback callback); @@ -504,6 +521,7 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler const std::string_view start_key, const std::string_view end_key, const std::string_view session_id, + bool generate_session, bool inclusive_start, bool inclusive_end, bool scan_forward, @@ -552,6 +570,7 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler #ifdef USE_ONE_ELOQDSS_PARTITION return 0; #else + // TODO(lokax): std::string_view sv = table.StringView(); return (std::hash()(sv)) & 0x3FF; #endif @@ -560,6 +579,7 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler int32_t KvPartitionIdOf(int32_t key_partition, bool is_range_partition = true) { + // TODO(lokax): #ifdef USE_ONE_ELOQDSS_PARTITION if (is_range_partition) { @@ -593,6 +613,13 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler */ bool IsLocalPartition(int32_t partition_id); + std::string_view EncodeBucketId(uint16_t bucket_id) + { + uint16_t &be_bucket_id = be_bucket_ids_[bucket_id]; + return std::string_view(reinterpret_cast(&be_bucket_id), + sizeof(uint16_t)); + } + bthread::Mutex ds_service_mutex_; bthread::ConditionVariable ds_service_cv_; std::atomic ds_serv_shutdown_indicator_; @@ -612,6 +639,8 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler pre_built_table_names_; ThreadWorkerPool upsert_table_worker_{1}; + std::vector be_bucket_ids_; + friend class ReadClosure; friend class BatchWriteRecordsClosure; friend class FlushDataClosure; @@ -649,8 +678,10 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler DataStoreServiceClient &client, const remote::CommonResult &result); friend void FetchRecordArchivesCallback( - void *data, ::google::protobuf::Closure *closure, - DataStoreServiceClient &client, const remote::CommonResult &result); + void *data, + ::google::protobuf::Closure *closure, + DataStoreServiceClient &client, + const remote::CommonResult &result); }; struct UpsertTableData diff --git a/data_store_service_client_closure.cpp b/data_store_service_client_closure.cpp index a838417..f2039f7 100644 --- a/data_store_service_client_closure.cpp +++ b/data_store_service_client_closure.cpp @@ -24,9 +24,12 @@ #include #include +#include #include -#include "store_util.h" // host_to_big_endian +#include "cc_req_misc.h" +#include "error_messages.h" +#include "store_util.h" // host_to_big_endian #include "tx_service/include/cc/cc_request.h" #include "tx_service/include/cc/local_cc_shards.h" @@ -57,11 +60,11 @@ void SyncBatchReadForArchiveCallback(void *data, if (err_code == remote::DataStoreError::KEY_NOT_FOUND) { LOG(ERROR) << "BatchReadForArchiveCallback, key not found: " - << read_closure->Key(); + << read_closure->Key().back(); // callback_data->SetErrorCode( // static_cast(txservice::CcErrorCode::DATA_STORE_ERR)); // assert(false); - std::string_view key_str = read_closure->Key(); + std::string_view key_str = read_closure->Key().back(); uint64_t ts = 1U; uint64_t ttl = 0U; std::string value_str = client.SerializeTxRecord(true, nullptr); @@ -84,7 +87,7 @@ void SyncBatchReadForArchiveCallback(void *data, } else { - std::string_view key_str = read_closure->Key(); + std::string_view key_str = read_closure->Key().back(); std::string &value_str = read_closure->ValueStringRef(); uint64_t ts = read_closure->Ts(); uint64_t ttl = read_closure->Ttl(); @@ -104,8 +107,8 @@ void FetchRecordCallback(void *data, const remote::CommonResult &result) { auto *read_closure = static_cast(closure); - - auto *fetch_cc = static_cast(data); + txservice::FetchRecordCc *fetch_cc = + static_cast(data); auto err_code = result.error_code(); { @@ -166,7 +169,9 @@ void FetchRecordCallback(void *data, val, is_deleted, offset)) { LOG(ERROR) << "====fetch record===decode error==" - << " key: " << read_closure->Key() + << " key: " + << std::string_view(fetch_cc->tx_key_.Data(), + fetch_cc->tx_key_.Size()) << " status: " << (int) fetch_cc->rec_status_; std::abort(); } @@ -174,128 +179,187 @@ void FetchRecordCallback(void *data, if (is_deleted) { fetch_cc->rec_status_ = txservice::RecordStatus::Deleted; - fetch_cc->rec_ts_= read_closure->Ts(); + fetch_cc->rec_ts_ = read_closure->Ts(); } else { - fetch_cc->rec_status_= txservice::RecordStatus::Normal; - fetch_cc->rec_ts_= read_closure->Ts(); - fetch_cc->rec_str_.assign(val.data() + offset, - val.size() - offset); + fetch_cc->rec_status_ = txservice::RecordStatus::Normal; + fetch_cc->rec_ts_ = read_closure->Ts(); + fetch_cc->rec_str_.assign(val.data() + offset, + val.size() - offset); } if (fetch_cc->snapshot_read_ts_ > 0 && fetch_cc->snapshot_read_ts_ < fetch_cc->rec_ts_) { - auto op_st= client.FetchArchives(fetch_cc); - - if (op_st != txservice::store::DataStoreHandler:: - DataStoreOpStatus::Success) - { - LOG(ERROR) << "FetchArchives failed, key: " - << fetch_cc->tx_key_.ToString(); - // Through fetch archive failed, we can also backfill the - // base version. - fetch_cc->SetFinish(0); - } + auto op_st = client.FetchArchives(fetch_cc); + + if (op_st != txservice::store::DataStoreHandler:: + DataStoreOpStatus::Success) + { + LOG(ERROR) << "FetchArchives failed, key: " + << fetch_cc->tx_key_.ToString(); + // Through fetch archive failed, we can also backfill the + // base version. + fetch_cc->SetFinish(0); + } } else { - fetch_cc->SetFinish(0); + fetch_cc->SetFinish(0); } } } else { - fetch_cc->SetFinish( - static_cast(txservice::CcErrorCode::DATA_STORE_ERR)); + fetch_cc->SetFinish( + static_cast(txservice::CcErrorCode::DATA_STORE_ERR)); } } -void FetchSnapshotCallback(void *data, ::google::protobuf::Closure *closure, - DataStoreServiceClient &client, - const remote::CommonResult &result) +void FetchBucketDataCallback(void *data, + ::google::protobuf::Closure *closure, + DataStoreServiceClient &client, + const remote::CommonResult &result) { - auto *read_closure= static_cast(closure); - - auto *fetch_cc= static_cast(data); - auto err_code= result.error_code(); + assert(data != nullptr); + FetchBucketDataCallbackData *callback_data = + static_cast(data); + ScanNextClosure *scan_next_closure = + static_cast(closure); + assert(!scan_next_closure->GenerateSessionId()); + auto *fetch_bucket_data_cc = callback_data->fetch_bucket_data_cc_; - if (metrics::enable_kv_metrics) - { - metrics::kv_meter->CollectDuration(metrics::NAME_KV_READ_DURATION, - fetch_cc->start_); - metrics::kv_meter->Collect(metrics::NAME_KV_READ_TOTAL, 1); - } + if (result.error_code() != EloqDS::remote::DataStoreError::NO_ERROR) + { + LOG(ERROR) << "DataStoreHandler: Failed to do FetchBucketData. " + << result.error_msg(); + fetch_bucket_data_cc->SetFinish( + static_cast(txservice::CcErrorCode::DATA_STORE_ERR)); - if (err_code == remote::DataStoreError::KEY_NOT_FOUND) - { - fetch_cc->rec_status_= txservice::RecordStatus::Deleted; - fetch_cc->rec_ts_= 1U; + delete callback_data; + return; + } - fetch_cc->SetFinish(0); - } - else if (err_code == remote::DataStoreError::NO_ERROR) - { - std::string_view val= read_closure->Value(); + assert(fetch_bucket_data_cc->table_name_.IsHashPartitioned()); - if (fetch_cc->table_name_.IsHashPartitioned()) - { - LOG(WARNING) << "FetchSnapshot on hash partition not supported"; - assert(false); - fetch_cc->SetFinish( - static_cast(txservice::CcErrorCode::DATA_STORE_ERR)); - } - else + uint32_t items_size = scan_next_closure->ItemsSize(); + std::string key_str; + std::string value_str; + uint64_t ts = UINT64_MAX; + uint64_t ttl = UINT64_MAX; + uint64_t now = txservice::LocalCcShards::ClockTsInMillseconds(); + for (uint32_t item_idx = 0; item_idx < items_size; ++item_idx) { - // Range partition - if (fetch_cc->snapshot_read_ts_ >= read_closure->Ts()) - { - bool is_deleted= false; - size_t offset= 0; - if (!DataStoreServiceClient::DeserializeTxRecordStr(val, is_deleted, - offset)) - { - LOG(ERROR) << "====fetch snapshot===decode error==" - << " key: " << read_closure->Key() - << " status: " << (int) fetch_cc->rec_status_; - std::abort(); - } + scan_next_closure->GetItem(item_idx, key_str, value_str, ts, ttl); - if (is_deleted) + std::string tx_key( + client.DecodeKvKeyForHashPart(key_str.data(), key_str.size())); + if (ttl > 0 && ttl < now) { - fetch_cc->rec_status_= txservice::RecordStatus::Deleted; - fetch_cc->rec_ts_= read_closure->Ts(); + fetch_bucket_data_cc->AddDataItem(std::move(tx_key), "", 1, true); } else { - fetch_cc->rec_status_= txservice::RecordStatus::Normal; - fetch_cc->rec_ts_= read_closure->Ts(); - fetch_cc->rec_str_.assign(val.data() + offset, val.size() - offset); + fetch_bucket_data_cc->AddDataItem( + std::move(tx_key), std::move(value_str), ts, false); } + } + + // callback_data->session_id_ = scan_next_closure->GetSessionId(); + fetch_bucket_data_cc->SetFinish( + static_cast(txservice::CcErrorCode::NO_ERROR)); + + delete callback_data; +} + +void FetchSnapshotCallback(void *data, + ::google::protobuf::Closure *closure, + DataStoreServiceClient &client, + const remote::CommonResult &result) +{ + auto *read_closure = static_cast(closure); + auto *fetch_cc = static_cast(data); + auto err_code = result.error_code(); + + if (metrics::enable_kv_metrics) + { + metrics::kv_meter->CollectDuration(metrics::NAME_KV_READ_DURATION, + fetch_cc->start_); + metrics::kv_meter->Collect(metrics::NAME_KV_READ_TOTAL, 1); + } + + if (err_code == remote::DataStoreError::KEY_NOT_FOUND) + { + fetch_cc->rec_status_ = txservice::RecordStatus::Deleted; + fetch_cc->rec_ts_ = 1U; + fetch_cc->SetFinish(0); - } - else - { - auto op_st= client.FetchVisibleArchive(fetch_cc); - assert(op_st == - txservice::store::DataStoreHandler::DataStoreOpStatus::Success); - if (op_st != - txservice::store::DataStoreHandler::DataStoreOpStatus::Success) + } + else if (err_code == remote::DataStoreError::NO_ERROR) + { + std::string_view val = read_closure->Value(); + + if (fetch_cc->table_name_.IsHashPartitioned()) + { + LOG(WARNING) << "FetchSnapshot on hash partition not supported"; + assert(false); + fetch_cc->SetFinish( + static_cast(txservice::CcErrorCode::DATA_STORE_ERR)); + } + else { - LOG(ERROR) << "FetchSnapshot failed on FetchArchive, key: " - << fetch_cc->tx_key_.ToString(); - fetch_cc->SetFinish( - static_cast(txservice::CcErrorCode::DATA_STORE_ERR)); + // Range partition + if (fetch_cc->snapshot_read_ts_ >= read_closure->Ts()) + { + bool is_deleted = false; + size_t offset = 0; + if (!DataStoreServiceClient::DeserializeTxRecordStr( + val, is_deleted, offset)) + { + LOG(ERROR) << "====fetch snapshot===decode error==" + << " key: " + << std::string_view(fetch_cc->tx_key_.Data(), + fetch_cc->tx_key_.Size()) + << " status: " << (int) fetch_cc->rec_status_; + std::abort(); + } + + if (is_deleted) + { + fetch_cc->rec_status_ = txservice::RecordStatus::Deleted; + fetch_cc->rec_ts_ = read_closure->Ts(); + } + else + { + fetch_cc->rec_status_ = txservice::RecordStatus::Normal; + fetch_cc->rec_ts_ = read_closure->Ts(); + fetch_cc->rec_str_.assign(val.data() + offset, + val.size() - offset); + } + fetch_cc->SetFinish(0); + } + else + { + auto op_st = client.FetchVisibleArchive(fetch_cc); + assert(op_st == txservice::store::DataStoreHandler:: + DataStoreOpStatus::Success); + if (op_st != txservice::store::DataStoreHandler:: + DataStoreOpStatus::Success) + { + LOG(ERROR) << "FetchSnapshot failed on FetchArchive, key: " + << fetch_cc->tx_key_.ToString(); + fetch_cc->SetFinish(static_cast( + txservice::CcErrorCode::DATA_STORE_ERR)); + } + } } - } } - } - else - { - fetch_cc->SetFinish( - static_cast(txservice::CcErrorCode::DATA_STORE_ERR)); - } + else + { + fetch_cc->SetFinish( + static_cast(txservice::CcErrorCode::DATA_STORE_ERR)); + } } void AsyncDropTableCallback(void *data, @@ -480,6 +544,7 @@ void FetchAllDatabaseCallback(void *data, fetch_data->dbnames_.back(), fetch_data->end_key_, fetch_data->session_id_, + true, false, false, true, @@ -549,6 +614,7 @@ void DiscoverAllTableNamesCallback(void *data, fetch_data->table_names_.back(), fetch_data->end_key_, fetch_data->session_id_, + true, false, false, true, @@ -682,6 +748,7 @@ void FetchTableRangesCallback(void *data, fetch_data->start_key_, fetch_data->end_key_, fetch_data->session_id_, + true, false, false, true, @@ -780,6 +847,7 @@ void FetchRangeSlicesCallback(void *data, fetch_req->CurrentSegmentId()); client.Read(fetch_data->kv_range_slices_table_name_, fetch_data->kv_partition_id_, + "", fetch_data->key_, fetch_data, &FetchRangeSlicesCallback); @@ -878,6 +946,7 @@ void FetchRangeSlicesCallback(void *data, fetch_req->CurrentSegmentId()); client.Read(fetch_data->kv_range_slices_table_name_, fetch_data->kv_partition_id_, + "", fetch_data->key_, fetch_data, &FetchRangeSlicesCallback); @@ -997,6 +1066,7 @@ void FetchTableStatsCallback(void *data, fetch_data->start_key_, fetch_data->end_key_, fetch_data->session_id_, + true, false, false, true, @@ -1098,6 +1168,7 @@ void LoadRangeSliceCallback(void *data, callback_data->last_key_, callback_data->end_key_, callback_data->sesssion_id_, + true, false, // include start_key false, // include end_key true, // scan forward @@ -1165,6 +1236,7 @@ void FetchArchivesCallback(void *data, fetch_data->start_key_, fetch_data->end_key_, scan_next_closure->SessionId(), + true, false, false, fetch_data->scan_forward_, @@ -1180,106 +1252,131 @@ void FetchRecordArchivesCallback(void *data, DataStoreServiceClient &client, const remote::CommonResult &result) { - FetchRecordArchivesCallbackData *fetch_data= - static_cast(data); - txservice::FetchRecordCc *fetch_cc= fetch_data->fetch_cc_; - - ScanNextClosure *scan_next_closure= static_cast(closure); - auto err_code= result.error_code(); - - if (err_code != remote::DataStoreError::NO_ERROR) - { - assert(err_code != remote::DataStoreError::KEY_NOT_FOUND); - DLOG(INFO) << "FetchRecordArchivesCallback, error_code:" << err_code - << ", error_msg: " << result.error_msg(); - fetch_cc->SetFinish( - static_cast(txservice::CcErrorCode::DATA_STORE_ERR)); - delete fetch_data; - return; - } - - uint32_t items_size= scan_next_closure->ItemsSize(); - DLOG(INFO) << "FetchRecordArchivesCallback, items_size:" << items_size; - std::string archive_key; - std::string archive_value; - uint64_t commit_ts; - uint64_t ttl; - - if (fetch_cc->archive_records_ == nullptr) - { - fetch_cc->archive_records_= std::make_unique>>(); - } - auto &archive_records= *fetch_cc->archive_records_; - - archive_records.reserve(archive_records.size() + items_size); - for (uint32_t i= 0; i < items_size; i++) - { - scan_next_closure->GetItem(i, archive_key, archive_value, commit_ts, ttl); - // parse archive_value - bool is_deleted= false; - size_t value_offset= 0; - client.DecodeArchiveValue(archive_value, is_deleted, value_offset); - if (is_deleted) + FetchRecordArchivesCallbackData *fetch_data = + static_cast(data); + txservice::FetchRecordCc *fetch_cc = fetch_data->fetch_cc_; + + ScanNextClosure *scan_next_closure = + static_cast(closure); + auto err_code = result.error_code(); + + if (err_code != remote::DataStoreError::NO_ERROR) { - archive_records.emplace_back(commit_ts, txservice::RecordStatus::Deleted, - ""); + assert(err_code != remote::DataStoreError::KEY_NOT_FOUND); + DLOG(INFO) << "FetchRecordArchivesCallback, error_code:" << err_code + << ", error_msg: " << result.error_msg(); + fetch_cc->SetFinish( + static_cast(txservice::CcErrorCode::DATA_STORE_ERR)); + delete fetch_data; + return; } - else + + uint32_t items_size = scan_next_closure->ItemsSize(); + DLOG(INFO) << "FetchRecordArchivesCallback, items_size:" << items_size; + std::string archive_key; + std::string archive_value; + uint64_t commit_ts; + uint64_t ttl; + + if (fetch_cc->archive_records_ == nullptr) + { + fetch_cc->archive_records_ = std::make_unique>>(); + } + auto &archive_records = *fetch_cc->archive_records_; + + archive_records.reserve(archive_records.size() + items_size); + for (uint32_t i = 0; i < items_size; i++) { - std::string record_str(archive_value.data() + value_offset, - archive_value.size() - value_offset); - assert(record_str.size() > 0); - archive_records.emplace_back(commit_ts, txservice::RecordStatus::Normal, - std::move(record_str)); + scan_next_closure->GetItem( + i, archive_key, archive_value, commit_ts, ttl); + // parse archive_value + bool is_deleted = false; + size_t value_offset = 0; + client.DecodeArchiveValue(archive_value, is_deleted, value_offset); + if (is_deleted) + { + archive_records.emplace_back( + commit_ts, txservice::RecordStatus::Deleted, ""); + } + else + { + std::string record_str(archive_value.data() + value_offset, + archive_value.size() - value_offset); + assert(record_str.size() > 0); + archive_records.emplace_back(commit_ts, + txservice::RecordStatus::Normal, + std::move(record_str)); + } } - } - if (scan_next_closure->BatchSize() == 1 && !scan_next_closure->ScanForward()) - { - if (items_size == 0) + if (scan_next_closure->BatchSize() == 1 && + !scan_next_closure->ScanForward()) + { + if (items_size == 0) + { + // Not found the visible archive version in the archives table. + assert(archive_records.size() == 0); + archive_records.emplace_back( + 1U, txservice::RecordStatus::Deleted, ""); + + fetch_data->start_key_ = client.EncodeArchiveKey( + fetch_cc->kv_table_name_, + std::string_view(fetch_cc->tx_key_.Data(), + fetch_cc->tx_key_.Size()), + EloqShare::host_to_big_endian(1U)); + } + else + { + fetch_data->start_key_ = std::move(archive_key); + } + + // Fetched the visible version, next scan is fetching all the + // archives whose commit_ts is bigger than the visible version. + fetch_data->end_key_ = + client.EncodeArchiveKey(fetch_cc->kv_table_name_, + std::string_view(fetch_cc->tx_key_.Data(), + fetch_cc->tx_key_.Size()), + EloqShare::host_to_big_endian(UINT64_MAX)); + + client.ScanNext(fetch_data->kv_table_name_, + fetch_data->partition_id_, + fetch_data->start_key_, + fetch_data->end_key_, + scan_next_closure->SessionId(), + true, + false, + false, + true, + 100, + nullptr, + fetch_data, + &FetchRecordArchivesCallback); + } + else if (items_size < scan_next_closure->BatchSize()) { - // Not found the visible archive version in the archives table. - assert(archive_records.size() == 0); - archive_records.emplace_back(1U, txservice::RecordStatus::Deleted, ""); - - fetch_data->start_key_= client.EncodeArchiveKey( - fetch_cc->kv_table_name_, - std::string_view(fetch_cc->tx_key_.Data(), fetch_cc->tx_key_.Size()), - EloqShare::host_to_big_endian(1U)); + assert(archive_records.size() > 0); + fetch_cc->SetFinish(0); + delete fetch_data; } else { - fetch_data->start_key_= std::move(archive_key); + // set the start key of next scan batch + fetch_data->start_key_ = std::move(archive_key); + client.ScanNext(fetch_data->kv_table_name_, + fetch_data->partition_id_, + fetch_data->start_key_, + fetch_data->end_key_, + scan_next_closure->SessionId(), + true, + false, + false, + true, + 100, + nullptr, + fetch_data, + &FetchRecordArchivesCallback); } - - // Fetched the visible version, next scan is fetching all the - // archives whose commit_ts is bigger than the visible version. - fetch_data->end_key_= client.EncodeArchiveKey( - fetch_cc->kv_table_name_, - std::string_view(fetch_cc->tx_key_.Data(), fetch_cc->tx_key_.Size()), - EloqShare::host_to_big_endian(UINT64_MAX)); - - client.ScanNext(fetch_data->kv_table_name_, fetch_data->partition_id_, - fetch_data->start_key_, fetch_data->end_key_, - scan_next_closure->SessionId(), false, false, true, 100, - nullptr, fetch_data, &FetchRecordArchivesCallback); - } - else if (items_size < scan_next_closure->BatchSize()) - { - assert(archive_records.size() > 0); - fetch_cc->SetFinish(0); - delete fetch_data; - } - else - { - // set the start key of next scan batch - fetch_data->start_key_= std::move(archive_key); - client.ScanNext(fetch_data->kv_table_name_, fetch_data->partition_id_, - fetch_data->start_key_, fetch_data->end_key_, - scan_next_closure->SessionId(), false, false, true, 100, - nullptr, fetch_data, &FetchRecordArchivesCallback); - } } void FetchSnapshotArchiveCallback(void *data, @@ -1287,62 +1384,64 @@ void FetchSnapshotArchiveCallback(void *data, DataStoreServiceClient &client, const remote::CommonResult &result) { - FetchSnapshotArchiveCallbackData *fetch_data= - static_cast(data); - txservice::FetchSnapshotCc *fetch_cc= fetch_data->fetch_cc_; - - ScanNextClosure *scan_next_closure= static_cast(closure); - auto err_code= result.error_code(); - - if (err_code != remote::DataStoreError::NO_ERROR) - { - assert(err_code != remote::DataStoreError::KEY_NOT_FOUND); - DLOG(INFO) << "FetchSnapshotArchiveCallback, error_code:" << err_code - << ", error_msg: " << result.error_msg(); - fetch_cc->SetFinish( - static_cast(txservice::CcErrorCode::DATA_STORE_ERR)); - delete fetch_data; - return; - } - - uint32_t items_size= scan_next_closure->ItemsSize(); - DLOG(INFO) << "FetchSnapshotArchiveCallback, items_size:" << items_size; - - if (items_size == 1) - { - std::string archive_key; - std::string archive_value; - uint64_t commit_ts; - uint64_t ttl; - assert(items_size <= 1); + FetchSnapshotArchiveCallbackData *fetch_data = + static_cast(data); + txservice::FetchSnapshotCc *fetch_cc = fetch_data->fetch_cc_; + + ScanNextClosure *scan_next_closure = + static_cast(closure); + auto err_code = result.error_code(); + + if (err_code != remote::DataStoreError::NO_ERROR) + { + assert(err_code != remote::DataStoreError::KEY_NOT_FOUND); + DLOG(INFO) << "FetchSnapshotArchiveCallback, error_code:" << err_code + << ", error_msg: " << result.error_msg(); + fetch_cc->SetFinish( + static_cast(txservice::CcErrorCode::DATA_STORE_ERR)); + delete fetch_data; + return; + } - scan_next_closure->GetItem(0, archive_key, archive_value, commit_ts, ttl); + uint32_t items_size = scan_next_closure->ItemsSize(); + DLOG(INFO) << "FetchSnapshotArchiveCallback, items_size:" << items_size; - // parse archive_value - bool is_deleted= false; - size_t value_offset= 0; - client.DecodeArchiveValue(archive_value, is_deleted, value_offset); - if (is_deleted) + if (items_size == 1) { - fetch_cc->rec_ts_= commit_ts; - fetch_cc->rec_status_= txservice::RecordStatus::Deleted; + std::string archive_key; + std::string archive_value; + uint64_t commit_ts; + uint64_t ttl; + assert(items_size <= 1); + + scan_next_closure->GetItem( + 0, archive_key, archive_value, commit_ts, ttl); + + // parse archive_value + bool is_deleted = false; + size_t value_offset = 0; + client.DecodeArchiveValue(archive_value, is_deleted, value_offset); + if (is_deleted) + { + fetch_cc->rec_ts_ = commit_ts; + fetch_cc->rec_status_ = txservice::RecordStatus::Deleted; + } + else + { + fetch_cc->rec_str_.assign(archive_value.data() + value_offset, + archive_value.size() - value_offset); + fetch_cc->rec_ts_ = commit_ts; + fetch_cc->rec_status_ = txservice::RecordStatus::Normal; + } } else { - fetch_cc->rec_str_.assign(archive_value.data() + value_offset, - archive_value.size() - value_offset); - fetch_cc->rec_ts_= commit_ts; - fetch_cc->rec_status_= txservice::RecordStatus::Normal; + // Not found the visible archive version in the archives table. + fetch_cc->rec_ts_ = 1U; + fetch_cc->rec_status_ = txservice::RecordStatus::Deleted; } - } - else - { - // Not found the visible archive version in the archives table. - fetch_cc->rec_ts_= 1U; - fetch_cc->rec_status_= txservice::RecordStatus::Deleted; - } - - fetch_cc->SetFinish(0); + + fetch_cc->SetFinish(0); } } // namespace EloqDS diff --git a/data_store_service_client_closure.h b/data_store_service_client_closure.h index 244cce1..d042d2f 100644 --- a/data_store_service_client_closure.h +++ b/data_store_service_client_closure.h @@ -333,7 +333,8 @@ class ReadClosure : public ::google::protobuf::Closure, public Poolable void Reset(DataStoreServiceClient *client, const std::string_view table_name, const uint32_t partition_id, - const std::string_view key, + std::string_view be_bucket_id, + std::string_view key, void *callback_data, DataStoreCallback callback) { @@ -342,7 +343,11 @@ class ReadClosure : public ::google::protobuf::Closure, public Poolable retry_count_ = 0; table_name_ = table_name; partition_id_ = partition_id; - key_ = key; + if (!be_bucket_id.empty()) + { + key_parts_.emplace_back(be_bucket_id); + } + key_parts_.emplace_back(key); ds_service_client_ = client; callback_data_ = callback_data; callback_ = callback; @@ -361,7 +366,7 @@ class ReadClosure : public ::google::protobuf::Closure, public Poolable channel_.reset(); table_name_ = ""; partition_id_ = 0; - key_ = ""; + key_parts_.clear(); result_.Clear(); value_.clear(); ts_ = 0; @@ -388,7 +393,14 @@ class ReadClosure : public ::google::protobuf::Closure, public Poolable request_.Clear(); request_.set_kv_table_name(table_name_.data(), table_name_.size()); request_.set_partition_id(partition_id_); - request_.set_key_str(key_.data(), key_.size()); + + for (size_t idx = 0; idx < key_parts_.size(); ++idx) + { + std::string *key_part = request_.add_key_str(); + key_part->append(key_parts_[idx].data(), + key_parts_[idx].size()); + } + rpc_request_prepare_ = true; } } @@ -501,9 +513,9 @@ class ReadClosure : public ::google::protobuf::Closure, public Poolable return partition_id_; } - const std::string_view Key() + const std::vector &Key() { - return key_; + return key_parts_; } std::string &LocalValueRef() @@ -619,7 +631,7 @@ class ReadClosure : public ::google::protobuf::Closure, public Poolable // serve local call std::string_view table_name_; uint32_t partition_id_; - std::string_view key_; + std::vector key_parts_; ::EloqDS::remote::CommonResult result_; std::string value_; uint64_t ts_; @@ -1636,6 +1648,7 @@ class ScanNextClosure : public ::google::protobuf::Closure, public Poolable inclusive_end_ = false; scan_forward_ = true; session_id_ = ""; + generate_session_id_ = true; batch_size_ = 0; search_conditions_ = nullptr; result_.Clear(); @@ -1655,6 +1668,7 @@ class ScanNextClosure : public ::google::protobuf::Closure, public Poolable bool inclusive_end, bool scan_forward, const std::string_view session_id, + bool generate_session_id, const uint32_t batch_size, const std::vector *search_conditions, void *callback_data, @@ -1672,6 +1686,7 @@ class ScanNextClosure : public ::google::protobuf::Closure, public Poolable inclusive_end_ = inclusive_end; scan_forward_ = scan_forward; session_id_ = session_id; + generate_session_id_ = generate_session_id; batch_size_ = batch_size; search_conditions_ = search_conditions; callback_data_ = callback_data; @@ -1702,12 +1717,14 @@ class ScanNextClosure : public ::google::protobuf::Closure, public Poolable request_.set_kv_table_name_str(table_name_.data(), table_name_.size()); request_.set_partition_id(partition_id_); + request_.set_start_key(start_key_.data(), start_key_.size()); request_.set_inclusive_start(inclusive_start_); request_.set_inclusive_end(inclusive_end_); request_.set_end_key(end_key_.data(), end_key_.size()); request_.set_scan_forward(scan_forward_); request_.set_session_id(session_id_); + request_.set_generate_session_id(generate_session_id_); request_.set_batch_size(batch_size_); if (search_conditions_) { @@ -1866,6 +1883,11 @@ class ScanNextClosure : public ::google::protobuf::Closure, public Poolable return session_id_; } + bool GenerateSessionId() const + { + return generate_session_id_; + } + const std::string &SessionId() const { if (is_local_request_) @@ -1970,6 +1992,7 @@ class ScanNextClosure : public ::google::protobuf::Closure, public Poolable bool inclusive_end_{false}; bool scan_forward_{true}; std::string session_id_; + bool generate_session_id_{true}; uint32_t batch_size_; const std::vector *search_conditions_; @@ -2349,6 +2372,26 @@ void FetchArchivesCallback(void *data, DataStoreServiceClient &client, const remote::CommonResult &result); +void FetchBucketDataCallback(void *data, + ::google::protobuf::Closure *closure, + DataStoreServiceClient &client, + const remote::CommonResult &result); + +struct FetchBucketDataCallbackData +{ + FetchBucketDataCallbackData( + txservice::FetchBucketDataCc *fetch_bucket_data_cc) + : fetch_bucket_data_cc_(fetch_bucket_data_cc) + { + } + + txservice::FetchBucketDataCc *fetch_bucket_data_cc_; + std::string bucket_kv_start_key_; // key owner + std::string bucket_kv_end_key_; + std::string session_id_; + std::vector search_cond_; +}; + struct FetchRecordArchivesCallbackData { FetchRecordArchivesCallbackData(txservice::FetchRecordCc *fetch_cc, diff --git a/data_store_service_scanner.cpp b/data_store_service_scanner.cpp index 5cdfac0..473c6e5 100644 --- a/data_store_service_scanner.cpp +++ b/data_store_service_scanner.cpp @@ -67,6 +67,7 @@ bool SinglePartitionScanner::FetchNextBatch() last_key_, scanner_->GetEndKey(), session_id_, + true, // only set inclusive_start for the first batch first_batch_fetched_ ? false : scanner_->IsInclusiveStart(), scanner_->IsInclusiveEnd(), diff --git a/eloq_data_store_service/data_store_service.cpp b/eloq_data_store_service/data_store_service.cpp index 196e2f4..8b806c3 100644 --- a/eloq_data_store_service/data_store_service.cpp +++ b/eloq_data_store_service/data_store_service.cpp @@ -381,7 +381,7 @@ void DataStoreService::Read(::google::protobuf::RpcController *controller, void DataStoreService::Read(const std::string_view table_name, const uint32_t partition_id, - const std::string_view key, + const std::vector &key, std::string *record, uint64_t *ts, uint64_t *ttl, @@ -410,7 +410,7 @@ void DataStoreService::Read(const std::string_view table_name, ReadLocalRequest *req = local_read_request_pool_.NextObject(); req->Reset( - this, table_name, partition_id, key, record, ts, ttl, result, done); + this, table_name, partition_id, &key, record, ts, ttl, result, done); data_store_map_[shard_id]->Read(req); } @@ -821,6 +821,7 @@ void DataStoreService::ScanNext( const std::vector *search_conditions, std::vector *items, std::string *session_id, + bool generate_session_id, ::EloqDS::remote::CommonResult *result, ::google::protobuf::Closure *done) { @@ -866,6 +867,7 @@ void DataStoreService::ScanNext( search_conditions, items, session_id, + generate_session_id, result, done); @@ -994,7 +996,7 @@ void DataStoreService::ScanClose(const std::string_view table_name, } ScanLocalRequest *req = local_scan_request_pool_.NextObject(); - req->Reset(this, table_name, partition_id, session_id, result, done); + req->Reset(this, table_name, partition_id, session_id, false, result, done); data_store_map_[shard_id]->ScanClose(req); } diff --git a/eloq_data_store_service/data_store_service.h b/eloq_data_store_service/data_store_service.h index 0f74640..bda606a 100644 --- a/eloq_data_store_service/data_store_service.h +++ b/eloq_data_store_service/data_store_service.h @@ -229,7 +229,7 @@ class DataStoreService : EloqDS::remote::DataStoreRpcService */ void Read(const std::string_view table_name, const uint32_t partition_id, - const std::string_view key, + const std::vector &key, std::string *record, uint64_t *ts, uint64_t *ttl, @@ -415,6 +415,7 @@ class DataStoreService : EloqDS::remote::DataStoreRpcService const std::vector *search_conditions, std::vector *items, std::string *session_id, + bool generate_session_id, ::EloqDS::remote::CommonResult *result, ::google::protobuf::Closure *done); diff --git a/eloq_data_store_service/ds_request.proto b/eloq_data_store_service/ds_request.proto index cbc3413..f159a32 100644 --- a/eloq_data_store_service/ds_request.proto +++ b/eloq_data_store_service/ds_request.proto @@ -86,7 +86,7 @@ message CommonResult { message ReadRequest { uint32 partition_id = 1; string kv_table_name = 2; - bytes key_str = 3; + repeated bytes key_str = 3; } message ReadResponse { @@ -306,6 +306,7 @@ message ScanRequest { bool scan_forward = 8; // Scan direction uint32 batch_size = 9; // Number of items to return in scan repeated SearchCondition search_conditions = 10; + bool generate_session_id = 11; } // Response message for scan operations diff --git a/eloq_data_store_service/internal_request.h b/eloq_data_store_service/internal_request.h index d64a845..ad50821 100644 --- a/eloq_data_store_service/internal_request.h +++ b/eloq_data_store_service/internal_request.h @@ -601,7 +601,9 @@ class ReadRequest : public Poolable // paramters in virtual const std::string_view GetTableName() const = 0; - virtual const std::string_view GetKey() const = 0; + virtual std::string_view GetKey(size_t index) const = 0; + + virtual size_t PartsCountPerKey() const = 0; virtual uint32_t GetPartitionId() const = 0; @@ -648,9 +650,14 @@ class ReadRpcRequest : public ReadRequest return req_->kv_table_name(); } - const std::string_view GetKey() const override + std::string_view GetKey(size_t index) const override + { + return req_->key_str(index); + } + + size_t PartsCountPerKey() const override { - return req_->key_str(); + return req_->key_str_size(); } uint32_t GetPartitionId() const override @@ -703,7 +710,7 @@ class ReadLocalRequest : public ReadRequest void Reset(DataStoreService *ds_service, const std::string_view table_name, const uint32_t partition_id, - const std::string_view key, + const std::vector *key_parts, std::string *record, uint64_t *record_ts, uint64_t *record_ttl, @@ -712,7 +719,7 @@ class ReadLocalRequest : public ReadRequest { ds_service_ = ds_service; table_name_ = table_name; - key_ = key; + key_parts_ = key_parts; partition_id_ = partition_id; record_ = record; record_ts_ = record_ts; @@ -725,7 +732,7 @@ class ReadLocalRequest : public ReadRequest { ds_service_ = nullptr; table_name_ = ""; - key_ = ""; + key_parts_ = nullptr; partition_id_ = 0; record_ = nullptr; record_ts_ = nullptr; @@ -739,9 +746,14 @@ class ReadLocalRequest : public ReadRequest return table_name_; } - const std::string_view GetKey() const override + std::string_view GetKey(size_t index) const override + { + return (*key_parts_)[index]; + } + + size_t PartsCountPerKey() const override { - return key_; + return key_parts_->size(); } uint32_t GetPartitionId() const override @@ -779,7 +791,7 @@ class ReadLocalRequest : public ReadRequest private: DataStoreService *ds_service_{nullptr}; std::string_view table_name_{""}; - std::string_view key_{""}; + const std::vector *key_parts_{nullptr}; uint32_t partition_id_{0}; std::string *record_{nullptr}; uint64_t *record_ts_{nullptr}; @@ -1050,6 +1062,8 @@ class ScanRequest : public Poolable virtual void SetSessionId(const std::string &session_id) = 0; + virtual bool GenerateSessionId() const = 0; + virtual void ClearSessionId() = 0; virtual const std::string &GetSessionId() = 0; @@ -1169,6 +1183,11 @@ class ScanRpcRequest : public ScanRequest return req_->session_id(); } + bool GenerateSessionId() const override + { + return req_->generate_session_id(); + } + void SetFinish(const ::EloqDS::remote::DataStoreError error_code, const std::string error_message) override { @@ -1212,6 +1231,7 @@ class ScanLocalRequest : public ScanRequest const std::vector *search_conditions, std::vector *items, std::string *session_id, + bool generate_session_id, ::EloqDS::remote::CommonResult *result, google::protobuf::Closure *done) { @@ -1227,6 +1247,7 @@ class ScanLocalRequest : public ScanRequest search_conditions_ = search_conditions; items_ = items; session_id_ = session_id; + generate_session_id_ = generate_session_id; result_ = result; done_ = done; } @@ -1235,6 +1256,7 @@ class ScanLocalRequest : public ScanRequest const std::string_view table_name, const uint32_t partition_id, std::string *session_id, + bool generate_session_id, ::EloqDS::remote::CommonResult *result, google::protobuf::Closure *done) { @@ -1242,6 +1264,7 @@ class ScanLocalRequest : public ScanRequest table_name_ = table_name; partition_id_ = partition_id; session_id_ = session_id; + generate_session_id_ = generate_session_id; result_ = result; done_ = done; } @@ -1260,6 +1283,7 @@ class ScanLocalRequest : public ScanRequest search_conditions_ = nullptr; items_ = nullptr; session_id_ = nullptr; + generate_session_id_ = true; result_ = nullptr; done_ = nullptr; } @@ -1341,6 +1365,11 @@ class ScanLocalRequest : public ScanRequest return *session_id_; } + bool GenerateSessionId() const override + { + return generate_session_id_; + } + void SetFinish(const ::EloqDS::remote::DataStoreError error_code, const std::string error_message) override { @@ -1367,6 +1396,7 @@ class ScanLocalRequest : public ScanRequest const std::vector *search_conditions_{nullptr}; std::vector *items_{nullptr}; std::string *session_id_{nullptr}; + bool generate_session_id_{true}; EloqDS::remote::CommonResult *result_{nullptr}; google::protobuf::Closure *done_{nullptr}; }; diff --git a/eloq_data_store_service/rocksdb_data_store_common.cpp b/eloq_data_store_service/rocksdb_data_store_common.cpp index cfc0f7f..241a2c4 100644 --- a/eloq_data_store_service/rocksdb_data_store_common.cpp +++ b/eloq_data_store_service/rocksdb_data_store_common.cpp @@ -1,7 +1,8 @@ +#include "rocksdb_data_store_common.h" + #include #include "internal_request.h" -#include "rocksdb_data_store_common.h" namespace EloqDS { @@ -409,7 +410,6 @@ void RocksDBDataStoreCommon::Read(ReadRequest *req) auto table_name = req->GetTableName(); uint32_t partition_id = req->GetPartitionId(); - auto key = req->GetKey(); std::shared_lock db_lk(db_mux_); @@ -420,7 +420,7 @@ void RocksDBDataStoreCommon::Read(ReadRequest *req) return; } - std::string key_str = BuildKey(table_name, partition_id, key); + std::string key_str = this->BuildKey(table_name, partition_id, req); std::string value; rocksdb::ReadOptions read_options; rocksdb::Status status = db->Get(read_options, key_str, &value); @@ -966,7 +966,7 @@ void RocksDBDataStoreCommon::ScanNext(ScanRequest *scan_req) // Set session id carry over to the response scan_req->SetSessionId(session_id); } - else + else if (scan_req->GenerateSessionId()) { // Otherwise, save the iterator in the session map auto iter_wrapper = @@ -979,6 +979,10 @@ void RocksDBDataStoreCommon::ScanNext(ScanRequest *scan_req) data_store_service_->EmplaceScanIter( shard_id_, session_id, std::move(iter_wrapper)); } + else + { + delete iter; + } } scan_req->SetFinish(::EloqDS::remote::DataStoreError::NO_ERROR); @@ -1076,10 +1080,9 @@ void RocksDBDataStoreCommon::SwitchToReadWrite() } } // Build key in RocksDB -const std::string RocksDBDataStoreCommon::BuildKey( - const std::string_view table_name, - uint32_t partition_id, - const std::string_view key) +std::string RocksDBDataStoreCommon::BuildKey(const std::string_view table_name, + uint32_t partition_id, + const std::string_view key) { std::string tmp_key; tmp_key.reserve(table_name.size() + 2 + key.size()); @@ -1091,6 +1094,32 @@ const std::string RocksDBDataStoreCommon::BuildKey( return tmp_key; } +std::string RocksDBDataStoreCommon::BuildKey(const std::string_view table_name, + uint32_t partition_id, + const ReadRequest *read_request) +{ + size_t total_key_size = 0; + for (size_t idx = 0; idx < read_request->PartsCountPerKey(); ++idx) + { + total_key_size += read_request->GetKey(idx).size(); + } + + total_key_size += table_name.size() + 2; + + std::string tmp_key; + tmp_key.reserve(total_key_size); + tmp_key.append(table_name); + tmp_key.append(KEY_SEPARATOR); + tmp_key.append(std::to_string(partition_id)); + tmp_key.append(KEY_SEPARATOR); + + for (size_t idx = 0; idx < read_request->PartsCountPerKey(); ++idx) + { + tmp_key.append(read_request->GetKey(idx)); + } + return tmp_key; +} + const std::string RocksDBDataStoreCommon::BuildKeyForDebug( const std::unique_ptr &key_slices, size_t slice_size) { diff --git a/eloq_data_store_service/rocksdb_data_store_common.h b/eloq_data_store_service/rocksdb_data_store_common.h index ead10f1..f558036 100644 --- a/eloq_data_store_service/rocksdb_data_store_common.h +++ b/eloq_data_store_service/rocksdb_data_store_common.h @@ -251,9 +251,14 @@ class RocksDBDataStoreCommon : public DataStore #endif protected: - const std::string BuildKey(const std::string_view table_name, - uint32_t partition_id, - const std::string_view key); + std::string BuildKey(const std::string_view table_name, + uint32_t partition_id, + const std::string_view key); + + std::string BuildKey(const std::string_view table_name, + uint32_t partition_id, + const ReadRequest *read_request); + const std::string BuildKeyForDebug( const std::unique_ptr &key_slices, size_t slice_size); @@ -262,6 +267,7 @@ class RocksDBDataStoreCommon : public DataStore void BuildKey(const std::string_view prefix, const std::string_view key, std::string &key_out); + void BuildKeyPrefixSlices( const std::string_view table_name, const std::string_view partition_id, @@ -296,7 +302,6 @@ class RocksDBDataStoreCommon : public DataStore rocksdb::InfoLogLevel StringToInfoLogLevel( const std::string &log_level_str); - protected: rocksdb::InfoLogLevel info_log_level_; const bool enable_stats_; diff --git a/rocksdb_handler.cpp b/rocksdb_handler.cpp index 7e3b9f5..b2ff86c 100644 --- a/rocksdb_handler.cpp +++ b/rocksdb_handler.cpp @@ -32,6 +32,7 @@ #include #include #include +#include #include #include #include @@ -70,6 +71,7 @@ #include "rocksdb/rate_limiter.h" #include "rocksdb_scanner.h" #include "store_util.h" +#include "tx_key.h" #include "tx_record.h" #include "tx_service/include/cc/cc_req_pool.h" #include "tx_service/include/error_messages.h" @@ -425,28 +427,29 @@ bool RocksDBHandler::PutAll( { for (auto &flush_rec : *flush_task_entry->data_sync_vec_) { + // TODO(lokax): encode bucket id txservice::TxKey key = flush_rec.Key(); - const EloqKV::EloqKey *redis_key = - key.GetKey(); + std::string rocksdb_key = EncodeToKvKey(key); + if (flush_rec.payload_status_ == txservice::RecordStatus::Normal && flush_rec.Payload()->GetTTL() > now) { std::vector rec_buf; SerializeFlushRecord(flush_rec, rec_buf); - write_batch_size += redis_key->Length(); + write_batch_size += rocksdb_key.size(); write_batch_size += rec_buf.size(); write_batch.Put( cfh, - rocksdb::Slice(redis_key->Buf(), redis_key->Length()), + rocksdb::Slice(rocksdb_key.data(), rocksdb_key.size()), rocksdb::Slice(rec_buf.data(), rec_buf.size())); } else { - write_batch_size += redis_key->Length(); + write_batch_size += rocksdb_key.size(); write_batch.Delete( cfh, - rocksdb::Slice(redis_key->Buf(), redis_key->Length())); + rocksdb::Slice(rocksdb_key.data(), rocksdb_key.size())); } if (write_batch_size >= batch_write_size_) @@ -1116,8 +1119,9 @@ RocksDBHandler::FetchRecord(txservice::FetchRecordCc *fetch_cc, LOG_IF(ERROR, fetch_snapshot_cc != nullptr) << "RocksDBHandler::FetchRecord with FetchSnapshotCc not implemented"; - const EloqKey *redis_key_ptr = fetch_cc->tx_key_.GetKey(); - EloqKey redis_key_copy(*redis_key_ptr); + // TODO(lokax): encode bucket id + std::string rocksdb_key = EncodeToKvKey(fetch_cc->tx_key_); + if (metrics::enable_kv_metrics) { fetch_cc->start_ = metrics::Clock::now(); @@ -1126,7 +1130,7 @@ RocksDBHandler::FetchRecord(txservice::FetchRecordCc *fetch_cc, query_worker_pool_->SubmitWork( [this, fetch_cc, - redis_key = std::move(redis_key_copy), + rocksdb_key = std::move(rocksdb_key), kv_cf_name = fetch_cc->table_schema_->GetKVCatalogInfo()->kv_table_name_]() { @@ -1156,7 +1160,7 @@ RocksDBHandler::FetchRecord(txservice::FetchRecordCc *fetch_cc, rocksdb::Status status = db->Get(rocksdb::ReadOptions(), cfh, - rocksdb::Slice(redis_key.Buf(), redis_key.Length()), + rocksdb::Slice(rocksdb_key.data(), rocksdb_key.size()), &value); if (metrics::enable_kv_metrics) { @@ -1219,6 +1223,98 @@ rocksdb::ColumnFamilyHandle *RocksDBHandler::GetColumnFamilyHandler( return nullptr; } +txservice::store::DataStoreHandler::DataStoreOpStatus +RocksDBHandler::FetchBucketData( + txservice::FetchBucketDataCc *fetch_bucket_data_cc) +{ + assert(fetch_bucket_data_cc != nullptr); + + query_worker_pool_->SubmitWork( + [this, fetch_bucket_data_cc]() + { + std::shared_lock db_lk(db_mux_); + auto db = GetDBPtr(); + if (!db) + { + fetch_bucket_data_cc->SetFinish(static_cast( + txservice::CcErrorCode::DATA_STORE_ERR)); + return; + } + + rocksdb::ColumnFamilyHandle *cfh = + GetColumnFamilyHandler(fetch_bucket_data_cc->kv_table_name_); + + if (cfh == nullptr) + { + LOG(ERROR) << "Failed to get column family, cf name: " + << fetch_bucket_data_cc->kv_table_name_; + fetch_bucket_data_cc->SetFinish(static_cast( + txservice::CcErrorCode::DATA_STORE_ERR)); + return; + } + + assert(cfh != nullptr); + + std::string kv_bucket_start_key = + EncodeToKvKey(fetch_bucket_data_cc->bucket_id_, + fetch_bucket_data_cc->start_key_); + std::string kv_bucket_end_key = + EncodeToKvKey(fetch_bucket_data_cc->bucket_id_ + 1); + + rocksdb::ReadOptions read_options; + // NOTICE: do not enable async_io if compiling rocksdbcloud + // without iouring. + read_options.async_io = false; + rocksdb::Iterator *iter = db->NewIterator(read_options, cfh); + rocksdb::Slice key(kv_bucket_start_key); + iter->Seek(key); + if (!fetch_bucket_data_cc->start_key_inclusive_ && iter->Valid()) + { + rocksdb::Slice curr_key = iter->key(); + if (curr_key == key) + { + iter->Next(); + } + } + + size_t record_count = 0; + while (iter->Valid() && + record_count < fetch_bucket_data_cc->batch_size_) + { + if (iter->key().ToStringView() >= kv_bucket_end_key) + { + break; + } + + // TODO(lokax): support search condition pushdown + + bool is_deleted = false; + int64_t version_ts = 0; + std::string rec_str; + std::string key_str = DecodeTxKeyFromKvKey(iter->key().data(), + iter->key().size()); + DeserializeRecord(iter->value().data(), + iter->value().size(), + rec_str, + is_deleted, + version_ts); + + fetch_bucket_data_cc->AddDataItem(std::move(key_str), + std::move(rec_str), + version_ts, + is_deleted); + iter->Next(); + record_count++; + } + + delete iter; + fetch_bucket_data_cc->SetFinish( + static_cast(txservice::CcErrorCode::NO_ERROR)); + }); + + return DataStoreOpStatus::Success; +} + std::unique_ptr RocksDBHandler::ScanForward( const txservice::TableName &table_name, uint32_t ng_id, @@ -1544,12 +1640,62 @@ void RocksDBHandler::ParallelIterateTable( core_cnt, cancel_data_loading_on_error.get()); + std::optional current_batch_bucket_id; + size_t current_batch_key_cnt = 0; + size_t cnt = 0; for (it->SeekToFirst(); it->Valid(); it->Next()) { - rocksdb::Slice key = it->key(); - std::string key_str = std::string(key.data(), key.size()); + // TODO(lokax): decode bucket id + rocksdb::Slice rocksdb_key = it->key(); + uint16_t bucket_id = + DecodeBucketIdFromKvKey(rocksdb_key.data(), rocksdb_key.size()); + if (!current_batch_bucket_id.has_value()) + { + // first key + current_batch_bucket_id = bucket_id; + } + + // current bucket is drained + if (current_batch_bucket_id.value() != bucket_id) + { + // if we need to upload data to ccmap + if (current_batch_key_cnt > 0) + { + if (cancel_data_loading_on_error->load( + std::memory_order_acquire) != + txservice::CcErrorCode::NO_ERROR) + { + cc->Free(); + break; + } + for (uint16_t core = 0; core < core_cnt; core++) + { + local_cc_shards->EnqueueToCcShard(core, cc); + } + cc = cc_pool.NextRequest(); + while (cc == nullptr) + { + // sleep 100u if on fly cc count reachs limit + bthread_usleep(100); + cc = cc_pool.NextRequest(); + } + cc->Reset(&table_name, + cc_ng_id, + cc_ng_term, + core_cnt, + cancel_data_loading_on_error.get()); + } + + // update bucket id + current_batch_bucket_id = bucket_id; + current_batch_key_cnt = 0; + } + assert(current_batch_bucket_id == bucket_id); + + std::string key_str = + DecodeTxKeyFromKvKey(rocksdb_key.data(), rocksdb_key.size()); rocksdb::Slice val = it->value(); std::string val_str = std::string(val.data(), val.size()); size_t hash = EloqKey::Hash(key_str.data(), key_str.size()); @@ -1572,10 +1718,14 @@ void RocksDBHandler::ParallelIterateTable( std::move(rec_str), version_ts, is_deleted); + current_batch_key_cnt++; cnt++; } - if (cnt % batch_size == 0) + assert(current_batch_key_cnt <= batch_size); + + // TODO(lokax): + if (current_batch_key_cnt >= batch_size) { if (cancel_data_loading_on_error->load( std::memory_order_acquire) != @@ -1600,11 +1750,13 @@ void RocksDBHandler::ParallelIterateTable( cc_ng_term, core_cnt, cancel_data_loading_on_error.get()); + + current_batch_key_cnt = 0; } } // submit remaining data item for processing - if (cnt % batch_size != 0) + if (current_batch_key_cnt > 0) { if (cancel_data_loading_on_error->load(std::memory_order_acquire) == txservice::CcErrorCode::NO_ERROR) @@ -1882,6 +2034,54 @@ bool RocksDBHandler::OnLeaderStart(uint32_t *next_leader_node) return succ; } +std::string RocksDBHandler::EncodeToKvKey(uint16_t bucket_id) +{ + std::string rocksdb_key; + uint16_t be_bucket_id = EloqShare::host_to_big_endian(bucket_id); + rocksdb_key.append(reinterpret_cast(&be_bucket_id), + sizeof(be_bucket_id)); + return rocksdb_key; +} + +std::string RocksDBHandler::EncodeToKvKey(uint16_t bucket_id, + const txservice::TxKey &tx_key) +{ + std::string rocksdb_key; + uint16_t be_bucket_id = EloqShare::host_to_big_endian(bucket_id); + rocksdb_key.reserve(sizeof(uint16_t) + tx_key.Size()); + rocksdb_key.append(reinterpret_cast(&be_bucket_id), + sizeof(be_bucket_id)); + if (tx_key.Type() == txservice::KeyType::Normal) + { + rocksdb_key.append(tx_key.Data(), tx_key.Size()); + } + + return rocksdb_key; +} + +std::string RocksDBHandler::EncodeToKvKey(const txservice::TxKey &tx_key) +{ + uint16_t bucket_id = + txservice::Sharder::Instance().MapKeyHashToBucketId(tx_key.Hash()); + return EncodeToKvKey(bucket_id, tx_key); +} + +std::string RocksDBHandler::DecodeTxKeyFromKvKey(const char *data, size_t size) +{ + assert(size >= sizeof(uint16_t)); + const char *tx_key_start = data + sizeof(uint16_t); + size_t tx_key_len = size - sizeof(uint16_t); + return std::string(tx_key_start, tx_key_len); +} + +uint16_t RocksDBHandler::DecodeBucketIdFromKvKey(const char *data, size_t size) +{ + assert(size >= sizeof(uint16_t)); + uint16_t be_bucket_id; + std::memcpy(&be_bucket_id, data, sizeof(be_bucket_id)); + return EloqShare::big_endian_to_host(be_bucket_id); +} + void RocksDBHandler::OnStartFollowing() { // shutdown previous opened db diff --git a/rocksdb_handler.h b/rocksdb_handler.h index a7a1a3c..94518f1 100644 --- a/rocksdb_handler.h +++ b/rocksdb_handler.h @@ -26,6 +26,7 @@ #include #include +#include #include #include #include @@ -37,10 +38,12 @@ #include "cc_map.h" #include "cc_req_base.h" +#include "cc_req_misc.h" #include "cc_shard.h" #include "error_messages.h" #include "kv_store.h" #include "rocksdb/compaction_filter.h" +#include "tx_key.h" #if (defined(ROCKSDB_CLOUD_FS_TYPE) && \ (ROCKSDB_CLOUD_FS_TYPE == ROCKSDB_CLOUD_FS_TYPE_S3 || \ ROCKSDB_CLOUD_FS_TYPE == ROCKSDB_CLOUD_FS_TYPE_GCS)) @@ -230,7 +233,7 @@ struct RocksDBCatalogInfo : public txservice::KVCatalogInfo { } RocksDBCatalogInfo(const std::string &kv_table_name, - const std::string &kv_index_names){}; + const std::string &kv_index_names) {}; ~RocksDBCatalogInfo() { } @@ -268,9 +271,10 @@ class RocksDBHandler : public txservice::store::DataStoreHandler * @param node_group * @return whether all entries are written to data store successfully */ - bool PutAll(std::unordered_map>> - &batch) override; + bool PutAll(std::unordered_map< + std::string_view, + std::vector>> &batch) + override; /** * @brief indicate end of flush entries in a single ckpt for \@param @@ -350,7 +354,6 @@ class RocksDBHandler : public txservice::store::DataStoreHandler const txservice::TxKey *start_key, const txservice::TableSchema *table_schema) override; - bool Read(const txservice::TableName &table_name, const txservice::TxKey &key, txservice::TxRecord &rec, @@ -374,9 +377,13 @@ class RocksDBHandler : public txservice::store::DataStoreHandler txservice::store::DataStoreHandler::DataStoreOpStatus FetchRecord( txservice::FetchRecordCc *fetch_cc, txservice::FetchSnapshotCc *fetch_snapshot_cc = nullptr) override; + rocksdb::ColumnFamilyHandle *GetColumnFamilyHandler(const std::string &cf); + txservice::store::DataStoreHandler::DataStoreOpStatus FetchBucketData( + txservice::FetchBucketDataCc *fetch_bucket_data_cc) override; + std::unique_ptr ScanForward( const txservice::TableName &table_name, uint32_t ng_id, @@ -454,14 +461,15 @@ class RocksDBHandler : public txservice::store::DataStoreHandler bool PutArchivesAll(std::unordered_map< std::string_view, std::vector>> - &batch) override; + &batch) override; /** * @brief Copy record from base/sk table to mvcc_archives. */ bool CopyBaseToArchive( - std::unordered_map>> - &batch) override; + std::unordered_map< + std::string_view, + std::vector>> &batch) + override; /** * @brief Get the latest visible(commit_ts <= upper_bound_ts) @@ -513,6 +521,13 @@ class RocksDBHandler : public txservice::store::DataStoreHandler cancel_data_loading_on_error, std::shared_ptr> on_flying_count); + static std::string EncodeToKvKey(uint16_t bucket_id); + static std::string EncodeToKvKey(uint16_t bucket_id, + const txservice::TxKey &tx_key); + static std::string EncodeToKvKey(const txservice::TxKey &tx_key); + static std::string DecodeTxKeyFromKvKey(const char *data, size_t size); + static uint16_t DecodeBucketIdFromKvKey(const char *data, size_t size); + bool OnLeaderStart(uint32_t *next_leader_node) override; void OnStartFollowing() override;