Skip to content

Update ng leader cache from multi-stage log response.#134

Merged
MrGuin merged 4 commits into
mainfrom
ddl_log_ng_leader
Sep 23, 2025
Merged

Update ng leader cache from multi-stage log response.#134
MrGuin merged 4 commits into
mainfrom
ddl_log_ng_leader

Conversation

@MrGuin

@MrGuin MrGuin commented Sep 16, 2025

Copy link
Copy Markdown
Collaborator

So that the coordinator can update the latest leader info when some participant fails over. Otherwise, the new participant leader could be missed by the coordinator and orphan lock would be left.

Related PR:
eloqdata/tx-log-protos#7
https://github.com/eloqdata/eloq_log_service/pull/18

Summary by CodeRabbit

  • New Features

    • Automatically refreshes leader information from successful log responses to improve routing after leadership changes.
  • Bug Fixes

    • Prevents stale leader data from replacing current leaders, reducing misrouted operations and improving write/forward stability.
  • Chores

    • Improved logging around leader update events.
    • Updated tx-log-protos submodule reference.

@MrGuin

MrGuin commented Sep 16, 2025

Copy link
Copy Markdown
Collaborator Author

Is it enough to just compare the ip stored in log service with the host name stored in ng_configs_? Would it happen that two nodes in the same group share the same hostname? @liunyl

@liunyl

liunyl commented Sep 17, 2025

Copy link
Copy Markdown
Contributor

@coderabbitai review

@coderabbitai

coderabbitai Bot commented Sep 17, 2025

Copy link
Copy Markdown
✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@coderabbitai

coderabbitai Bot commented Sep 17, 2025

Copy link
Copy Markdown

Walkthrough

Makes Sharder leader updates term-aware (UpdateLeader now accepts term), refines logging, preserves CAS-based leader assignment with memory-order semantics; adds a translation-unit helper in tx_operation.cpp to extract leader IP/port from log responses, map to node_id, and refresh leader cache; bumps tx-log-protos submodule pointer.

Changes

Cohort / File(s) Summary
Sharder leader term handling
src/sharder.cpp
Change UpdateLeader signature to UpdateLeader(uint32_t ng_id, uint32_t node_id, int64_t term); keep term-guarded CAS loop and memory_order semantics; adjust logging to reference ng_id, term, and cached term; store leader with memory_order_release on success.
TX operation leader refresh
src/tx_operation.cpp
Add static helper UpdateLeadersFromLogInfo(const Map<uint32_t, txlog::LeaderInfo>&) that maps leader IP/port → node_id using node configs and calls Sharder::Instance().UpdateLeader(ng_id, node_id, term); call this helper from WriteToLogOp::Forward after successful write_log_response.
Submodule pointer
tx-log-protos (submodule)
Bump submodule pointer from a5bde4c08877be826b6783c34ba56dd254dd34b4793d5c820bee869acd2a4da7da34df76bc7f8dd1 (no code changes).

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant Client
  participant WriteOp as WriteToLogOp
  participant LogSvc as LogService
  participant Mapper as UpdateLeadersFromLogInfo
  participant Sharder

  Client->>WriteOp: Forward(request)
  WriteOp->>LogSvc: write_log(request)
  LogSvc-->>WriteOp: write_log_response(status, leader_info)
  alt success AND leader_info present
    WriteOp->>Mapper: UpdateLeadersFromLogInfo(leader_info)
    loop per ng_id
      Mapper->>Mapper: map (leader_ip, leader_port) -> node_id
      alt node_id found
        Mapper->>Sharder: UpdateLeader(ng_id, node_id, term)
        alt term <= cached_term
          Sharder-->>Mapper: skip (stale term)
        else
          Sharder-->>Mapper: CAS commit leader + term
        end
      else
        Mapper-->>WriteOp: log unknown mapping
      end
    end
  end
  WriteOp-->>Client: Forward result
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

Suggested reviewers

  • liunyl

Poem

I thump my paw and count each term,
Leaders hop in sync, no stale worm.
Ports and names the logs recite,
I nudge the cache and set things right.
A little CAS, a tidy chore—🥕

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Description Check ⚠️ Warning The PR description explains the intent and links related PRs but does not follow the repository's required template: it omits the checklist items (Add tests, Document changes, explicit "fixes eloqdb/tx_service#issue_id" reference), any RFC reference, and any mention of passing ./mtr tests or CI output, so it is incomplete for this repository's review requirements. Because the template's items are expected by reviewers to confirm test coverage, documentation, and issue linkage, the current description is insufficient. Please update the PR description to include the repository checklist and mark completed items, add or link the tests and documentation changes, include an explicit "fixes eloqdb/tx_service#82" reference and any RFC link, and report or link the result of running ./mtr --suite=mono_main,mono_multi,mono_basic (or CI output). If some items cannot be completed in this PR, state why and provide a follow-up issue or PR with an expected timeline.
✅ Passed checks (2 passed)
Check name Status Explanation
Title Check ✅ Passed The pull request title "Update ng leader cache from multi-stage log response." is a concise, single-sentence summary that accurately captures the primary change—refreshing the node-group leader cache using multi-stage log responses—and directly relates to the modifications in sharder.cpp and tx_operation.cpp. It is specific and free of extraneous detail, so a teammate scanning history can understand the main intent.
Docstring Coverage ✅ Passed No functions found in the changes. Docstring coverage check skipped.
✨ Finishing touches
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch ddl_log_ng_leader

📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e851f41 and d56e13a.

📒 Files selected for processing (1)
  • tx-log-protos (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • tx-log-protos

Tip

👮 Agentic pre-merge checks are now available in preview!

Pro plan users can now enable pre-merge checks in their settings to enforce checklists before merging PRs.

  • Built-in checks – Quickly apply ready-made checks to enforce title conventions, require pull request descriptions that follow templates, validate linked issues for compliance, and more.
  • Custom agentic checks – Define your own rules using CodeRabbit’s advanced agentic capabilities to enforce organization-specific policies and workflows. For example, you can instruct CodeRabbit’s agent to verify that API documentation is updated whenever API schema files are modified in a PR. Note: Upto 5 custom checks are currently allowed during the preview period. Pricing for this feature will be announced in a few weeks.

Please see the documentation for more information.

Example:

reviews:
  pre_merge_checks:
    custom_checks:
      - name: "Undocumented Breaking Changes"
        mode: "warning"
        instructions: |
          Pass/fail criteria: All breaking changes to public APIs, CLI flags, environment variables, configuration keys, database schemas, or HTTP/GraphQL endpoints must be documented in the "Breaking Change" section of the PR description and in CHANGELOG.md. Exclude purely internal or private changes (e.g., code not exported from package entry points or explicitly marked as internal).

Please share your feedback with us on this Discord post.


Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (11)
include/proto/cc_request.proto (1)

44-45: Make term presence explicit and align sentinel semantics.

Proto3 scalars default to 0 when absent. If you need to distinguish “absent” from an actual term, declare the field optional and use has_term(). Also align with Sharder::UpdateLeader(term=-1 default). Two compatible options:

  • Preferred: make the field optional and treat “missing” as unknown.
  • Or: keep it non-optional but document that 0 means “unknown” and map 0→-1 at call sites.

Suggested proto change:

-message NotifyNewLeaderStartRequest {
+message NotifyNewLeaderStartRequest {
   uint32 ng_id = 1;
   uint32 node_id = 2;
-  int64 term = 3;
+  optional int64 term = 3;
 }
include/sharder.h (3)

356-359: Document and handle term <= 0 as “unknown” in UpdateLeader.

Older peers won’t send term and receivers will see 0. Ensure the implementation treats term <= 0 as “no gating by term” (equivalent to the default -1) to keep backward compatibility. If already done in sharder.cpp, add a brief note here.


197-201: Pair memory-order with release store.

LeaderNodeId() uses relaxed load but UpdateLeader() (per PR summary) stores with memory_order_release. Use memory_order_acquire here to pair and ensure readers see a consistent leader/term snapshot when they depend on it.

-        return ng_leader_cache_[cc_ng_id].load(std::memory_order_relaxed);
+        return ng_leader_cache_[cc_ng_id].load(std::memory_order_acquire);

765-767: Bounds-safety and single source of truth for cache sizes.

All caches are sized to 1000. Add a single constexpr (e.g., kMaxNodeGroups) and static_assert the arrays use it. Also add a debug assert on ng_id index accesses in UpdateLeader to prevent OOB on larger clusters.

src/remote/cc_node_service.cpp (1)

219-225: Guard for older peers: normalize absent term (0) to unknown (-1).

Older senders won’t populate term; proto3 will read it as 0. Normalize before calling UpdateLeader to avoid incorrectly gating updates.

-    int64_t term = request->term();
+    int64_t term = request->term();
+    if (term == 0) {
+        term = -1; // treat missing/unknown term
+    }
-    Sharder::Instance().UpdateLeader(ng_id, node_id, term);
+    Sharder::Instance().UpdateLeader(ng_id, node_id, term);
src/fault/cc_node.cpp (2)

835-839: Always refresh term, even if leader node_id is unchanged.

Terms can advance while the leader node_id stays the same. The current guard prevents updating the term cache in that case.

-    if (!resubscribe &&
-        Sharder::Instance().LeaderNodeId(ng_id_) != leader_node_id)
-    {
-        Sharder::Instance().UpdateLeader(ng_id_, leader_node_id, primary_term);
-    }
+    if (!resubscribe) {
+        Sharder::Instance().UpdateLeader(ng_id_, leader_node_id, primary_term);
+    }

231-241: Populate term in NotifyNewLeaderStart RPC.

Now that the proto supports term, include it so receivers can gate correctly. Use LeaderTerm(leader_ng_id) if already escalated; otherwise CandidateLeaderTerm.

         remote::NotifyNewLeaderStartRequest req;
         req.set_ng_id(leader_ng_id);
         req.set_node_id(leader_node_id);
+        int64_t t = Sharder::Instance().LeaderTerm(leader_ng_id);
+        if (t <= 0) t = Sharder::Instance().CandidateLeaderTerm(leader_ng_id);
+        if (t > 0) req.set_term(t);
src/tx_operation.cpp (3)

904-910: Add null check for safety.

While the current code appears safe, adding an explicit check for empty leader info would make the intent clearer.

 static void UpdateLeadersFromLogInfo(
     const google::protobuf::Map<uint32_t, txlog::LeaderInfo> &ng_leader_info)
 {
     if (ng_leader_info.empty())
     {
         return;
     }
+    
+    DLOG(INFO) << "Updating leader cache from log service response with "
+               << ng_leader_info.size() << " entries";

941-947: Consider alternatives to handle unmapped leader IPs.

When no node matches the leader IP, the function logs a warning and continues. Consider whether this warrants a more robust fallback mechanism, such as triggering a full leader discovery.

Consider adding a comment explaining the implications:

         if (leader_node_id == UINT32_MAX)
         {
             // No direct match, skip but log for diagnostics
+            // This could indicate a configuration mismatch or a node that hasn't
+            // been fully registered yet. The leader cache will remain unchanged
+            // for this node group.
             DLOG(WARNING) << "Cannot map leader IP '" << leader_ip
                           << "' to node_id for ng_id=" << ng_id;
             continue;
         }

949-952: Consider log level for production.

The INFO level log might be too verbose in production if leader updates are frequent.

-        DLOG(INFO) << "Update leader from log service: ng_id=" << ng_id
+        DVLOG(1) << "Update leader from log service: ng_id=" << ng_id
                    << ", node_id=" << leader_node_id
                    << ", term=" << leader_term;
src/sharder.cpp (1)

168-175: Initialize ng_leader_term_cache_ with relaxed semantics.

Match nearby initialization style and avoid unnecessary SC fences during boot.

-            ng_leader_term_cache_[nid].store(-1);
+            ng_leader_term_cache_[nid].store(-1, std::memory_order_relaxed);
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d3cf6e8 and 71893dd.

📒 Files selected for processing (6)
  • include/proto/cc_request.proto (1 hunks)
  • include/sharder.h (2 hunks)
  • src/fault/cc_node.cpp (1 hunks)
  • src/remote/cc_node_service.cpp (1 hunks)
  • src/sharder.cpp (2 hunks)
  • src/tx_operation.cpp (3 hunks)
🔇 Additional comments (2)
src/tx_operation.cpp (2)

904-954: Solid implementation of leader cache update mechanism.

The new UpdateLeadersFromLogInfo function properly handles leader information from log service responses:

  • Correctly fetches a snapshot of node group configs for thread-safe access
  • Appropriately handles unknown node groups and unmapped IPs with diagnostic logging
  • Properly maps leader IP to node ID through hostname comparison

1028-1042: LGTM! Proper integration with WriteToLogOp.

The integration point is well-chosen - updating the leader cache after a successful write log response ensures fresh leader information is propagated quickly without adding extra network calls.

Comment thread src/sharder.cpp
Comment on lines +684 to 712
void Sharder::UpdateLeader(uint32_t ng_id, uint32_t node_id, int64_t term)
{
DLOG(INFO) << "ccnode group ng" << ng_id
<< " updates leader to node_id:" << node_id;
auto cached_leader_term =
ng_leader_term_cache_[ng_id].load(std::memory_order_relaxed);
// If the term is passed and out-dated, skip it.
if (term != -1 && cached_leader_term != -1 && term <= cached_leader_term)
{
DLOG(INFO) << "skip out-dated node group leader update, ng: " << ng_id
<< ", term: " << term
<< ", cached term: " << cached_leader_term;
return;
}

while (!ng_leader_term_cache_[ng_id].compare_exchange_weak(
cached_leader_term, term))
{
if (term != -1 && cached_leader_term != -1 &&
term <= cached_leader_term)
{
DLOG(INFO) << "skip out-dated node group leader update, ng: "
<< ng_id << ", term: " << term
<< ", cached term: " << cached_leader_term;
return;
}
}
LOG(INFO) << "update node group ng" << ng_id
<< " leader to node_id: " << node_id;
ng_leader_cache_[ng_id].store(node_id, std::memory_order_release);
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

🧩 Analysis chain

Don’t clobber a known term when caller passes unknown term (-1); tighten memory ordering.

Passing term = -1 currently writes -1 into ng_leader_term_cache_, discarding a newer cached term and nullifying gating. This can regress leader decisions if a stale source triggers UpdateLeader without a term. Also, make CAS acq_rel and initial load acquire; fix log text.

 void Sharder::UpdateLeader(uint32_t ng_id, uint32_t node_id, int64_t term)
 {
-    auto cached_leader_term =
-        ng_leader_term_cache_[ng_id].load(std::memory_order_relaxed);
-    // If the term is passed and out-dated, skip it.
-    if (term != -1 && cached_leader_term != -1 && term <= cached_leader_term)
-    {
-        DLOG(INFO) << "skip out-dated node group leader update, ng: " << ng_id
-                   << ", term: " << term
-                   << ", cached term: " << cached_leader_term;
-        return;
-    }
-
-    while (!ng_leader_term_cache_[ng_id].compare_exchange_weak(
-        cached_leader_term, term))
-    {
-        if (term != -1 && cached_leader_term != -1 &&
-            term <= cached_leader_term)
-        {
-            DLOG(INFO) << "skip out-dated node group leader update, ng: "
-                       << ng_id << ", term: " << term
-                       << ", cached term: " << cached_leader_term;
-            return;
-        }
-    }
-    LOG(INFO) << "update node group ng" << ng_id
-              << " leader to node_id: " << node_id;
+    static constexpr int64_t kUnknownTerm = -1;
+    auto cached_leader_term =
+        ng_leader_term_cache_[ng_id].load(std::memory_order_acquire);
+    // If the term is provided and out-dated, skip it.
+    if (term != kUnknownTerm && cached_leader_term != kUnknownTerm &&
+        term <= cached_leader_term) {
+        DLOG(INFO) << "skip out-dated node group leader update, ng: " << ng_id
+                   << ", term: " << term
+                   << ", cached term: " << cached_leader_term;
+        return;
+    }
+
+    // Only update the cached term when a known term is provided.
+    if (term != kUnknownTerm) {
+        while (!ng_leader_term_cache_[ng_id].compare_exchange_weak(
+            cached_leader_term, term, std::memory_order_acq_rel, std::memory_order_acquire)) {
+            if (cached_leader_term != kUnknownTerm && term <= cached_leader_term) {
+                DLOG(INFO) << "skip out-dated node group leader update, ng: "
+                           << ng_id << ", term: " << term
+                           << ", cached term: " << cached_leader_term;
+                return;
+            }
+        }
+    } else {
+        DLOG(INFO) << "update node group leader with unknown term, ng: "
+                   << ng_id << ", node_id: " << node_id;
+    }
+
+    LOG(INFO) << "update node group ng: " << ng_id
+              << " leader to node_id: " << node_id
+              << (term != kUnknownTerm ? (", term: " + std::to_string(term)) : "");
     ng_leader_cache_[ng_id].store(node_id, std::memory_order_release);
 }

To ensure call sites don’t unintentionally downgrade the cached term, confirm the default argument is present in the declaration and audit arities:


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# 1) Declaration includes default term = -1
rg -nP 'UpdateLeader\s*\(\s*uint32_t\s+\w+,\s*uint32_t\s+\w+,\s*int64_t\s+\w+\s*=\s*-?1\s*\)' -g 'include/**'

# 2) Inventory all call sites by arity
echo "Two-arg calls (potentially unknown term):"
rg -nP --type=cpp '\bUpdateLeader\s*\(\s*[^,()]+\s*,\s*[^,()]+\s*\)' | sed -n '1,200p'
echo "Three-arg calls (term-aware):"
rg -nP --type=cpp '\bUpdateLeader\s*\(\s*[^,()]+\s*,\s*[^,()]+\s*,\s*[^,()]+\s*\)' | sed -n '1,200p'

Length of output: 768


Don't clobber cached leader term with unknown term (-1); tighten memory ordering.

Only update ng_leader_term_cache_ when term != -1; use memory_order_acquire for the initial load and compare_exchange_weak with memory_order_acq_rel (success) and memory_order_acquire (failure); fix the log messages. Header declares default term = -1 (include/sharder.h:358). Two-arg caller that will pass the default: src/fault/cc_node.cpp:215 — audit this call (and other three-arg sites: src/fault/cc_node.cpp:838, src/remote/cc_node_service.cpp:224, src/tx_operation.cpp:952) to ensure callers supply a known term when appropriate.

🤖 Prompt for AI Agents
In src/sharder.cpp around lines 684 to 712, the code currently loads and may
overwrite ng_leader_term_cache_ with the sentinel -1 and uses relaxed ordering;
change the initial load to use memory_order_acquire, only attempt to update
ng_leader_term_cache_ when term != -1 (skip the compare_exchange loop entirely
if term == -1), and when performing compare_exchange_weak use
memory_order_acq_rel on success and memory_order_acquire on failure; also adjust
the log text to be clearer (e.g., "skip outdated node group leader update for
ng: X") and after a successful term update store the node id with the existing
memory_order_release; finally, audit the two- and three-arg call sites
(src/fault/cc_node.cpp lines ~215 and ~838, src/remote/cc_node_service.cpp:224,
src/tx_operation.cpp:952) to ensure callers pass a known term when appropriate.

Comment thread src/tx_operation.cpp Outdated
@MrGuin

MrGuin commented Sep 18, 2025

Copy link
Copy Markdown
Collaborator Author

Closes #82.

@MrGuin MrGuin linked an issue Sep 18, 2025 that may be closed by this pull request

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (1)
src/sharder.cpp (1)

686-712: Don’t clobber cached term with -1; add acquire/acq_rel semantics (repeats prior feedback).

If a caller passes term = -1 (unknown), this CAS writes -1 into ng_leader_term_cache_, potentially downgrading a newer cached term and breaking gating. Also, use acquire for the initial load and acq_rel on CAS for proper ordering with the subsequent release store to ng_leader_cache_.

Apply this diff to gate unknown-term updates and tighten memory ordering:

 void Sharder::UpdateLeader(uint32_t ng_id, uint32_t node_id, int64_t term)
 {
-    auto cached_leader_term =
-        ng_leader_term_cache_[ng_id].load(std::memory_order_relaxed);
-    // If the term is passed and out-dated, skip it.
-    if (term != -1 && cached_leader_term != -1 && term <= cached_leader_term)
-    {
-        DLOG(INFO) << "skip out-dated node group leader update, ng: " << ng_id
-                   << ", term: " << term
-                   << ", cached term: " << cached_leader_term;
-        return;
-    }
-
-    while (!ng_leader_term_cache_[ng_id].compare_exchange_weak(
-        cached_leader_term, term))
-    {
-        if (term != -1 && cached_leader_term != -1 &&
-            term <= cached_leader_term)
-        {
-            DLOG(INFO) << "skip out-dated node group leader update, ng: "
-                       << ng_id << ", term: " << term
-                       << ", cached term: " << cached_leader_term;
-            return;
-        }
-    }
+    static constexpr int64_t kUnknownTerm = -1;
+    auto cached_leader_term =
+        ng_leader_term_cache_[ng_id].load(std::memory_order_acquire);
+    // If the term is provided and outdated, skip it.
+    if (term != kUnknownTerm && cached_leader_term != kUnknownTerm &&
+        term <= cached_leader_term)
+    {
+        DLOG(INFO) << "skip out-dated node group leader update, ng: " << ng_id
+                   << ", term: " << term
+                   << ", cached term: " << cached_leader_term;
+        return;
+    }
+
+    // Only advance the cached term when a known term is provided.
+    if (term != kUnknownTerm) {
+        while (!ng_leader_term_cache_[ng_id].compare_exchange_weak(
+            cached_leader_term, term,
+            std::memory_order_acq_rel, std::memory_order_acquire))
+        {
+            if (cached_leader_term != kUnknownTerm && term <= cached_leader_term)
+            {
+                DLOG(INFO) << "skip out-dated node group leader update, ng: "
+                           << ng_id << ", term: " << term
+                           << ", cached term: " << cached_leader_term;
+                return;
+            }
+        }
+    }
     LOG(INFO) << "update node group ng" << ng_id
               << " leader to node_id: " << node_id;
     ng_leader_cache_[ng_id].store(node_id, std::memory_order_release);
 }

Run to confirm default arg and callsites use known term where available:

#!/bin/bash
set -euo pipefail
echo "Header declares default term = -1:"
rg -nP 'UpdateLeader\s*\(\s*uint32_t\s+\w+,\s*uint32_t\s+\w+,\s*int64_t\s+\w+\s*=\s*-?1\s*\)'

echo -e "\nTwo-arg calls (unknown term):"
rg -nP --type=cpp '\bUpdateLeader\s*\(\s*[^,()]+\s*,\s*[^,()]+\s*\)' -g '!**/build/**'

echo -e "\nThree-arg calls (term-aware):"
rg -nP --type=cpp '\bUpdateLeader\s*\(\s*[^,()]+\s*,\s*[^,()]+\s*,\s*[^,()]+\s*\)' -g '!**/build/**'
🧹 Nitpick comments (1)
src/sharder.cpp (1)

691-692: Nit: clarify/standardize log messages.

Use “outdated” (no hyphen) and add a colon after “ng” for consistency.

-        DLOG(INFO) << "skip out-dated node group leader update, ng: " << ng_id
+        DLOG(INFO) << "skip outdated node group leader update, ng: " << ng_id
                    << ", term: " << term
                    << ", cached term: " << cached_leader_term;
@@
-            DLOG(INFO) << "skip out-dated node group leader update, ng: "
+            DLOG(INFO) << "skip outdated node group leader update, ng: "
                        << ng_id << ", term: " << term
                        << ", cached term: " << cached_leader_term;
@@
-    LOG(INFO) << "update node group ng" << ng_id
+    LOG(INFO) << "update node group ng: " << ng_id
               << " leader to node_id: " << node_id;

Also applies to: 704-705, 709-710

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 71893dd and 51eb197.

📒 Files selected for processing (2)
  • src/sharder.cpp (2 hunks)
  • src/tx_operation.cpp (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/tx_operation.cpp

  - WriteToLogOp: on successful WriteLog, consume node_group_leader_info and update Sharder leader cache with term.
  - Add UpdateLeadersFromLogInfo(...) to map leader IP to node_id using cached node group configs.
  - Sharder::UpdateLeader: clarify out-of-date term checks and log messages; emit INFO when leader changes.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (4)
src/tx_operation.cpp (4)

946-952: Promote mapping failures to non-debug logs (rate-limited).

Leader updates silently skipped in release builds (DLOG) will hide real incidents. Use LOG with sampling.

-            DLOG(WARNING) << "Cannot map leader '" << leader_ip << ":"
-                          << leader_port << "' to node_id for ng_id=" << ng_id;
+            LOG_EVERY_N(WARNING, 100) << "Cannot map leader '" << leader_host << ":"
+                                      << leader_port << "' to node_id for ng_id=" << ng_id;

912-914: Avoid copying the entire NG config map on the hot path.

GetNodeGroupConfigs() into a value (auto) copies all NGs. If this runs on every successful WriteLog, it can be expensive. Prefer a per-NG snapshot getter (e.g., GetNodeGroupConfigSnapshot(ng_id)) or a read-locked const reference plus internal synchronization.

Would you confirm whether Sharder exposes a per-NG accessor? If not, I can propose one.


954-958: Gate updates on valid terms, and document semantics.

If leader_info.term() can be 0/negative (unknown), skip updating or let Sharder enforce monotonicity. Add a guard or a comment clarifying UpdateLeader’s term rules.

-        DLOG(INFO) << "Update leader from log service: ng_id=" << ng_id
+        if (leader_term <= 0) {
+            LOG_EVERY_N(INFO, 1000) << "Skip leader update with non-positive term "
+                                    << leader_term << " for ng_id=" << ng_id;
+            continue;
+        }
+        DLOG(INFO) << "Update leader from log service: ng_id=" << ng_id
                    << ", node_id=" << leader_node_id
                    << ", term=" << leader_term;

903-959: Add minimal normalization helper (optional).

If you prefer to keep the body lean, consider an internal lambda for bracket stripping to keep comparisons tidy.

 static void UpdateLeadersFromLogInfo(
     const google::protobuf::Map<uint32_t, txlog::LeaderInfo> &ng_leader_info)
 {
+    auto normalize_host = [](const std::string& h)->std::string {
+        return (!h.empty() && h.front()=='[' && h.back()==']')
+            ? h.substr(1, h.size()-2) : h;
+    };
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 74e9c13 and e851f41.

📒 Files selected for processing (3)
  • src/sharder.cpp (2 hunks)
  • src/tx_operation.cpp (3 hunks)
  • tx-log-protos (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • tx-log-protos
  • src/sharder.cpp
🧰 Additional context used
🧬 Code graph analysis (1)
src/tx_operation.cpp (1)
include/tx_operation.h (1)
  • log_closure_ (328-329)
🔇 Additional comments (3)
src/tx_operation.cpp (3)

1033-1041: LGTM: integrate leader-info refresh only on successful WriteLog responses.

The gating on !cntl->Failed(), Success status, and has_write_log_response() looks correct.

Please confirm that LogResponse::write_log_response().node_group_leader_info() is populated consistently across all WriteLog types you expect to rely on (DATA, PREPARE/COMMIT/CLEAN for schema, migration logs). If not, consider logging when it’s present to track coverage.


930-944: Make leader mapping robust: don’t assume ip() equals host_name_; handle IPv6 and add a safe fallback.

If NodeConfig.host_name_ stores a hostname and LeaderInfo.ip() returns an IP literal, the equality will fail and we’ll skip valid updates. Also, bracketed IPv6 (e.g., “[::1]”) won’t match. Normalize the host, and if no host match is found, fall back to a unique-replay-port match within the same NG (only if exactly one node uses that replay port).

Apply this diff inside UpdateLeadersFromLogInfo:

-        const std::string &leader_ip = leader_info.ip();
+        const std::string &raw_host = leader_info.ip();
+        const std::string leader_host =
+            (!raw_host.empty() && raw_host.front() == '[' && raw_host.back() == ']')
+                ? raw_host.substr(1, raw_host.size() - 2)
+                : raw_host;
         const uint32_t leader_port = leader_info.port();
         int64_t leader_term = leader_info.term();
@@
-        for (const auto &node_config : node_configs)
+        for (const auto &node_config : node_configs)
         {
             // The log replay service listens on GET_LOG_REPLAY_RPC_PORT(port_)
             // Compare both host and the corresponding replay port.
-            if (node_config.host_name_ == leader_ip &&
-                static_cast<uint32_t>(
-                    GET_LOG_REPLAY_RPC_PORT(node_config.port_)) == leader_port)
+            const uint32_t replay_port =
+                static_cast<uint32_t>(GET_LOG_REPLAY_RPC_PORT(node_config.port_));
+            if (node_config.host_name_ == leader_host && replay_port == leader_port)
             {
                 leader_node_id = node_config.node_id_;
                 break;
             }
         }
+
+        // Optional, safe fallback: if host didn’t match, pick the sole node whose
+        // replay port equals leader_port within this NG.
+        if (leader_node_id == UINT32_MAX) {
+            uint32_t candidate = UINT32_MAX;
+            int matches = 0;
+            for (const auto &node_config : node_configs) {
+                const uint32_t replay_port =
+                    static_cast<uint32_t>(GET_LOG_REPLAY_RPC_PORT(node_config.port_));
+                if (replay_port == leader_port) {
+                    candidate = node_config.node_id_;
+                    ++matches;
+                    if (matches > 1) break;
+                }
+            }
+            if (matches == 1) {
+                leader_node_id = candidate;
+            }
+        }

930-944: Verify endpoint schema alignment across services

  • Confirmed: NodeConfig.port_ is the base port and GET_LOG_REPLAY_RPC_PORT(port) == port + 3. (include/sharder.h; used in src/sharder.cpp when starting log_replay_server and passing node_conf.port_.)
  • Confirmed by code: UpdateLeadersFromLogInfo compares leader_info.port() against GET_LOG_REPLAY_RPC_PORT(node_config.port_) and leader_info.ip() against node_config.host_name_ (src/tx_operation.cpp:905-939).
  • NOT CONFIRMED: txlog::LeaderInfo definition / proto is not present in this repo, so I cannot verify whether LeaderInfo.port is actually the log-replay RPC port or whether LeaderInfo.ip is an IP literal vs hostname. You must check the txlog proto / log service that populates LeaderInfo and ensure:
    • port field contains the log-replay RPC port,
    • ip field uses the same hostname/IP format as NodeConfig.host_name_ (or update UpdateLeadersFromLogInfo to normalize/resolve names before comparing).

@MrGuin MrGuin merged commit 575b560 into main Sep 23, 2025
4 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug]: DDL orphan lock in participants needs to be handled.

3 participants