diff --git a/data_store_service_client.cpp b/data_store_service_client.cpp index eab681d..5706afa 100644 --- a/data_store_service_client.cpp +++ b/data_store_service_client.cpp @@ -85,13 +85,6 @@ static const std::string_view KEY_SEPARATOR("\\"); DataStoreServiceClient::~DataStoreServiceClient() { - { - std::unique_lock lk(ds_service_mutex_); - ds_serv_shutdown_indicator_.store(true, std::memory_order_release); - ds_service_cv_.notify_all(); - LOG(INFO) << "Notify ds_serv_shutdown_indicator"; - } - upsert_table_worker_.Shutdown(); } @@ -109,15 +102,35 @@ DataStoreServiceClient::~DataStoreServiceClient() void DataStoreServiceClient::SetupConfig( const DataStoreServiceClusterManager &cluster_manager) { - for (const auto &[_, group] : cluster_manager.GetAllShards()) + assert(cluster_manager.GetShardCount() == 1); + auto current_version = + dss_topology_version_.load(std::memory_order_acquire); + auto new_version = cluster_manager.GetTopologyVersion(); + if (current_version <= cluster_manager.GetTopologyVersion() && + dss_topology_version_.compare_exchange_strong(current_version, + new_version)) { - for (const auto &node : group.nodes_) + for (const auto &[_, group] : cluster_manager.GetAllShards()) { - LOG(INFO) << "Node Hostname: " << node.host_name_ - << ", Port: " << node.port_; + for (const auto &node : group.nodes_) + { + LOG(INFO) << "Node Hostname: " << node.host_name_ + << ", Port: " << node.port_; + } + // The first node is the owner of shard. + assert(group.nodes_.size() > 0); + while (!UpgradeShardVersion(group.shard_id_, + group.version_, + group.nodes_[0].host_name_, + group.nodes_[0].port_)) + { + LOG(INFO) << "UpgradeShardVersion failed, retry"; + bthread_usleep(1000000); + } + LOG(INFO) << "UpgradeShardVersion success, shard_id:" + << group.shard_id_ << ", version:" << group.version_; } } - cluster_manager_ = cluster_manager; } /** @@ -2749,12 +2762,12 @@ bool DataStoreServiceClient::CreateSnapshotForBackup( { CreateSnapshotForBackupClosure *closure = create_snapshot_for_backup_closure_pool_.NextObject(); - auto shards = cluster_manager_.GetAllShards(); + uint32_t shard_cnt = AllDataShardCount(); std::vector shard_ids; - shard_ids.reserve(shards.size()); - for (auto &[s_id, _] : shards) + shard_ids.reserve(shard_cnt); + for (uint32_t shard_id = 0; shard_id < shard_cnt; shard_id++) { - shard_ids.push_back(s_id); + shard_ids.push_back(shard_id); } CreateSnapshotForBackupCallbackData *callback_data = @@ -2813,17 +2826,11 @@ void DataStoreServiceClient::CreateSnapshotForBackupInternal( { // Handle remote shard closure->PrepareRequest(false); - auto channel = GetDataStoreServiceChannelByShardId(shard_id); - if (!channel) - { - LOG(WARNING) << "Failed to get channel for shard " << shard_id; - // Continue with next shard - CreateSnapshotForBackupInternal(closure); - return; - } + uint32_t node_index = GetOwnerNodeIndexOfShard(shard_id); + closure->SetRemoteNodeIndex(node_index); + auto *channel = dss_nodes_[node_index].Channel(); - closure->SetChannel(channel); - EloqDS::remote::DataStoreRpcService_Stub stub(channel.get()); + EloqDS::remote::DataStoreRpcService_Stub stub(channel); brpc::Controller &cntl = *closure->Controller(); cntl.set_timeout_ms(30000); // Longer timeout for backup operations auto *req = closure->RemoteRequest(); @@ -2910,9 +2917,12 @@ void DataStoreServiceClient::OnShutdown() */ bool DataStoreServiceClient::IsLocalShard(uint32_t shard_id) { - // this is a temporary solution for scale up scenario (from one smaller - // node to another bigger node) - return cluster_manager_.IsOwnerOfShard(shard_id); + if (data_store_service_ != nullptr) + { + return data_store_service_->IsOwnerOfShard(shard_id); + } + + return false; } /** @@ -2927,7 +2937,186 @@ bool DataStoreServiceClient::IsLocalShard(uint32_t shard_id) */ bool DataStoreServiceClient::IsLocalPartition(int32_t partition_id) { - return cluster_manager_.IsOwnerOfPartition(partition_id); + return IsLocalShard(GetShardIdByPartitionId(partition_id)); +} + +uint32_t DataStoreServiceClient::GetShardIdByPartitionId( + int32_t partition_id) const +{ + // Now, only support one shard. + return 0; +} + +uint32_t DataStoreServiceClient::AllDataShardCount() const +{ + return dss_shards_.size(); +} + +uint32_t DataStoreServiceClient::GetOwnerNodeIndexOfShard( + uint32_t shard_id) const +{ + assert(dss_shards_[shard_id].load(std::memory_order_acquire) != UINT32_MAX); + return dss_shards_[shard_id].load(std::memory_order_acquire); +} + +bool DataStoreServiceClient::UpdateOwnerNodeIndexOfShard( + uint32_t shard_id, uint32_t old_node_index, uint32_t &new_node_index) +{ + new_node_index = dss_shards_[shard_id].load(std::memory_order_acquire); + if (new_node_index != old_node_index) + { + return true; + } + + uint64_t expect_val = 0; + uint64_t current_ts = + std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + + if (dss_nodes_[old_node_index].expired_ts_.compare_exchange_strong( + expect_val, current_ts)) + { + // The old node channle is not updated by other, update it. + uint32_t free_index = FindFreeNodeIndex(); + if (free_index == dss_nodes_.size()) + { + LOG(ERROR) << "Find free node index failed"; + dss_nodes_[old_node_index].expired_ts_.store( + expect_val, std::memory_order_release); + return false; + } + auto &node = dss_nodes_[free_index]; + node.Reset(dss_nodes_[old_node_index].HostName(), + dss_nodes_[old_node_index].Port(), + dss_nodes_[old_node_index].ShardVersion()); + if (dss_shards_[shard_id].compare_exchange_strong(old_node_index, + free_index)) + { + new_node_index = free_index; + return true; + } + else + { + DLOG(INFO) << "Other thread updated the data shard, shard_id:" + << shard_id; + node.expired_ts_.store(1, std::memory_order_release); + new_node_index = old_node_index; + return true; + } + } + else + { + // Other thread is updating the shard. Waiting. + return false; + } +} + +uint32_t DataStoreServiceClient::FindFreeNodeIndex() +{ + uint64_t current_ts = + std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + for (uint32_t i = 0; i < dss_nodes_.size(); i++) + { + uint64_t expired_ts = + dss_nodes_[i].expired_ts_.load(std::memory_order_acquire); + if (expired_ts > 0 && expired_ts < current_ts && + (current_ts - expired_ts) > NodeExpiredTime && + dss_nodes_[i].expired_ts_.compare_exchange_strong(expired_ts, 0)) + { + return i; + } + } + // not found + return dss_nodes_.size(); +} + +void DataStoreServiceClient::HandleShardingError( + const ::EloqDS::remote::CommonResult &result) +{ + assert(result.error_code() == + static_cast( + ::EloqDS::remote::DataStoreError::REQUESTED_NODE_NOT_OWNER)); + + auto &new_key_sharding = result.new_key_sharding(); + auto error_type = new_key_sharding.type(); + if (error_type == + ::EloqDS::remote::KeyShardingErrorType::PrimaryNodeChanged) + { + uint32_t shard_id = new_key_sharding.shard_id(); + uint64_t shard_version = new_key_sharding.shard_version(); + auto &primary_node = new_key_sharding.new_primary_node(); + DSSNode new_primary_node; + while (!UpgradeShardVersion(shard_id, + shard_version, + primary_node.host_name(), + primary_node.port())) + { + DLOG(INFO) << "Upgrade shard version failed, shard_id: " + << shard_id; + bthread_usleep(10000); + continue; + } + } + else + { + assert(false); + // the whole node group has changed + LOG(FATAL) << "The topology of data shards is changed"; + // TODO(lzx): handle the topology of cluster change. + } +} + +bool DataStoreServiceClient::UpgradeShardVersion(uint32_t shard_id, + uint64_t shard_version, + const std::string &host_name, + uint16_t port) +{ + if (shard_id >= dss_shards_.size()) + { + assert(false); + // Now only support one shard. + LOG(FATAL) << "Shard id not found, shard_id: " << shard_id; + return true; + } + + uint32_t node_index = dss_shards_[shard_id].load(std::memory_order_acquire); + auto &node_ref = dss_nodes_[node_index]; + if (node_ref.ShardVersion() < shard_version) + { + uint64_t expect_val = 0; + uint64_t current_ts = + std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + if (!node_ref.expired_ts_.compare_exchange_strong(expect_val, + current_ts)) + { + // Other thread is updating the shard, retry. + DLOG(INFO) << "Other thread is updating the data shard, shard_id: " + << shard_id; + return false; + } + + uint32_t free_node_index = FindFreeNodeIndex(); + if (free_node_index == dss_nodes_.size()) + { + DLOG(INFO) << "Find free node index failed"; + node_ref.expired_ts_.store(expect_val, std::memory_order_release); + return false; + } + auto &free_node_ref = dss_nodes_[free_node_index]; + free_node_ref.Reset(host_name, port, shard_version); + if (!dss_shards_[shard_id].compare_exchange_strong(node_index, + free_node_index)) + { + assert(false); + free_node_ref.expired_ts_.store(1, std::memory_order_release); + } + } + return true; } txservice::store::DataStoreHandler::DataStoreOpStatus @@ -3000,46 +3189,40 @@ void DataStoreServiceClient::Read(const std::string_view kv_table_name, void *callback_data, DataStoreCallback callback) { - ReadClosure *read_clouse = read_closure_pool_.NextObject(); - read_clouse->Reset( + ReadClosure *read_closure = read_closure_pool_.NextObject(); + read_closure->Reset( this, kv_table_name, partition_id, key, callback_data, callback); - ReadInternal(read_clouse); + ReadInternal(read_closure); } -void DataStoreServiceClient::ReadInternal(ReadClosure *read_clouse) +void DataStoreServiceClient::ReadInternal(ReadClosure *read_closure) { - if (IsLocalPartition(read_clouse->PartitionId())) + if (IsLocalPartition(read_closure->PartitionId())) { - read_clouse->PrepareRequest(true); - data_store_service_->Read(read_clouse->TableName(), - read_clouse->PartitionId(), - read_clouse->Key(), - &read_clouse->LocalValueRef(), - &read_clouse->LocalTsRef(), - &read_clouse->LocalTtlRef(), - &read_clouse->LocalResultRef(), - read_clouse); + read_closure->PrepareRequest(true); + data_store_service_->Read(read_closure->TableName(), + read_closure->PartitionId(), + read_closure->Key(), + &read_closure->LocalValueRef(), + &read_closure->LocalTsRef(), + &read_closure->LocalTtlRef(), + &read_closure->LocalResultRef(), + read_closure); } else { - read_clouse->PrepareRequest(false); - auto channel = - GetDataStoreServiceChannelByPartitionId(read_clouse->PartitionId()); - if (!channel) - { - brpc::ClosureGuard guard(read_clouse); - ::EloqDS::remote::CommonResult &result = read_clouse->Result(); - result.set_error_code( - ::EloqDS::remote::DataStoreError::NETWORK_ERROR); - return; - } - - EloqDS::remote::DataStoreRpcService_Stub stub(channel.get()); - brpc::Controller &cntl = *read_clouse->Controller(); + read_closure->PrepareRequest(false); + uint32_t node_index = GetOwnerNodeIndexOfShard( + GetShardIdByPartitionId(read_closure->PartitionId())); + read_closure->SetRemoteNodeIndex(node_index); + auto *channel = dss_nodes_[node_index].Channel(); + + EloqDS::remote::DataStoreRpcService_Stub stub(channel); + brpc::Controller &cntl = *read_closure->Controller(); cntl.set_timeout_ms(5000); - auto *req = read_clouse->ReadRequest(); - auto *resp = read_clouse->ReadResponse(); - stub.Read(&cntl, req, resp, read_clouse); + auto *req = read_closure->ReadRequest(); + auto *resp = read_closure->ReadResponse(); + stub.Read(&cntl, req, resp, read_closure); } } @@ -3082,19 +3265,12 @@ void DataStoreServiceClient::DeleteRangeInternal( else { delete_range_clouse->PrepareRequest(false); - auto channel = GetDataStoreServiceChannelByPartitionId( - delete_range_clouse->PartitionId()); - if (!channel) - { - brpc::ClosureGuard guard(delete_range_clouse); - ::EloqDS::remote::CommonResult &result = - delete_range_clouse->Result(); - result.set_error_code( - ::EloqDS::remote::DataStoreError::NETWORK_ERROR); - return; - } + uint32_t node_index = GetOwnerNodeIndexOfShard( + GetShardIdByPartitionId(delete_range_clouse->PartitionId())); + delete_range_clouse->SetRemoteNodeIndex(node_index); + auto *channel = dss_nodes_[node_index].Channel(); - EloqDS::remote::DataStoreRpcService_Stub stub(channel.get()); + EloqDS::remote::DataStoreRpcService_Stub stub(channel); brpc::Controller &cntl = *delete_range_clouse->Controller(); cntl.set_timeout_ms(5000); auto *req = delete_range_clouse->DeleteRangeRequest(); @@ -3109,12 +3285,12 @@ void DataStoreServiceClient::FlushData( DataStoreCallback callback) { FlushDataClosure *closure = flush_data_closure_pool_.NextObject(); - auto shards = cluster_manager_.GetAllShards(); + uint32_t shard_cnt = AllDataShardCount(); std::vector shard_ids; - shard_ids.reserve(shards.size()); - for (auto &[s_id, _] : shards) + shard_ids.reserve(shard_cnt); + for (uint32_t shard_id = 0; shard_id < shard_cnt; shard_id++) { - shard_ids.push_back(s_id); + shard_ids.push_back(shard_id); } closure->Reset( @@ -3139,18 +3315,11 @@ void DataStoreServiceClient::FlushDataInternal( else { flush_data_closure->PrepareRequest(false); - auto channel = GetDataStoreServiceChannelByShardId(shard_id); - if (!channel) - { - brpc::ClosureGuard guard(flush_data_closure); - ::EloqDS::remote::CommonResult &result = - flush_data_closure->Result(); - result.set_error_code( - ::EloqDS::remote::DataStoreError::NETWORK_ERROR); - return; - } + uint32_t node_index = GetOwnerNodeIndexOfShard(shard_id); + flush_data_closure->SetRemoteNodeIndex(node_index); + auto *channel = dss_nodes_[node_index].Channel(); - EloqDS::remote::DataStoreRpcService_Stub stub(channel.get()); + EloqDS::remote::DataStoreRpcService_Stub stub(channel); brpc::Controller &cntl = *flush_data_closure->Controller(); cntl.set_timeout_ms(5000); auto *req = flush_data_closure->FlushDataRequest(); @@ -3167,12 +3336,12 @@ void DataStoreServiceClient::DropTable(std::string_view table_name, DLOG(INFO) << "DropTableWithRetry for table: " << table_name; DropTableClosure *closure = drop_table_closure_pool_.NextObject(); - auto shards = cluster_manager_.GetAllShards(); + uint32_t shard_cnt = AllDataShardCount(); std::vector shard_ids; - shard_ids.reserve(shards.size()); - for (auto &[s_id, _] : shards) + shard_ids.reserve(shard_cnt); + for (uint32_t shard_id = 0; shard_id < shard_cnt; shard_id++) { - shard_ids.push_back(s_id); + shard_ids.push_back(shard_id); } closure->Reset( @@ -3197,18 +3366,11 @@ void DataStoreServiceClient::DropTableInternal( else { drop_table_closure->PrepareRequest(false); - auto channel = GetDataStoreServiceChannelByShardId(shard_id); - if (!channel) - { - brpc::ClosureGuard guard(drop_table_closure); - ::EloqDS::remote::CommonResult &result = - drop_table_closure->Result(); - result.set_error_code( - ::EloqDS::remote::DataStoreError::NETWORK_ERROR); - return; - } + uint32_t node_index = GetOwnerNodeIndexOfShard(shard_id); + drop_table_closure->SetRemoteNodeIndex(node_index); + auto *channel = dss_nodes_[node_index].Channel(); - EloqDS::remote::DataStoreRpcService_Stub stub(channel.get()); + EloqDS::remote::DataStoreRpcService_Stub stub(channel); brpc::Controller &cntl = *drop_table_closure->Controller(); cntl.set_timeout_ms(5000); auto *req = drop_table_closure->DropTableRequest(); @@ -3272,19 +3434,12 @@ void DataStoreServiceClient::ScanNextInternal( else { scan_next_closure->PrepareRequest(false); - auto channel = GetDataStoreServiceChannelByPartitionId( - scan_next_closure->PartitionId()); - if (!channel) - { - brpc::ClosureGuard guard(scan_next_closure); - ::EloqDS::remote::CommonResult &result = - scan_next_closure->Result(); - result.set_error_code( - ::EloqDS::remote::DataStoreError::NETWORK_ERROR); - return; - } + uint32_t node_index = GetOwnerNodeIndexOfShard( + GetShardIdByPartitionId(scan_next_closure->PartitionId())); + scan_next_closure->SetRemoteNodeIndex(node_index); + auto *channel = dss_nodes_[node_index].Channel(); - EloqDS::remote::DataStoreRpcService_Stub stub(channel.get()); + EloqDS::remote::DataStoreRpcService_Stub stub(channel); brpc::Controller &cntl = *scan_next_closure->Controller(); cntl.set_timeout_ms(5000); auto *req = scan_next_closure->ScanNextRequest(); @@ -3331,19 +3486,12 @@ void DataStoreServiceClient::ScanCloseInternal( else { scan_next_closure->PrepareRequest(false); - auto channel = GetDataStoreServiceChannelByPartitionId( - scan_next_closure->PartitionId()); - if (!channel) - { - brpc::ClosureGuard guard(scan_next_closure); - ::EloqDS::remote::CommonResult &result = - scan_next_closure->Result(); - result.set_error_code( - ::EloqDS::remote::DataStoreError::NETWORK_ERROR); - return; - } + uint32_t node_index = GetOwnerNodeIndexOfShard( + GetShardIdByPartitionId(scan_next_closure->PartitionId())); + scan_next_closure->SetRemoteNodeIndex(node_index); + auto *channel = dss_nodes_[node_index].Channel(); - EloqDS::remote::DataStoreRpcService_Stub stub(channel.get()); + EloqDS::remote::DataStoreRpcService_Stub stub(channel); brpc::Controller &cntl = *scan_next_closure->Controller(); cntl.set_timeout_ms(5000); auto *req = scan_next_closure->ScanNextRequest(); @@ -3597,47 +3745,6 @@ bool DataStoreServiceClient::DeleteTableStatistics( return true; } -std::shared_ptr -DataStoreServiceClient::GetDataStoreServiceChannelByPartitionId( - uint32_t partition_id) -{ - return cluster_manager_.GetDataStoreServiceChannelByPartitionId( - partition_id); -} - -std::shared_ptr -DataStoreServiceClient::UpdateDataStoreServiceChannelByPartitionId( - uint32_t partition_id) -{ - return cluster_manager_.UpdateDataStoreServiceChannelByPartitionId( - partition_id); -} - -std::shared_ptr -DataStoreServiceClient::GetDataStoreServiceChannel(const DSSNode &node) -{ - return cluster_manager_.GetDataStoreServiceChannel(node); -} - -std::shared_ptr -DataStoreServiceClient::GetDataStoreServiceChannelByShardId(uint32_t shard_id) -{ - return cluster_manager_.GetDataStoreServiceChannelByShardId(shard_id); -} - -std::shared_ptr -DataStoreServiceClient::UpdateDataStoreServiceChannelByShardId( - uint32_t shard_id) -{ - return cluster_manager_.UpdateDataStoreServiceChannelByShardId(shard_id); -} - -std::shared_ptr -DataStoreServiceClient::UpdateDataStoreServiceChannel(const DSSNode &node) -{ - return cluster_manager_.UpdateDataStoreServiceChannel(node); -} - void DataStoreServiceClient::BatchWriteRecords( std::string_view kv_table_name, int32_t partition_id, @@ -3681,7 +3788,7 @@ void DataStoreServiceClient::BatchWriteRecordsInternal( if (IsLocalShard(req_shard_id)) { - closure->is_local_request_ = true; + closure->PrepareRequest(true); data_store_service_->BatchWriteRecords(closure->kv_table_name_, closure->partition_id_, closure->key_parts_, @@ -3697,26 +3804,14 @@ void DataStoreServiceClient::BatchWriteRecordsInternal( } else { - closure->is_local_request_ = false; - - auto channel = - cluster_manager_.GetDataStoreServiceChannelByShardId(req_shard_id); - if (!channel) - { - // TODO(lzx): retry.. - assert(false); - closure->result_.set_error_code( - remote::DataStoreError::NETWORK_ERROR); - closure->Run(); - return; - } - // prepare request - closure->PrepareRemoteRequest(); - // timeout is set in the PrepareRemoteRequest + closure->PrepareRequest(false); + uint32_t node_index = GetOwnerNodeIndexOfShard(req_shard_id); + closure->SetRemoteNodeIndex(node_index); + auto *channel = dss_nodes_[node_index].Channel(); // send request - remote::DataStoreRpcService_Stub stub(channel.get()); + remote::DataStoreRpcService_Stub stub(channel); stub.BatchWriteRecords(closure->Controller(), closure->RemoteRequest(), closure->RemoteResponse(), diff --git a/data_store_service_client.h b/data_store_service_client.h index 1df3da4..a244e92 100644 --- a/data_store_service_client.h +++ b/data_store_service_client.h @@ -55,6 +55,8 @@ class ScanNextClosure; class CreateSnapshotForBackupClosure; class SinglePartitionScanner; +class DssClusterConfig; + typedef void (*DataStoreCallback)(void *data, ::google::protobuf::Closure *closure, DataStoreServiceClient &client, @@ -69,16 +71,27 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler txservice::CatalogFactory *catalog_factory[3], const DataStoreServiceClusterManager &cluster_manager, DataStoreService *data_store_service = nullptr) - : ds_serv_shutdown_indicator_(false), - catalog_factory_array_{catalog_factory[0], + : catalog_factory_array_{catalog_factory[0], catalog_factory[1], catalog_factory[2], &range_catalog_factory_, &hash_catalog_factory_}, - cluster_manager_(cluster_manager), - data_store_service_(data_store_service), - flying_remote_fetch_count_(0) + data_store_service_(data_store_service) { + // Init dss cluster config. + dss_topology_version_ = cluster_manager.GetTopologyVersion(); + auto all_shards = cluster_manager.GetAllShards(); + assert(all_shards.size() == 1); + for (auto &[shard_id, shard] : all_shards) + { + uint32_t node_idx = FindFreeNodeIndex(); + auto &node_ref = dss_nodes_[node_idx]; + node_ref.Reset(shard.nodes_[0].host_name_, + shard.nodes_[0].port_, + shard.version_); + dss_shards_[shard_id].store(shard_id); + } + if (data_store_service_ != nullptr) { data_store_service_->AddListenerForUpdateConfig( @@ -359,24 +372,6 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler void OnShutdown() override; - void HandleShardingError(const ::EloqDS::remote::CommonResult &result) - { - cluster_manager_.HandleShardingError(result); - } - - std::shared_ptr GetDataStoreServiceChannelByPartitionId( - uint32_t partition_id); - std::shared_ptr UpdateDataStoreServiceChannelByPartitionId( - uint32_t partition_id); - std::shared_ptr GetDataStoreServiceChannelByShardId( - uint32_t shard_id); - std::shared_ptr UpdateDataStoreServiceChannelByShardId( - uint32_t shard_id); - std::shared_ptr GetDataStoreServiceChannel( - const DSSNode &node); - std::shared_ptr UpdateDataStoreServiceChannel( - const DSSNode &node); - /** * Serialize a record with is_deleted flag and record string. * @param is_deleted @@ -616,6 +611,12 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler #endif } + const txservice::CatalogFactory *GetCatalogFactory( + txservice::TableEngine table_engine) + { + return catalog_factory_array_.at(static_cast(table_engine) - 1); + } + /** * @brief Check if the shard_id is local to the current node. * @param shard_id @@ -623,17 +624,6 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler */ bool IsLocalShard(uint32_t shard_id); - uint32_t GetShardIdByPartitionId(int32_t partition_id) - { - return cluster_manager_.GetShardIdByPartitionId(partition_id); - } - - const txservice::CatalogFactory *GetCatalogFactory( - txservice::TableEngine table_engine) - { - return catalog_factory_array_.at(static_cast(table_engine) - 1); - } - /** * @brief Check if the partition_id is local to the current node. * @param partition_id @@ -641,9 +631,18 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler */ bool IsLocalPartition(int32_t partition_id); - bthread::Mutex ds_service_mutex_; - bthread::ConditionVariable ds_service_cv_; - std::atomic ds_serv_shutdown_indicator_; + uint32_t GetShardIdByPartitionId(int32_t partition_id) const; + uint32_t AllDataShardCount() const; + uint32_t GetOwnerNodeIndexOfShard(uint32_t shard_id) const; + bool UpdateOwnerNodeIndexOfShard(uint32_t shard_id, + uint32_t old_node_index, + uint32_t &new_node_index); + uint32_t FindFreeNodeIndex(); + void HandleShardingError(const ::EloqDS::remote::CommonResult &result); + bool UpgradeShardVersion(uint32_t shard_id, + uint64_t shard_version, + const std::string &host_name, + uint16_t port); txservice::EloqHashCatalogFactory hash_catalog_factory_{}; txservice::EloqRangeCatalogFactory range_catalog_factory_{}; @@ -651,15 +650,76 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler // EngineServer TxService and DataStoreHandler std::array catalog_factory_array_; - // remote data store service configuration - DataStoreServiceClusterManager cluster_manager_; - + // bthread::Mutex ds_service_mutex_; + // bthread::ConditionVariable ds_service_cv_; + // std::atomic ds_serv_shutdown_indicator_; // point to the data store service if it is colocated DataStoreService *data_store_service_; - std::atomic flying_remote_fetch_count_{0}; - // Work queue for fetch records from primary node - std::deque remote_fetch_cc_queue_; + struct DssNode + { + DssNode() = default; + ~DssNode() = default; + DssNode(const DssNode &rhs) + : host_name_(rhs.host_name_), + port_(rhs.port_), + shard_verion_(rhs.shard_verion_) + { + } + DssNode &operator=(const DssNode &) = delete; + + void Reset(const std::string hostname, + uint16_t port, + uint64_t shard_version) + { + assert(expired_ts_.load(std::memory_order_acquire) == 0); + host_name_ = hostname; + port_ = port; + shard_verion_ = shard_version; + channel_.Init(host_name_.c_str(), port_, nullptr); + } + + const std::string &HostName() const + { + return host_name_; + } + uint16_t Port() const + { + return port_; + } + uint64_t ShardVersion() const + { + return shard_verion_; + } + brpc::Channel *Channel() + { + assert(!host_name_.empty() && port_ != 0); + return &channel_; + } + + // expired_ts_ is the timestamp when the node is expired. + // If expired_ts_ is 0, the node is not expired. + // If expired_ts_ is not 0, the node is expired and the value is the + // timestamp when the node is expired. + std::atomic expired_ts_{1U}; + + private: + std::string host_name_; + uint16_t port_; + brpc::Channel channel_; + uint64_t shard_verion_; + }; + // Cached leader nodes info of data shard. + std::array dss_nodes_; + const uint64_t NodeExpiredTime = 10 * 1000 * 1000; // 10s + // Now only support one shard. dss_shards_ caches the index in dss_nodes_ of + // shard owner. + std::array, 1> dss_shards_; + std::atomic dss_topology_version_{0}; + + // std::atomic flying_remote_fetch_count_{0}; + // // Work queue for fetch records from primary node + // std::deque remote_fetch_cc_queue_; // table names and their kv table names std::unordered_map @@ -674,9 +734,9 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler friend class ScanNextClosure; friend class CreateSnapshotForBackupClosure; friend void PartitionBatchCallback(void *data, - ::google::protobuf::Closure *closure, - DataStoreServiceClient &client, - const remote::CommonResult &result); + ::google::protobuf::Closure *closure, + DataStoreServiceClient &client, + const remote::CommonResult &result); friend class SinglePartitionScanner; friend void FetchAllDatabaseCallback(void *data, ::google::protobuf::Closure *closure, diff --git a/data_store_service_client_closure.h b/data_store_service_client_closure.h index 7878c31..d7ee3b2 100644 --- a/data_store_service_client_closure.h +++ b/data_store_service_client_closure.h @@ -611,6 +611,7 @@ class ReadClosure : public ::google::protobuf::Closure, public Poolable ds_service_client_ = client; callback_data_ = callback_data; callback_ = callback; + remote_node_index_ = UINT32_MAX; } void Clear() override @@ -638,6 +639,7 @@ class ReadClosure : public ::google::protobuf::Closure, public Poolable { is_local_request_ = true; result_.Clear(); + remote_node_index_ = UINT32_MAX; } else { @@ -675,10 +677,12 @@ class ReadClosure : public ::google::protobuf::Closure, public Poolable cntl_.ErrorCode() != EAGAIN && cntl_.ErrorCode() != brpc::ERPCTIMEDOUT) { - auto channel = - ds_service_client_ - ->UpdateDataStoreServiceChannelByPartitionId( - partition_id_); + uint32_t shard_id = + ds_service_client_->GetShardIdByPartitionId( + partition_id_); + uint32_t new_node_index; + ds_service_client_->UpdateOwnerNodeIndexOfShard( + shard_id, remote_node_index_, new_node_index); // Retry if (retry_count_ < ds_service_client_->retry_limit_) @@ -854,6 +858,11 @@ class ReadClosure : public ::google::protobuf::Closure, public Poolable return is_local_request_; } + void SetRemoteNodeIndex(uint32_t remote_node_index) + { + remote_node_index_ = remote_node_index; + } + private: bool is_local_request_{false}; bool rpc_request_prepare_{false}; @@ -864,7 +873,7 @@ class ReadClosure : public ::google::protobuf::Closure, public Poolable brpc::Controller cntl_; EloqDS::remote::ReadRequest request_; EloqDS::remote::ReadResponse response_; - // keep channel alive + uint32_t remote_node_index_{UINT32_MAX}; // serve local call std::string_view table_name_; @@ -899,6 +908,7 @@ class FlushDataClosure : public ::google::protobuf::Closure, public Poolable cntl_.Reset(); request_.Clear(); response_.Clear(); + remote_node_index_ = UINT32_MAX; cntl_.Reset(); ds_service_client_ = nullptr; retry_count_ = 0; @@ -919,6 +929,7 @@ class FlushDataClosure : public ::google::protobuf::Closure, public Poolable { is_local_request_ = true; rpc_request_prepare_ = false; + remote_node_index_ = UINT32_MAX; retry_count_ = 0; ds_service_client_ = &store_hd; kv_table_names_ = kv_table_names; @@ -933,6 +944,7 @@ class FlushDataClosure : public ::google::protobuf::Closure, public Poolable { is_local_request_ = true; result_.Clear(); + remote_node_index_ = UINT32_MAX; } else { @@ -973,8 +985,9 @@ class FlushDataClosure : public ::google::protobuf::Closure, public Poolable cntl_.ErrorCode() != EAGAIN && cntl_.ErrorCode() != brpc::ERPCTIMEDOUT) { - ds_service_client_->UpdateDataStoreServiceChannelByShardId( - shard_ids_.back()); + uint32_t new_node_index; + ds_service_client_->UpdateOwnerNodeIndexOfShard( + shard_ids_.back(), remote_node_index_, new_node_index); // Retry if (retry_count_ < ds_service_client_->retry_limit_) @@ -1080,10 +1093,16 @@ class FlushDataClosure : public ::google::protobuf::Closure, public Poolable return shard_ids_; } + void SetRemoteNodeIndex(uint32_t remote_node_index) + { + remote_node_index_ = remote_node_index; + } + private: brpc::Controller cntl_; ::EloqDS::remote::FlushDataRequest request_; ::EloqDS::remote::FlushDataResponse response_; + uint32_t remote_node_index_{UINT32_MAX}; DataStoreServiceClient *ds_service_client_; uint16_t retry_count_{0}; @@ -1112,6 +1131,7 @@ class DeleteRangeClosure : public ::google::protobuf::Closure, public Poolable request_.Clear(); response_.Clear(); cntl_.Reset(); + remote_node_index_ = UINT32_MAX; ds_service_client_ = nullptr; retry_count_ = 0; is_local_request_ = false; @@ -1133,6 +1153,7 @@ class DeleteRangeClosure : public ::google::protobuf::Closure, public Poolable { is_local_request_ = true; rpc_request_prepare_ = false; + remote_node_index_ = UINT32_MAX; retry_count_ = 0; ds_service_client_ = &store_hd; table_name_ = table_name; @@ -1150,6 +1171,7 @@ class DeleteRangeClosure : public ::google::protobuf::Closure, public Poolable { is_local_request_ = true; result_.Clear(); + remote_node_index_ = UINT32_MAX; } else { @@ -1189,9 +1211,12 @@ class DeleteRangeClosure : public ::google::protobuf::Closure, public Poolable cntl_.ErrorCode() != EAGAIN && cntl_.ErrorCode() != brpc::ERPCTIMEDOUT) { - ds_service_client_ - ->UpdateDataStoreServiceChannelByPartitionId( + uint32_t shard_id = + ds_service_client_->GetShardIdByPartitionId( partition_id_); + uint32_t new_node_index; + ds_service_client_->UpdateOwnerNodeIndexOfShard( + shard_id, remote_node_index_, new_node_index); // Retry if (retry_count_ < ds_service_client_->retry_limit_) @@ -1296,10 +1321,17 @@ class DeleteRangeClosure : public ::google::protobuf::Closure, public Poolable } } + void SetRemoteNodeIndex(uint32_t remote_node_index) + { + remote_node_index_ = remote_node_index; + } + private: brpc::Controller cntl_; ::EloqDS::remote::DeleteRangeRequest request_; ::EloqDS::remote::DeleteRangeResponse response_; + // remote node index in dss_nodes_ + uint32_t remote_node_index_{UINT32_MAX}; DataStoreServiceClient *ds_service_client_; uint16_t retry_count_{0}; @@ -1330,6 +1362,7 @@ class DropTableClosure : public ::google::protobuf::Closure, public Poolable cntl_.Reset(); request_.Clear(); response_.Clear(); + remote_node_index_ = UINT32_MAX; cntl_.Reset(); ds_service_client_ = nullptr; retry_count_ = 0; @@ -1350,6 +1383,7 @@ class DropTableClosure : public ::google::protobuf::Closure, public Poolable { is_local_request_ = true; rpc_request_prepare_ = false; + remote_node_index_ = UINT32_MAX; retry_count_ = 0; ds_service_client_ = &store_hd; table_name_ = table_name; @@ -1364,6 +1398,7 @@ class DropTableClosure : public ::google::protobuf::Closure, public Poolable { is_local_request_ = true; result_.Clear(); + remote_node_index_ = UINT32_MAX; } else { @@ -1399,8 +1434,9 @@ class DropTableClosure : public ::google::protobuf::Closure, public Poolable cntl_.ErrorCode() != EAGAIN && cntl_.ErrorCode() != brpc::ERPCTIMEDOUT) { - ds_service_client_->UpdateDataStoreServiceChannelByShardId( - shard_ids_.back()); + uint32_t new_node_index; + ds_service_client_->UpdateOwnerNodeIndexOfShard( + shard_ids_.back(), remote_node_index_, new_node_index); // Retry if (retry_count_ < ds_service_client_->retry_limit_) @@ -1506,10 +1542,16 @@ class DropTableClosure : public ::google::protobuf::Closure, public Poolable return shard_ids_; } + void SetRemoteNodeIndex(uint32_t remote_node_index) + { + remote_node_index_ = remote_node_index; + } + private: brpc::Controller cntl_; ::EloqDS::remote::DropTableRequest request_; ::EloqDS::remote::DropTableResponse response_; + uint32_t remote_node_index_{UINT32_MAX}; DataStoreServiceClient *ds_service_client_; uint16_t retry_count_{0}; @@ -1559,8 +1601,10 @@ class BatchWriteRecordsClosure : public ::google::protobuf::Closure, request_.Clear(); response_.Clear(); cntl_.Reset(); + remote_node_index_ = UINT32_MAX; parts_cnt_per_key_ = 1; parts_cnt_per_record_ = 1; + result_.Clear(); } // for writing single record @@ -1649,8 +1693,10 @@ class BatchWriteRecordsClosure : public ::google::protobuf::Closure, uint32_t req_shard_id = ds_service_client_->GetShardIdByPartitionId( partition_id_); - ds_service_client_->UpdateDataStoreServiceChannelByShardId( - req_shard_id); + uint32_t new_node_index; + ds_service_client_->UpdateOwnerNodeIndexOfShard( + req_shard_id, remote_node_index_, new_node_index); + need_retry = true; } else @@ -1690,8 +1736,16 @@ class BatchWriteRecordsClosure : public ::google::protobuf::Closure, (*callback_)(callback_data_, this, *ds_service_client_, result_); } - void PrepareRemoteRequest() + void PrepareRequest(bool is_local_request) { + if (is_local_request) + { + is_local_request_ = true; + result_.Clear(); + remote_node_index_ = UINT32_MAX; + return; + } + // clear cntl_.Reset(); cntl_.set_timeout_ms(5000); @@ -1770,10 +1824,16 @@ class BatchWriteRecordsClosure : public ::google::protobuf::Closure, return parts_cnt_per_record_; } + void SetRemoteNodeIndex(uint32_t remote_node_index) + { + remote_node_index_ = remote_node_index; + } + private: brpc::Controller cntl_; EloqDS::remote::BatchWriteRecordsRequest request_; EloqDS::remote::BatchWriteRecordsResponse response_; + uint32_t remote_node_index_{UINT32_MAX}; DataStoreServiceClient *ds_service_client_{nullptr}; uint16_t retry_count_{0}; @@ -1814,6 +1874,7 @@ class ScanNextClosure : public ::google::protobuf::Closure, public Poolable cntl_.Reset(); request_.Clear(); response_.Clear(); + remote_node_index_ = UINT32_MAX; cntl_.Reset(); ds_service_client_ = nullptr; retry_count_ = 0; @@ -1853,6 +1914,7 @@ class ScanNextClosure : public ::google::protobuf::Closure, public Poolable { is_local_request_ = true; rpc_request_prepare_ = false; + remote_node_index_ = UINT32_MAX; retry_count_ = 0; ds_service_client_ = &store_hd; table_name_ = table_name; @@ -1876,6 +1938,7 @@ class ScanNextClosure : public ::google::protobuf::Closure, public Poolable { is_local_request_ = true; result_.Clear(); + remote_node_index_ = UINT32_MAX; } else { @@ -1935,9 +1998,12 @@ class ScanNextClosure : public ::google::protobuf::Closure, public Poolable cntl_.ErrorCode() != EAGAIN && cntl_.ErrorCode() != brpc::ERPCTIMEDOUT) { - ds_service_client_ - ->UpdateDataStoreServiceChannelByPartitionId( + uint32_t shard_id = + ds_service_client_->GetShardIdByPartitionId( partition_id_); + uint32_t new_node_index; + ds_service_client_->UpdateOwnerNodeIndexOfShard( + shard_id, remote_node_index_, new_node_index); // Retry if (retry_count_ < ds_service_client_->retry_limit_) @@ -2130,10 +2196,16 @@ class ScanNextClosure : public ::google::protobuf::Closure, public Poolable return search_conditions_; } + void SetRemoteNodeIndex(uint32_t remote_node_index) + { + remote_node_index_ = remote_node_index; + } + private: brpc::Controller cntl_; ::EloqDS::remote::ScanRequest request_; ::EloqDS::remote::ScanResponse response_; + uint32_t remote_node_index_{UINT32_MAX}; DataStoreServiceClient *ds_service_client_; uint16_t retry_count_{0}; @@ -2175,7 +2247,7 @@ class CreateSnapshotForBackupClosure : public ::google::protobuf::Closure, cntl_.Reset(); request_.Clear(); response_.Clear(); - channel_ = nullptr; + remote_node_index_ = UINT32_MAX; ds_service_client_ = nullptr; retry_count_ = 0; is_local_request_ = false; @@ -2202,6 +2274,7 @@ class CreateSnapshotForBackupClosure : public ::google::protobuf::Closure, { is_local_request_ = true; rpc_request_prepare_ = false; + remote_node_index_ = UINT32_MAX; retry_count_ = 0; ds_service_client_ = &store_hd; shard_ids_ = std::move(shard_ids); @@ -2218,6 +2291,7 @@ class CreateSnapshotForBackupClosure : public ::google::protobuf::Closure, { is_local_request_ = true; result_.Clear(); + remote_node_index_ = UINT32_MAX; } else { @@ -2262,15 +2336,14 @@ class CreateSnapshotForBackupClosure : public ::google::protobuf::Closure, cntl_.ErrorCode() != brpc::ERPCTIMEDOUT) { uint32_t shard_id = shard_ids_.back(); - channel_ = - ds_service_client_ - ->UpdateDataStoreServiceChannelByShardId(shard_id); + uint32_t new_node_index; + ds_service_client_->UpdateOwnerNodeIndexOfShard( + shard_id, remote_node_index_, new_node_index); // Retry if (retry_count_ < ds_service_client_->retry_limit_) { self_guard.Release(); - channel_ = nullptr; retry_count_++; ds_service_client_->CreateSnapshotForBackupInternal( this); @@ -2300,7 +2373,6 @@ class CreateSnapshotForBackupClosure : public ::google::protobuf::Closure, if (retry_count_ < ds_service_client_->retry_limit_) { self_guard.Release(); - channel_ = nullptr; response_.Clear(); cntl_.Reset(); retry_count_++; @@ -2317,16 +2389,6 @@ class CreateSnapshotForBackupClosure : public ::google::protobuf::Closure, return &cntl_; } - brpc::Channel *GetChannel() - { - return channel_.get(); - } - - void SetChannel(std::shared_ptr channel) - { - channel_ = channel; - } - const std::string_view GetBackupName() { return backup_name_; @@ -2391,11 +2453,16 @@ class CreateSnapshotForBackupClosure : public ::google::protobuf::Closure, return &response_; } + void SetRemoteNodeIndex(uint32_t remote_node_index) + { + remote_node_index_ = remote_node_index; + } + private: brpc::Controller cntl_; ::EloqDS::remote::CreateSnapshotForBackupRequest request_; ::EloqDS::remote::CreateSnapshotForBackupResponse response_; - std::shared_ptr channel_; + uint32_t remote_node_index_{UINT32_MAX}; DataStoreServiceClient *ds_service_client_; uint16_t retry_count_{0}; diff --git a/eloq_data_store_service/data_store_service.cpp b/eloq_data_store_service/data_store_service.cpp index bd836f4..43b3e04 100644 --- a/eloq_data_store_service/data_store_service.cpp +++ b/eloq_data_store_service/data_store_service.cpp @@ -242,22 +242,38 @@ bool DataStoreService::StartService(bool create_db_if_missing) auto dss_shards = cluster_manager_.GetShardsForThisNode(); assert(dss_shards.size() <= 1); + assert(shard_status_.load(std::memory_order_acquire) == + DSShardStatus::Closed); if (!dss_shards.empty()) { shard_id_ = dss_shards.at(0); - shard_status_ = cluster_manager_.FetchDSShardStatus(shard_id_); - if (shard_status_ == DSShardStatus::ReadOnly || - shard_status_ == DSShardStatus::ReadWrite) + auto open_mode = cluster_manager_.FetchDSShardStatus(shard_id_); + if (open_mode == DSShardStatus::ReadOnly || + open_mode == DSShardStatus::ReadWrite) { - data_store_ = data_store_factory_->CreateDataStore( - create_db_if_missing, shard_id_, this, true); - if (data_store_ == nullptr) + auto expect_status = DSShardStatus::Closed; + if (shard_status_.compare_exchange_strong(expect_status, + DSShardStatus::Starting)) { - LOG(ERROR) << "Failed to create data store on starting " - "DataStoreService."; - return false; + data_store_ = data_store_factory_->CreateDataStore( + create_db_if_missing, shard_id_, this, true); + if (data_store_ == nullptr) + { + LOG(ERROR) << "Failed to create data store on starting " + "DataStoreService."; + return false; + } + + if (open_mode == DSShardStatus::ReadOnly) + { + data_store_->SwitchToReadOnly(); + } + shard_status_.store(open_mode, std::memory_order_release); } } + + DLOG(INFO) << "Created data store shard id:" << shard_id_ + << ", shard_status:" << shard_status_; } server_ = std::make_unique(); @@ -287,6 +303,28 @@ bool DataStoreService::ConnectAndStartDataStore(uint32_t data_shard_id, DSShardStatus open_mode, bool create_db_if_missing) { + if (open_mode == DSShardStatus::Closed) + { + return true; + } + assert(open_mode == DSShardStatus::ReadOnly); + + DSShardStatus expect_status = DSShardStatus::Closed; + if (!shard_status_.compare_exchange_strong(expect_status, + DSShardStatus::Starting)) + { + if (expect_status == open_mode) + { + return true; + } + while (expect_status == DSShardStatus::Starting) + { + bthread_usleep(10000); + expect_status = shard_status_.load(std::memory_order_acquire); + } + return expect_status == open_mode; + } + assert(data_store_factory_ != nullptr); if (data_store_ == nullptr) { @@ -317,11 +355,13 @@ bool DataStoreService::ConnectAndStartDataStore(uint32_t data_shard_id, } } + data_store_->SwitchToReadOnly(); cluster_manager_.SwitchShardToReadOnly(data_shard_id, DSShardStatus::Closed); - assert(shard_status_.load(std::memory_order_acquire) == - DSShardStatus::Closed); - shard_status_.store(open_mode, std::memory_order_release); + + expect_status = DSShardStatus::Starting; + shard_status_.compare_exchange_strong( + expect_status, open_mode, std::memory_order_release); return true; } @@ -341,7 +381,9 @@ void DataStoreService::Read(::google::protobuf::RpcController *controller, return; } - if (shard_status_.load(std::memory_order_acquire) == DSShardStatus::Closed) + auto shard_status = shard_status_.load(std::memory_order_acquire); + if (shard_status != DSShardStatus::ReadOnly && + shard_status != DSShardStatus::ReadWrite) { brpc::ClosureGuard done_guard(done); auto *result = response->mutable_result(); @@ -376,7 +418,9 @@ void DataStoreService::Read(const std::string_view table_name, return; } - if (shard_status_.load(std::memory_order_acquire) == DSShardStatus::Closed) + auto shard_status = shard_status_.load(std::memory_order_acquire); + if (shard_status != DSShardStatus::ReadOnly && + shard_status != DSShardStatus::ReadWrite) { brpc::ClosureGuard done_guard(done); record->clear(); @@ -834,20 +878,12 @@ void DataStoreService::ScanNext( } auto shard_status = shard_status_.load(std::memory_order_acquire); - if (shard_status != DSShardStatus::ReadWrite) + if (shard_status != DSShardStatus::ReadWrite && + shard_status != DSShardStatus::ReadOnly) { brpc::ClosureGuard done_guard(done); - if (shard_status == DSShardStatus::Closed) - { - PrepareShardingError(partition_id, result); - } - else - { - assert(shard_status == DSShardStatus::ReadOnly); - result->set_error_code( - ::EloqDS::remote::DataStoreError::WRITE_TO_READ_ONLY_DB); - result->set_error_msg("Write to read-only DB."); - } + result->set_error_code(::EloqDS::remote::DataStoreError::DB_NOT_OPEN); + result->set_error_msg("KV store not opened yet."); return; } @@ -889,21 +925,13 @@ void DataStoreService::ScanNext(::google::protobuf::RpcController *controller, } auto shard_status = shard_status_.load(std::memory_order_acquire); - if (shard_status != DSShardStatus::ReadWrite) + if (shard_status != DSShardStatus::ReadWrite && + shard_status != DSShardStatus::ReadOnly) { brpc::ClosureGuard done_guard(done); - if (shard_status == DSShardStatus::Closed) - { - PrepareShardingError(partition_id, response->mutable_result()); - } - else - { - assert(shard_status == DSShardStatus::ReadOnly); - auto *result = response->mutable_result(); - result->set_error_code( - ::EloqDS::remote::DataStoreError::WRITE_TO_READ_ONLY_DB); - result->set_error_msg("Write to read-only DB."); - } + auto *result = response->mutable_result(); + result->set_error_code(::EloqDS::remote::DataStoreError::DB_NOT_OPEN); + result->set_error_msg("KV store not opened yet."); return; } @@ -932,21 +960,14 @@ void DataStoreService::ScanClose(::google::protobuf::RpcController *controller, } auto shard_status = shard_status_.load(std::memory_order_acquire); - if (shard_status != DSShardStatus::ReadWrite) + if (shard_status != DSShardStatus::ReadWrite && + shard_status != DSShardStatus::ReadOnly) { + assert(false); brpc::ClosureGuard done_guard(done); - if (shard_status == DSShardStatus::Closed) - { - PrepareShardingError(partition_id, response->mutable_result()); - } - else - { - assert(shard_status == DSShardStatus::ReadOnly); - auto *result = response->mutable_result(); - result->set_error_code( - ::EloqDS::remote::DataStoreError::WRITE_TO_READ_ONLY_DB); - result->set_error_msg("Write to read-only DB."); - } + auto *result = response->mutable_result(); + result->set_error_code(::EloqDS::remote::DataStoreError::DB_NOT_OPEN); + result->set_error_msg("KV store not opened yet."); return; } @@ -974,20 +995,13 @@ void DataStoreService::ScanClose(const std::string_view table_name, } auto shard_status = shard_status_.load(std::memory_order_acquire); - if (shard_status != DSShardStatus::ReadWrite) + if (shard_status != DSShardStatus::ReadWrite && + shard_status != DSShardStatus::ReadOnly) { + assert(false); brpc::ClosureGuard done_guard(done); - if (shard_status == DSShardStatus::Closed) - { - PrepareShardingError(partition_id, result); - } - else - { - assert(shard_status == DSShardStatus::ReadOnly); - result->set_error_code( - ::EloqDS::remote::DataStoreError::WRITE_TO_READ_ONLY_DB); - result->set_error_msg("Write to read-only DB."); - } + result->set_error_code(::EloqDS::remote::DataStoreError::DB_NOT_OPEN); + result->set_error_msg("KV store not opened yet."); return; } @@ -2206,6 +2220,8 @@ bool DataStoreService::SwitchReadOnlyToReadWrite(uint32_t shard_id) { if (!IsOwnerOfShard(shard_id)) { + DLOG(INFO) << "SwitchReadOnlyToReadWrite failed, shard " << shard_id + << " is not owner"; return false; } diff --git a/eloq_data_store_service/data_store_service.h b/eloq_data_store_service/data_store_service.h index b64a9f7..e718aa3 100644 --- a/eloq_data_store_service/data_store_service.h +++ b/eloq_data_store_service/data_store_service.h @@ -600,6 +600,13 @@ class DataStoreService : EloqDS::remote::DataStoreRpcService ongoing_write_requests_.fetch_sub(1, std::memory_order_release); } + bool IsOwnerOfShard(uint32_t shard_id) const + { + return shard_status_.load(std::memory_order_acquire) != + DSShardStatus::Closed && + shard_id_ == shard_id; + } + private: uint32_t GetShardIdByPartitionId(int32_t partition_id) { @@ -608,12 +615,6 @@ class DataStoreService : EloqDS::remote::DataStoreRpcService // return cluster_manager_.GetShardIdByPartitionId(partition_id); } - bool IsOwnerOfShard(uint32_t shard_id) - { - return shard_status_.load(std::memory_order_acquire) != - DSShardStatus::Closed && - shard_id_ == shard_id; - } DataStore *GetDataStore(uint32_t shard_id) { diff --git a/eloq_data_store_service/data_store_service_config.cpp b/eloq_data_store_service/data_store_service_config.cpp index a0a4e97..07c9164 100644 --- a/eloq_data_store_service/data_store_service_config.cpp +++ b/eloq_data_store_service/data_store_service_config.cpp @@ -865,7 +865,7 @@ bool DataStoreServiceClusterManager::SwitchShardToReadWrite( topology_.UpdateDSShardStatus(shard_id, DSShardStatus::ReadWrite); DLOG(INFO) << "SwitchToReadWrite, shard " << shard_id - << " status: " << shard_status; + << " status: " << static_cast(shard_status); return true; } @@ -919,6 +919,7 @@ void DataStoreServiceClusterManager::PrepareShardingError( { auto *new_shard = new_shards->add_shards(); new_shard->set_shard_id(shard_id); + new_shard->set_shard_version(shard.version_); for (const auto &node : shard.nodes_) { auto *new_node = new_shard->add_member_nodes(); @@ -932,12 +933,14 @@ void DataStoreServiceClusterManager::PrepareShardingError( // Same shard but primary changed - just send new primary key_sharding_changed_message->set_type( ::EloqDS::remote::KeyShardingErrorType::PrimaryNodeChanged); + auto shard_version = topology_.FetchDSShardVersion(shard_id); DSSNode primary_node = topology_.GetPrimaryNode(shard_id); auto *new_primary_node = key_sharding_changed_message->mutable_new_primary_node(); new_primary_node->set_host_name(primary_node.host_name_); new_primary_node->set_port(primary_node.port_); key_sharding_changed_message->set_shard_id(shard_id); + key_sharding_changed_message->set_shard_version(shard_version); } } diff --git a/eloq_data_store_service/data_store_service_util.h b/eloq_data_store_service/data_store_service_util.h index f157371..cd1d6d7 100644 --- a/eloq_data_store_service/data_store_service_util.h +++ b/eloq_data_store_service/data_store_service_util.h @@ -38,6 +38,9 @@ enum DSShardStatus : uint8_t ReadWrite = 2, // Node is closed Closed = 3, + // Node is opening data store. + Starting = 4 + }; // TODO(liunyl): define error code diff --git a/eloq_data_store_service/ds_request.proto b/eloq_data_store_service/ds_request.proto index 9123b84..89ce577 100644 --- a/eloq_data_store_service/ds_request.proto +++ b/eloq_data_store_service/ds_request.proto @@ -79,7 +79,8 @@ message KeyShardingChanged { KeyShardingErrorType type = 1; DSSNodeBuf new_primary_node = 2; uint32 shard_id = 3; - DSSClusterConfig new_cluster_config = 4; + uint64 shard_version = 4; + DSSClusterConfig new_cluster_config = 5; } message CommonResult {