Notify checkpoint scan workers when raising concurrency#167
Conversation
WalkthroughAdjusts data-sync notification behavior: notifications are now sent while holding the worker mutex when waking the DataSyncWorker, and DataSyncForHashPartition conditionally notifies only when an acquire-load detects an increase in Changes
Estimated code review effort🎯 2 (Simple) | ⏱️ ~10 minutes
Possibly related PRs
Suggested reviewers
Poem
Pre-merge checks and finishing touches❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
📜 Recent review detailsConfiguration used: CodeRabbit UI Review profile: CHILL Plan: Pro 📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
fa2560c to
02b3b20
Compare
There was a problem hiding this comment.
Actionable comments posted: 0
🧹 Nitpick comments (1)
tx_service/src/cc/local_cc_shards.cpp (1)
4655-4661: Conditional notify onscan_concurrency_increase matches intent; atomic ordering can be simplifiedThe new
need_notifycheck correctly avoids spuriousnotify_all()calls when the concurrency doesn't increase and still wakes sleepers when it does, so behaviorally this looks good and aligns with the gating logic inDataSyncWorker.One nit: using
std::memory_order_acquireon the load while the store usesmemory_order_relaxeddoesn’t actually buy extra ordering here and may give a misleading sense of a release/acquire pair. You can either:
- Keep the logic as-is but make both operations
memory_order_relaxedfor clarity, or- If you ever need a precise “old value” for
need_notify, switch to an atomic RMW (e.g.exchangewithmemory_order_acq_rel) so the comparison is against the real previous value.Given this is only a tuning hint for concurrency, the current code is acceptable; the above is just for clarity/semantics hygiene.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
tx_service/src/cc/local_cc_shards.cpp(1 hunks)
🧰 Additional context used
🧠 Learnings (5)
📓 Common learnings
Learnt from: thweetkomputer
Repo: eloqdata/tx_service PR: 150
File: include/cc/local_cc_shards.h:626-631
Timestamp: 2025-10-09T03:56:58.811Z
Learning: For the LocalCcShards class in include/cc/local_cc_shards.h: Writer locks (unique_lock) should continue using the original meta_data_mux_ (std::shared_mutex) rather than fast_meta_data_mux_ (FastMetaDataMutex) at this stage. Only reader locks may use the FastMetaDataMutex wrapper.
Learnt from: lzxddz
Repo: eloqdata/tx_service PR: 199
File: include/cc/local_cc_shards.h:233-234
Timestamp: 2025-11-11T07:10:40.346Z
Learning: In the LocalCcShards class in include/cc/local_cc_shards.h, the EnqueueCcRequest methods use `shard_code & 0x3FF` followed by `% cc_shards_.size()` to distribute work across processor cores for load balancing. This is intentional and separate from partition ID calculation. The 0x3FF mask creates a consistent distribution range (0-1023) before modulo by actual core count.
Learnt from: liunyl
Repo: eloqdata/tx_service PR: 149
File: include/cc/cc_request.h:1876-1927
Timestamp: 2025-10-20T04:30:07.884Z
Learning: ScanNextBatchCc in include/cc/cc_request.h is used only for hash-partition scans; range-partition scans are handled by ScanSliceCc.
Learnt from: lokax
Repo: eloqdata/tx_service PR: 254
File: tx_service/src/cc/local_cc_shards.cpp:2949-3188
Timestamp: 2025-12-02T10:43:27.431Z
Learning: In tx_service/src/cc/local_cc_shards.cpp, whenever TryPinNodeGroupData is used, only call Sharder::Instance().UnpinNodeGroupData(node_group) if the recorded term is >= 0 (i.e., pin succeeded). Example: LocalCcShards::PostProcessFlushTaskEntries guards the unpin with `if (term >= 0)`.
📚 Learning: 2025-10-09T03:56:58.811Z
Learnt from: thweetkomputer
Repo: eloqdata/tx_service PR: 150
File: include/cc/local_cc_shards.h:626-631
Timestamp: 2025-10-09T03:56:58.811Z
Learning: For the LocalCcShards class in include/cc/local_cc_shards.h: Writer locks (unique_lock) should continue using the original meta_data_mux_ (std::shared_mutex) rather than fast_meta_data_mux_ (FastMetaDataMutex) at this stage. Only reader locks may use the FastMetaDataMutex wrapper.
Applied to files:
tx_service/src/cc/local_cc_shards.cpp
📚 Learning: 2025-11-11T07:10:40.346Z
Learnt from: lzxddz
Repo: eloqdata/tx_service PR: 199
File: include/cc/local_cc_shards.h:233-234
Timestamp: 2025-11-11T07:10:40.346Z
Learning: In the LocalCcShards class in include/cc/local_cc_shards.h, the EnqueueCcRequest methods use `shard_code & 0x3FF` followed by `% cc_shards_.size()` to distribute work across processor cores for load balancing. This is intentional and separate from partition ID calculation. The 0x3FF mask creates a consistent distribution range (0-1023) before modulo by actual core count.
Applied to files:
tx_service/src/cc/local_cc_shards.cpp
📚 Learning: 2025-12-02T10:43:27.431Z
Learnt from: lokax
Repo: eloqdata/tx_service PR: 254
File: tx_service/src/cc/local_cc_shards.cpp:2949-3188
Timestamp: 2025-12-02T10:43:27.431Z
Learning: In tx_service/src/cc/local_cc_shards.cpp, whenever TryPinNodeGroupData is used, only call Sharder::Instance().UnpinNodeGroupData(node_group) if the recorded term is >= 0 (i.e., pin succeeded). Example: LocalCcShards::PostProcessFlushTaskEntries guards the unpin with `if (term >= 0)`.
Applied to files:
tx_service/src/cc/local_cc_shards.cpp
📚 Learning: 2025-10-20T04:30:07.884Z
Learnt from: liunyl
Repo: eloqdata/tx_service PR: 149
File: include/cc/cc_request.h:1876-1927
Timestamp: 2025-10-20T04:30:07.884Z
Learning: ScanNextBatchCc in include/cc/cc_request.h is used only for hash-partition scans; range-partition scans are handled by ScanSliceCc.
Applied to files:
tx_service/src/cc/local_cc_shards.cpp
02b3b20 to
2b11c6f
Compare
There was a problem hiding this comment.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
tx_service/src/cc/local_cc_shards.cpp (1)
2867-2901: Remove the redundant lock;task_worker_lkalready holdsdata_sync_worker_ctx_.mux_At line 2856,
DataSyncWorkeracquiresstd::unique_lock<std::mutex> task_worker_lk(data_sync_worker_ctx_.mux_)and holds it through the entirewhile(true)loop. The newstd::lock_guard<std::mutex> lk(data_sync_worker_ctx_.mux_);at lines 2891-2893 attempts to lock the same non-recursive mutex again, which will deadlock whenneed_notifybecomes true.Condition variable notifications may be safely issued while already holding the mutex. Remove the extra lock:
if (need_notify) { - std::lock_guard<std::mutex> lk(data_sync_worker_ctx_.mux_); data_sync_worker_ctx_.cv_.notify_all(); }
🧹 Nitpick comments (1)
tx_service/src/cc/local_cc_shards.cpp (1)
4654-4663: Conditional notify onscan_concurrency_looks good; consider tightening memory‑order usageThe new logic to only call
notify_all()when the computedscan_concurrencyexceeds the previous value makes sense and should reduce unnecessary wakeups. It also avoids ever writing0(which would stall workers), since the block is gated onscan_concurrency > 0.If you want cleaner happens‑before reasoning between the producer here and the wait predicate in
DataSyncWorker, you might consider:
- Storing with
memory_order_release, and- Loading in the wait predicate with
memory_order_acquireinstead ofrelaxed.This isn’t a correctness blocker, more about making the concurrency story easier to reason about.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
tx_service/src/cc/local_cc_shards.cpp(2 hunks)
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-10-09T03:56:58.811Z
Learnt from: thweetkomputer
Repo: eloqdata/tx_service PR: 150
File: include/cc/local_cc_shards.h:626-631
Timestamp: 2025-10-09T03:56:58.811Z
Learning: For the LocalCcShards class in include/cc/local_cc_shards.h: Writer locks (unique_lock) should continue using the original meta_data_mux_ (std::shared_mutex) rather than fast_meta_data_mux_ (FastMetaDataMutex) at this stage. Only reader locks may use the FastMetaDataMutex wrapper.
Applied to files:
tx_service/src/cc/local_cc_shards.cpp
📚 Learning: 2025-12-02T10:43:27.431Z
Learnt from: lokax
Repo: eloqdata/tx_service PR: 254
File: tx_service/src/cc/local_cc_shards.cpp:2949-3188
Timestamp: 2025-12-02T10:43:27.431Z
Learning: In tx_service/src/cc/local_cc_shards.cpp, whenever TryPinNodeGroupData is used, only call Sharder::Instance().UnpinNodeGroupData(node_group) if the recorded term is >= 0 (i.e., pin succeeded). Example: LocalCcShards::PostProcessFlushTaskEntries guards the unpin with `if (term >= 0)`.
Applied to files:
tx_service/src/cc/local_cc_shards.cpp
📚 Learning: 2025-11-11T07:10:40.346Z
Learnt from: lzxddz
Repo: eloqdata/tx_service PR: 199
File: include/cc/local_cc_shards.h:233-234
Timestamp: 2025-11-11T07:10:40.346Z
Learning: In the LocalCcShards class in include/cc/local_cc_shards.h, the EnqueueCcRequest methods use `shard_code & 0x3FF` followed by `% cc_shards_.size()` to distribute work across processor cores for load balancing. This is intentional and separate from partition ID calculation. The 0x3FF mask creates a consistent distribution range (0-1023) before modulo by actual core count.
Applied to files:
tx_service/src/cc/local_cc_shards.cpp
Here are some reminders before you submit the pull request
fixes eloqdb/tx_service#issue_id./mtr --suite=mono_main,mono_multi,mono_basicSummary by CodeRabbit
✏️ Tip: You can customize this high-level summary in your review settings.