Add OnLeaderStop in DataStoreHandler #181
Note: Other AI code review bot(s) detected. CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough
Added a new OnLeaderStop(term) hook and expanded OnStartFollowing to accept (node_id, term, standby_term, resubscribe). Updated call sites and logging across leader/standby flows and added retry loops for shard/snapshot synchronization.
Sequence Diagram(s)
sequenceDiagram
participant Leader as Leader Node
participant Store as Store Handler
participant Standby as Standby Node
rect rgb(240,248,255)
Note over Leader,Store: Leader stop (NEW)
Leader->>Store: OnLeaderStop(term)
alt stop succeeds
Store-->>Leader: true
else stop fails
Store-->>Leader: false (retry by caller)
end
end
rect rgb(245,245,220)
Note over Leader,Store: Start following (UPDATED)
Leader->>Store: OnStartFollowing(node_id, term, standby_term, resubscribe)
Store-->>Leader: done
end
rect rgb(255,250,240)
Note over Standby,Store: Snapshot sync with retries (NEW)
Standby->>Store: OnSnapshotSynced()
alt RPC fails
Store-->>Standby: error (log & retry up to 5x with 1s backoff)
Standby->>Store: OnSnapshotSynced() (retry)
else RPC succeeds
Store-->>Standby: success
end
end
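The leader-stop contract in the diagram (the store returns false and the caller retries) can be sketched as below. This is a minimal illustration, not the project's implementation: only the two hook signatures come from the PR summary, while `DataStoreHandlerSketch`, `FlakyStore`, and `StopLeaderWithRetry` are hypothetical names introduced here, and the bounded attempt count is an assumption.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical, simplified view of the two store hooks named in the
// walkthrough. Only the signatures are taken from the PR summary.
class DataStoreHandlerSketch
{
public:
    virtual ~DataStoreHandlerSketch() = default;

    // Returns true once the store has finished stopping for this term,
    // false to ask the caller to retry.
    virtual bool OnLeaderStop(int64_t /*term*/) { return true; }

    virtual void OnStartFollowing(uint32_t /*node_id*/,
                                  int64_t /*term*/,
                                  int64_t /*standby_term*/,
                                  bool /*resubscribe*/)
    {
    }
};

// Toy store (assumption) that fails a fixed number of times before
// reporting success, to exercise the retry path.
class FlakyStore : public DataStoreHandlerSketch
{
public:
    explicit FlakyStore(int failures) : failures_left_(failures) {}

    bool OnLeaderStop(int64_t term) override
    {
        last_term_ = term;
        if (failures_left_ > 0)
        {
            --failures_left_;
            return false;
        }
        return true;
    }

    int64_t last_term_{-1};

private:
    int failures_left_;
};

// Caller-side retry: returns the number of calls it took to succeed,
// or -1 if every attempt returned false.
int StopLeaderWithRetry(DataStoreHandlerSketch &store,
                        int64_t term,
                        int max_attempts)
{
    assert(max_attempts > 0);
    for (int attempt = 1; attempt <= max_attempts; ++attempt)
    {
        if (store.OnLeaderStop(term))
        {
            return attempt;
        }
    }
    return -1;
}
```

In the PR itself the retry appears to be driven by the caller returning false and being invoked again, rather than by a local loop; the loop here only makes the contract easy to exercise in isolation.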
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~22 minutes
ec89efb to 4263417
@CodeRabbit review
✅ Actions performed
Review triggered.
e4913d6 to d24514a
Actionable comments posted: 1
♻️ Duplicate comments (1)
src/fault/cc_node.cpp (1)
487-503: Don’t clear leader terms before the store shutdown finishes.

We still set LeaderTerm/CandidateTerm to -1 before calling store_hd_->OnLeaderStop. When the store asks for a retry (returns false), we exit with both terms cleared. A concurrent new leader can install its term in between retries, but the next attempt will immediately overwrite it with -1 again. This is the same race flagged earlier. Guard the term transitions around the store callback so we only clear them once the store finishes and the term hasn’t advanced.

```diff
-    {
-        std::lock_guard<std::mutex> lk(recovery_mux_);
-        Sharder::Instance().SetLeaderTerm(ng_id_, -1);
-        Sharder::Instance().SetCandidateTerm(ng_id_, -1);
-    }
-
-    if (!txservice_skip_kv)
-    {
-        if (!local_cc_shards_.store_hd_->OnLeaderStop(term))
-        {
-            // Keep retrying
-            return false;
-        }
-    }
+    int64_t prev_leader_term;
+    int64_t prev_candidate_term;
+    {
+        std::lock_guard<std::mutex> lk(recovery_mux_);
+        prev_leader_term = Sharder::Instance().LeaderTerm(ng_id_);
+        prev_candidate_term = Sharder::Instance().CandidateLeaderTerm(ng_id_);
+        if (prev_leader_term != term && prev_candidate_term != term)
+        {
+            return true;
+        }
+    }
+
+    if (!txservice_skip_kv)
+    {
+        if (!local_cc_shards_.store_hd_->OnLeaderStop(term))
+        {
+            return false;
+        }
+    }
+
+    {
+        std::lock_guard<std::mutex> lk(recovery_mux_);
+        if (Sharder::Instance().LeaderTerm(ng_id_) != prev_leader_term ||
+            Sharder::Instance().CandidateLeaderTerm(ng_id_) != prev_candidate_term)
+        {
+            return true;
+        }
+        Sharder::Instance().SetLeaderTerm(ng_id_, -1);
+        Sharder::Instance().SetCandidateTerm(ng_id_, -1);
+    }
```
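The guard described in this comment (snapshot the terms, run the store callback without holding the lock, then clear the terms only if nothing changed) can be tried out in isolation with the sketch below. `TermState` and `StopLeaderGuarded` are hypothetical stand-ins for the Sharder bookkeeping and the cc_node method; they are assumptions for illustration and not code from the repository.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <mutex>

// Hypothetical stand-in for the per-node-group term bookkeeping.
struct TermState
{
    std::mutex mux;
    int64_t leader_term{-1};
    int64_t candidate_term{-1};
};

// Returns false only when the store asks for a retry. Returns true when
// the stop is complete or when there is nothing left for this term to do.
bool StopLeaderGuarded(TermState &state,
                       int64_t term,
                       const std::function<bool(int64_t)> &store_stop)
{
    assert(store_stop);
    int64_t prev_leader = -1;
    int64_t prev_candidate = -1;
    {
        std::lock_guard<std::mutex> lk(state.mux);
        prev_leader = state.leader_term;
        prev_candidate = state.candidate_term;
        if (prev_leader != term && prev_candidate != term)
        {
            // Stale request: this term is no longer installed.
            return true;
        }
    }

    // The store callback runs without the lock held.
    if (!store_stop(term))
    {
        return false;  // terms stay untouched until the store succeeds
    }

    {
        std::lock_guard<std::mutex> lk(state.mux);
        if (state.leader_term != prev_leader ||
            state.candidate_term != prev_candidate)
        {
            // A newer term was installed meanwhile; leave it alone.
            return true;
        }
        state.leader_term = -1;
        state.candidate_term = -1;
    }
    return true;
}
```

The key property is that a failed store stop leaves both terms untouched, so a retry can never overwrite a term installed by a newer leader in between attempts.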
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
include/store/data_store_handler.h (3 hunks)
src/fault/cc_node.cpp (7 hunks)
src/store/snapshot_manager.cpp (2 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-10-09T03:56:58.811Z
Learnt from: thweetkomputer
Repo: eloqdata/tx_service PR: 150
File: include/cc/local_cc_shards.h:626-631
Timestamp: 2025-10-09T03:56:58.811Z
Learning: For the LocalCcShards class in include/cc/local_cc_shards.h: Writer locks (unique_lock) should continue using the original meta_data_mux_ (std::shared_mutex) rather than fast_meta_data_mux_ (FastMetaDataMutex) at this stage. Only reader locks may use the FastMetaDataMutex wrapper.
Applied to files:
src/fault/cc_node.cpp
🧬 Code graph analysis (2)
src/fault/cc_node.cpp (1)
include/sharder.h (1)
node_id_(119-156)
include/store/data_store_handler.h (3)
src/fault/cc_node.cpp (4)
OnStartFollowing(691-715) OnStartFollowing(691-691) OnLeaderStop(471-520) OnLeaderStop(471-471)
src/sharder.cpp (4)
OnStartFollowing(906-921) OnStartFollowing(906-909) OnLeaderStop(861-873) OnLeaderStop(861-861)
src/remote/cc_node_service.cpp (4)
OnStartFollowing(125-145) OnStartFollowing(125-129) OnLeaderStop(113-123) OnLeaderStop(113-116)
```cpp
while (retry_times-- > 0)
{
    brpc::Controller cntl;
    cntl.set_timeout_ms(1);
    remote::OnSnapshotSyncedRequest on_synced_req;
    remote::OnSnapshotSyncedResponse on_sync_resp;
    on_synced_req.set_snapshot_path(req.dest_path());
    on_synced_req.set_standby_node_term(
        req.standby_node_term());
    stub.OnSnapshotSynced(
        &cntl, &on_synced_req, &on_sync_resp, nullptr);
    if (cntl.Failed())
    {
        LOG(WARNING) << "OnSnapshotSynced to standby node #"
                     << req.standby_node_id() << " failed, "
                     << " error: " << cntl.ErrorText()
                     << " error code: " << cntl.ErrorCode();
        // sleep 1 second and retry
        std::this_thread::sleep_for(std::chrono::seconds(1));
        continue;
    }
    else
    {
        break;
    }
}
```
Increase the retry RPC timeout from 1 ms.
cntl.set_timeout_ms(1) gives each RPC attempt a budget of only 1 ms, so every attempt will almost certainly time out even when the standby is healthy. That turns the new retry loop into guaranteed log spam and prevents the “best-effort” notification from ever succeeding. Please restore a realistic per-attempt timeout (e.g., 1000 ms to match the backoff cadence).
```diff
-        cntl.set_timeout_ms(1);
+        cntl.set_timeout_ms(1000);
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```cpp
while (retry_times-- > 0)
{
    brpc::Controller cntl;
    cntl.set_timeout_ms(1000);
    remote::OnSnapshotSyncedRequest on_synced_req;
    remote::OnSnapshotSyncedResponse on_sync_resp;
    on_synced_req.set_snapshot_path(req.dest_path());
    on_synced_req.set_standby_node_term(
        req.standby_node_term());
    stub.OnSnapshotSynced(
        &cntl, &on_synced_req, &on_sync_resp, nullptr);
    if (cntl.Failed())
    {
        LOG(WARNING) << "OnSnapshotSynced to standby node #"
                     << req.standby_node_id() << " failed, "
                     << " error: " << cntl.ErrorText()
                     << " error code: " << cntl.ErrorCode();
        // sleep 1 second and retry
        std::this_thread::sleep_for(std::chrono::seconds(1));
        continue;
    }
    else
    {
        break;
    }
}
```
🤖 Prompt for AI Agents
In src/store/snapshot_manager.cpp around lines 249 to 274, the RPC per-attempt
timeout is set to 1ms which will always time out; change cntl.set_timeout_ms(1)
to a realistic value (e.g., 1000) so each retry has a 1s budget to succeed (or
make it a named constant matching the retry/backoff cadence) and keep the
existing retry sleep logic.
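The named-constant variant mentioned in the prompt could look roughly like the sketch below. It is deliberately independent of brpc: the RPC is abstracted as a callback that receives the per-attempt timeout and returns whether the attempt succeeded. The constant and function names (`kMaxNotifyAttempts`, `kAttemptTimeout`, `kRetryBackoff`, `NotifyWithRetry`) are hypothetical. The values of 5 attempts and a 1 s backoff follow the sequence diagram, and the 1000 ms timeout follows the suggestion above.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical named constants; the code under review uses literals.
constexpr int kMaxNotifyAttempts = 5;
constexpr std::chrono::milliseconds kAttemptTimeout{1000};
constexpr std::chrono::milliseconds kRetryBackoff{1000};

// Runs `attempt` up to `max_attempts` times, passing the per-attempt
// timeout each time. Returns true as soon as one attempt succeeds.
bool NotifyWithRetry(
    const std::function<bool(std::chrono::milliseconds)> &attempt,
    int max_attempts = kMaxNotifyAttempts,
    std::chrono::milliseconds timeout = kAttemptTimeout,
    std::chrono::milliseconds backoff = kRetryBackoff)
{
    assert(max_attempts > 0);
    for (int i = 0; i < max_attempts; ++i)
    {
        if (attempt(timeout))
        {
            return true;
        }
        // Back off between attempts, but not after the final failure.
        if (i + 1 < max_attempts)
        {
            std::this_thread::sleep_for(backoff);
        }
    }
    return false;
}
```

Unlike the loop under review, this sketch skips the sleep after the last failed attempt. In the real code the callback would wrap the `stub.OnSnapshotSynced` call and return `!cntl.Failed()`.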
d24514a to 4889d95
Here are some reminders before you submit the pull request
- fixes eloqdb/tx_service#issue_id
- ./mtr --suite=mono_main,mono_multi,mono_basic

Note
Introduce datastore OnLeaderStop and parameterized OnStartFollowing, wire them into cc_node, and add retrying snapshot-sync notification plus improved diagnostics.
- include/store/data_store_handler.h:
  - Add virtual bool OnLeaderStop(int64_t term).
  - Change OnStartFollowing to OnStartFollowing(uint32_t node_id, int64_t term, int64_t standby_term, bool resubscribe).
- src/fault/cc_node.cpp:
  - Call store_hd_->OnLeaderStart(...) unconditionally when KV enabled (remove shared-storage check).
  - Call store_hd_->OnLeaderStop(term) on leader step-down with retry gating.
  - Call store_hd_->OnStartFollowing(leader_node_id, primary_term, standby_term, resubscribe).
- src/store/snapshot_manager.cpp:
  - OnSnapshotSynced: add retry loop with backoff and warnings.
- include/sharder.h.

Written by Cursor Bugbot for commit e4913d6.
Summary by CodeRabbit