Change bucket num to 1024 and dss support multi-shards #199
Walkthrough
Refactors hash/partition mapping and bucket assignment (introduces total_hash_partitions and changes mapping APIs), replaces rand()/srand() with per-node-group std::mt19937 RNG, changes shard-to-core residual computation in EnqueueCcRequest paths, and adds an
Changes
Sequence Diagram(s)
sequenceDiagram
actor CC_Node
participant LocalCC as LocalCCShards
participant StoreHD as DataStoreHandler
Note over CC_Node,StoreHD: lifecycle RPCs now include ng_id
CC_Node->>StoreHD: OnLeaderStart(ng_id, next_leader_node)
StoreHD-->>CC_Node: Ack
CC_Node->>StoreHD: OnStartFollowing(ng_id, leader_node_id, ...)
StoreHD-->>CC_Node: Follow actions
CC_Node->>StoreHD: OnLeaderStop(ng_id, term)
StoreHD-->>CC_Node: Stop actions
Note over LocalCC: enqueue and bucket assignment
LocalCC->>LocalCC: residual = shard_code & 0x3FF
LocalCC->>LocalCC: core = residual % cc_shards_.size()
LocalCC->>LocalCC: bucket assignment uses per-ng std::mt19937 RNG
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
Actionable comments posted: 4
🧹 Nitpick comments (1)
include/sharder.h (1)
236-245: Type and invariant polish for new mapping helpers
- Return type of MapHashPartitionIdToBucketId should be uint16_t to match bucket_id usage elsewhere.
- Add basic sanity checks (non-negative partition_id; partitions > 0). Also, consider promoting the total_hash_partitions <= total_range_buckets check to a shared compile-time or init-time assert rather than per-call.
- static inline int32_t MapBucketIdToHashPartitionId(uint16_t bucket_id)
+ static inline int32_t MapBucketIdToHashPartitionId(uint16_t bucket_id)
  {
-     assert(total_hash_partitions <= total_range_buckets);
-     return bucket_id % total_hash_partitions;
+     assert(total_hash_partitions > 0);
+     assert(total_hash_partitions <= total_range_buckets);
+     return static_cast<int32_t>(bucket_id % total_hash_partitions);
  }
- static inline uint32_t MapHashPartitionIdToBucketId(int32_t partition_id)
+ static inline uint16_t MapHashPartitionIdToBucketId(int32_t partition_id)
  {
-     return static_cast<uint32_t>(partition_id % total_range_buckets);
+     assert(partition_id >= 0);
+     return static_cast<uint16_t>(partition_id % total_range_buckets);
  }
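A minimal sketch of the helper shape this nitpick describes, with the invariant promoted to compile time. The constant values are assumptions: `0x400` for `total_hash_partitions` is the value quoted elsewhere in this review, while `total_range_buckets = 1024` is only inferred from the PR title. The `sketch` namespace is hypothetical and not part of the codebase.

```cpp
#include <cassert>
#include <cstdint>

namespace sketch
{
// Assumed values; the real constants are defined in include/type.h.
constexpr uint16_t total_hash_partitions = 0x400;
constexpr uint16_t total_range_buckets = 1024;

// Checked once at compile time instead of on every call.
static_assert(total_hash_partitions > 0, "need at least one hash partition");
static_assert(total_hash_partitions <= total_range_buckets,
              "hash partitions must not outnumber range buckets");

inline int32_t MapBucketIdToHashPartitionId(uint16_t bucket_id)
{
    return static_cast<int32_t>(bucket_id % total_hash_partitions);
}

inline uint16_t MapHashPartitionIdToBucketId(int32_t partition_id)
{
    // A negative id would produce a negative remainder before the cast.
    assert(partition_id >= 0);
    return static_cast<uint16_t>(partition_id % total_range_buckets);
}
}  // namespace sketch
```

Under these assumptions, mapping a partition id in `[0, total_hash_partitions)` to a bucket and back returns the original id, because the id is smaller than both divisors.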
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
- include/cc/local_cc_shards.h (1 hunks)
- include/sharder.h (2 hunks)
- include/store/data_store_handler.h (1 hunks)
- include/type.h (1 hunks)
- src/cc/local_cc_shards.cpp (1 hunks)
- src/fault/cc_node.cpp (3 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-10-09T03:56:58.811Z
Learnt from: thweetkomputer
Repo: eloqdata/tx_service PR: 150
File: include/cc/local_cc_shards.h:626-631
Timestamp: 2025-10-09T03:56:58.811Z
Learning: For the LocalCcShards class in include/cc/local_cc_shards.h: Writer locks (unique_lock) should continue using the original meta_data_mux_ (std::shared_mutex) rather than fast_meta_data_mux_ (FastMetaDataMutex) at this stage. Only reader locks may use the FastMetaDataMutex wrapper.
Applied to files:
- include/cc/local_cc_shards.h
- src/cc/local_cc_shards.cpp
📚 Learning: 2025-10-20T04:30:07.884Z
Learnt from: liunyl
Repo: eloqdata/tx_service PR: 149
File: include/cc/cc_request.h:1876-1927
Timestamp: 2025-10-20T04:30:07.884Z
Learning: ScanNextBatchCc in include/cc/cc_request.h is used only for hash-partition scans; range-partition scans are handled by ScanSliceCc.
Applied to files:
- include/cc/local_cc_shards.h
- src/cc/local_cc_shards.cpp
🧬 Code graph analysis (3)
include/sharder.h (1)
- include/cc/ccm_scanner.h (1): uint32_t (462-472)
include/store/data_store_handler.h (3)
- src/fault/cc_node.cpp (6): OnStartFollowing (692-716), OnStartFollowing (692-692), OnLeaderStart (257-470), OnLeaderStart (257-260), OnLeaderStop (472-521), OnLeaderStop (472-472)
- src/remote/cc_node_service.cpp (6): OnStartFollowing (125-145), OnStartFollowing (125-129), OnLeaderStart (60-111), OnLeaderStart (60-63), OnLeaderStop (113-123), OnLeaderStop (113-116)
- src/sharder.cpp (6): OnStartFollowing (906-921), OnStartFollowing (906-909), OnLeaderStart (843-859), OnLeaderStart (843-847), OnLeaderStop (861-873), OnLeaderStop (861-861)
src/fault/cc_node.cpp (2)
- include/tx_request.h (1): ng_id_ (1203-1219)
- include/table_statistics.h (1): ng_id_ (516-516)
🔇 Additional comments (5)
include/sharder.h (1)
223-229: Verify power-of-two constraint for bitmask mapping; proposed static_assert won't compile
The bitmask operation (`& (kHashPartitions - 1)`) requires `total_hash_partitions` to be a power of two, which is currently true (`0x400` = 1024). However, the suggested `static_assert` in the fix won't compile because `total_hash_partitions` in include/type.h is `inline static const uint16_t`, not `constexpr`.
To enforce this guarantee:
- Make `total_hash_partitions` constexpr in include/type.h and add the `static_assert`, OR
- Add a runtime assert in `MapKeyHashToHashPartitionId` and keep the bitmask, OR
- Switch to modulo (reuse `MapBucketIdToHashPartitionId`) to be defensive against future changes to a non-power-of-two value.
Also consider: `MapKeyHashToHashPartitionId` and `MapBucketIdToHashPartitionId` both map bucket IDs to partition IDs but use different operators (bitmask vs. modulo). For consistency, prefer unified logic.
src/fault/cc_node.cpp (3)
355-356: LGTM!
The call to `OnLeaderStart` has been correctly updated to pass `ng_id_` as the first parameter, matching the updated interface signature.
498-498: LGTM!
The call to `OnLeaderStop` has been correctly updated to pass `ng_id_` as the first parameter, matching the updated interface signature.
977-978: LGTM!
The call to `OnStartFollowing` has been correctly updated to pass `ng_id_` as the first parameter, matching the updated interface signature. The line wrapping maintains readability.
include/store/data_store_handler.h (1)
386-401: All DataStoreHandler implementations are compatible with the interface changes.
IntMemoryStore (include/store/int_mem_store.h:36) is the only concrete implementation of DataStoreHandler in the codebase. It does not override `OnStartFollowing`, `OnLeaderStart`, or `OnLeaderStop`, so it automatically inherits the updated method signatures with the `ng_id` parameter from the base class (include/store/data_store_handler.h:386-401). No implementation updates are required.
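To illustrate why a subclass that overrides none of the hooks needs no update, here is a hedged sketch of the pattern. `HandlerSketch`, `PassiveStoreSketch`, and `RecordingStoreSketch` are hypothetical stand-ins, not the real `DataStoreHandler` or `IntMemoryStore`; the parameter lists follow the signatures quoted in this review, and the `true` default for `OnLeaderStop` is an assumption.

```cpp
#include <cstdint>

// Hypothetical stand-in for the base handler; only the three hooks are modeled.
class HandlerSketch
{
public:
    virtual ~HandlerSketch() = default;

    virtual void OnStartFollowing(uint32_t ng_id,
                                  uint32_t leader_node_id,
                                  int64_t term,
                                  int64_t standby_term,
                                  bool resubscribe)
    {
    }

    virtual bool OnLeaderStart(uint32_t ng_id, uint32_t *next_leader_node)
    {
        return true;
    }

    // Default return value is assumed for this sketch.
    virtual bool OnLeaderStop(uint32_t ng_id, int64_t term)
    {
        return true;
    }
};

// Overrides nothing, so it inherits the ng_id-aware no-op defaults.
class PassiveStoreSketch : public HandlerSketch
{
};

// A hypothetical multi-shard store that records which node group became leader.
class RecordingStoreSketch : public HandlerSketch
{
public:
    bool OnLeaderStart(uint32_t ng_id, uint32_t *next_leader_node) override
    {
        last_leader_ng_ = ng_id;
        return true;
    }

    uint32_t last_leader_ng_{UINT32_MAX};
};
```

A subclass that had overridden the old `OnLeaderStart(uint32_t *)` signature with `override` would fail to compile after the change, which is the case the review checks for.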
88e47f8 to f4785dd
Actionable comments posted: 0
♻️ Duplicate comments (3)
include/cc/local_cc_shards.h (2)
225-226: Replace hardcoded `0x3FF` mask with configurable value.
The hardcoded `0x3FF` mask tightly couples this code to 1024 hash partitions, contradicting the PR's goal of configurable partitioning. If `total_hash_partitions` changes, this mask will become incorrect, leading to improper shard distribution.
Apply this fix:
- uint32_t residual = shard_code & 0x3FF;
+ uint32_t residual = shard_code & (total_hash_partitions - 1);
  size_t core_idx = residual % cc_shards_.size();
Additionally, add a static assertion to ensure `total_hash_partitions` is a power of 2:
static_assert((total_hash_partitions & (total_hash_partitions - 1)) == 0, "total_hash_partitions must be a power of 2");
233-234: Same hardcoded mask issue in overloaded method.
This overload has the identical hardcoded `0x3FF` mask problem. Apply the same fix here to maintain consistency across both `EnqueueCcRequest` overloads.
- uint32_t residual = shard_code & 0x3FF;
+ uint32_t residual = shard_code & (total_hash_partitions - 1);
  size_t core_idx = residual % cc_shards_.size();
src/cc/local_cc_shards.cpp (1)
4371-4373: Make partition_number runtime-const and enforce power-of-two invariant
Use a runtime const and assert the bitmask precondition to avoid compile-time coupling and future misconfigurations.
- // constexpr size_t partition_number = 1024;
- constexpr size_t partition_number = total_hash_partitions;
+ // constexpr size_t partition_number = 1024;
+ const size_t partition_number = total_hash_partitions;
+ assert(partition_number > 0 &&
+        (partition_number & (partition_number - 1)) == 0 &&
+        "total_hash_partitions must be a power of two");
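The power-of-two precondition discussed in these comments can be sketched as follows. This is an illustration under assumptions, not the project's code: the constant is declared `constexpr` here (the review states the real one is `inline static const uint16_t`), and `IsPowerOfTwo`, `ResidualByMask`, and `ResidualByModulo` are hypothetical names.

```cpp
#include <cstdint>

namespace sketch
{
// Assumed value; quoted as 0x400 in this review.
constexpr uint16_t total_hash_partitions = 0x400;

constexpr bool IsPowerOfTwo(uint32_t n)
{
    return n != 0 && (n & (n - 1)) == 0;
}

// The build fails as soon as the mask trick below would stop being valid.
static_assert(IsPowerOfTwo(total_hash_partitions),
              "total_hash_partitions must be a power of 2");

// Keeps the low bits of the code; only equals the remainder for powers of two.
constexpr uint32_t ResidualByMask(uint64_t code)
{
    return static_cast<uint32_t>(code & (total_hash_partitions - 1u));
}

// Works for any nonzero divisor.
constexpr uint32_t ResidualByModulo(uint64_t code)
{
    return static_cast<uint32_t>(code % total_hash_partitions);
}

static_assert(ResidualByMask(0x12345) == ResidualByModulo(0x12345),
              "mask and modulo agree for a power-of-two divisor");
}  // namespace sketch
```

The two functions agree only because 1024 is a power of two; with a divisor such as 1000 the mask would no longer compute a remainder, which is the coupling the comments warn about.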
🧹 Nitpick comments (1)
src/cc/local_cc_shards.cpp (1)
5394-5399: Avoid hard-coded 0x3FF; derive from total_hash_partitions
Tie the core-index mapping to the configured partition count to prevent silent skew if partitions change.
- if (!table_name.IsHashPartitioned())
- {
-     key_core_idx = (rec.Key().Hash() & 0x3FF) % Count();
- }
+ if (!table_name.IsHashPartitioned())
+ {
+     // relies on total_hash_partitions being a power of two (guarded above)
+     key_core_idx =
+         (rec.Key().Hash() & (total_hash_partitions - 1)) % Count();
+ }
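For reference, the two-step core selection that this nitpick touches (mask to a fixed range, then modulo by the core count) can be sketched as below. `CoreIndex` is a hypothetical free function; in the source the logic lives inline in `EnqueueCcRequest` and uses `cc_shards_.size()` or `Count()` as the core count.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

namespace sketch
{
// Picks a core for a shard code: fold into 0..1023, then spread over cores.
inline std::size_t CoreIndex(uint64_t shard_code, std::size_t core_count)
{
    assert(core_count > 0);
    // Step 1: keep the low 10 bits, giving a residual in [0, 1023].
    const uint32_t residual = static_cast<uint32_t>(shard_code & 0x3FF);
    // Step 2: map the residual onto the available cores.
    return residual % core_count;
}
}  // namespace sketch
```

For example, `CoreIndex(0x12345, 8)` masks to `0x345` (837) and yields core 5. When the core count does not divide 1024 evenly, the lowest-numbered cores receive one extra residual each, so the spread is near-uniform rather than exact.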
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
- include/cc/cc_request.h (1 hunks)
- include/cc/local_cc_shards.h (1 hunks)
- include/sharder.h (1 hunks)
- include/store/data_store_handler.h (1 hunks)
- include/type.h (1 hunks)
- src/cc/local_cc_shards.cpp (1 hunks)
- src/fault/cc_node.cpp (3 hunks)
✅ Files skipped from review due to trivial changes (1)
- include/cc/cc_request.h
🚧 Files skipped from review as they are similar to previous changes (3)
- include/type.h
- include/sharder.h
- include/store/data_store_handler.h
🔇 Additional comments (2)
src/fault/cc_node.cpp (2)
355-357: ng_id plumbing verified; all signatures consistent
Verification confirms the interface signatures and callsites are aligned:
- DataStoreHandler defines `OnLeaderStart(uint32_t ng_id, uint32_t *next_leader_node)` and `OnLeaderStop(uint32_t ng_id, int64_t term)`
- Callsites at cc_node.cpp:355 and 498 correctly pass ng_id and term parameters
- No legacy or inconsistent call patterns detected
978-979: Code change verified; signature and calls are consistent
The reviewed code at cc_node.cpp:977-978 correctly passes all five arguments to `DataStoreHandler::OnStartFollowing`:
- `ng_id_` (new first parameter)
- `leader_node_id`
- `primary_term`
- `standby_term`
- `resubscribe`
This matches the base class signature in `include/store/data_store_handler.h:386-390` exactly. All call sites to `Sharder::OnStartFollowing` correctly pass either 3 or 4 arguments (with `resubscribe` defaulting to `false`).
Verification is limited: no concrete `DataStoreHandler` subclass implementations that override `OnStartFollowing` exist in the accessible codebase (only the base class with a no-op default). If implementations exist elsewhere, they would need manual verification to confirm the signature update.
Actionable comments posted: 1
♻️ Duplicate comments (1)
src/cc/local_cc_shards.cpp (1)
2126-2133: Same RNG header requirement applies here; determinism per ng_id looks good.
Once `<random>` is added, this site is fine.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/cc/local_cc_shards.cpp(3 hunks)
🧰 Additional context used
🧠 Learnings (4)
📓 Common learnings
Learnt from: lzxddz
Repo: eloqdata/tx_service PR: 199
File: include/cc/local_cc_shards.h:233-234
Timestamp: 2025-11-11T07:10:40.324Z
Learning: In the LocalCcShards class in include/cc/local_cc_shards.h, the EnqueueCcRequest methods use `shard_code & 0x3FF` followed by `% cc_shards_.size()` to distribute work across processor cores for load balancing. This is intentional and separate from partition ID calculation. The 0x3FF mask creates a consistent distribution range (0-1023) before modulo by actual core count.
📚 Learning: 2025-11-11T07:10:40.324Z
Learnt from: lzxddz
Repo: eloqdata/tx_service PR: 199
File: include/cc/local_cc_shards.h:233-234
Timestamp: 2025-11-11T07:10:40.324Z
Learning: In the LocalCcShards class in include/cc/local_cc_shards.h, the EnqueueCcRequest methods use `shard_code & 0x3FF` followed by `% cc_shards_.size()` to distribute work across processor cores for load balancing. This is intentional and separate from partition ID calculation. The 0x3FF mask creates a consistent distribution range (0-1023) before modulo by actual core count.
Applied to files:
src/cc/local_cc_shards.cpp
🔇 Additional comments (1)
src/cc/local_cc_shards.cpp (1)
4376-4379: No changes required; power-of-two invariant is already satisfied.
The review comment's underlying concern is valid, but the code is already correct. total_hash_partitions is defined as 0x400 (1024), which is a power of two (2^10). MapKeyHashToHashPartitionId uses hash_code % total_hash_partitions, and the 0x3FF mask (1023) consistently masks the lower 10 bits for core distribution, aligning with the 0x400 range.
The per-core partition assignment at lines 4376-4379 correctly divides 1024 partitions across cores. An existing assert at include/sharder.h:235 validates the relationship between total_hash_partitions and total_range_buckets. Since total_hash_partitions is a compile-time constant, the suggested runtime assert is defensive but not required for correctness.
  // Thread-safe and deterministic random generator
  std::mt19937 rng(ng);
  std::uniform_int_distribution<uint16_t> dist(0,
                                               total_range_buckets - 1);
  size_t generated = 0;
  while (generated < 64)
  {
-     uint16_t rand_num = rand() % total_range_buckets;
+     uint16_t rand_num = dist(rng);
      if (rand_num_to_ng.find(rand_num) == rand_num_to_ng.end())
Missing header for std::mt19937/std::uniform_int_distribution (compile error).
Add the proper header; otherwise this TU won’t compile.
Apply this diff to includes:
@@
#include <unordered_map>
+#include <random>
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In src/cc/local_cc_shards.cpp around lines 1949 to 1957, the code uses
std::mt19937 and std::uniform_int_distribution but the <random> header is not
included; add #include <random> to the file's includes (preferably near the
other standard library headers) so the translation unit compiles successfully.
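A self-contained sketch of the node-group-seeded generator pattern under discussion, including the `<random>` include this comment asks for. `PickBuckets` is a hypothetical helper and a simplification: the code under review checks uniqueness against a map shared across node groups (`rand_num_to_ng`), whereas this sketch only deduplicates within one node group.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <random>
#include <set>

namespace sketch
{
// Draws `count` distinct bucket ids for one node group, seeded by its id.
inline std::set<uint16_t> PickBuckets(uint32_t ng,
                                      uint16_t total_range_buckets,
                                      std::size_t count = 64)
{
    // Without this bound the loop below could never finish.
    assert(total_range_buckets > 0 && count <= total_range_buckets);

    std::mt19937 rng(ng);  // same ng always yields the same sequence
    std::uniform_int_distribution<uint16_t> dist(
        0, static_cast<uint16_t>(total_range_buckets - 1));

    std::set<uint16_t> picked;
    while (picked.size() < count)
    {
        picked.insert(dist(rng));  // duplicates are ignored by the set
    }
    return picked;
}
}  // namespace sketch
```

Because the engine is a local variable seeded from `ng`, repeated calls with the same id produce the same set on a given standard library, and no global `rand()` state is shared between threads.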
59f837e to 6f683e9
Actionable comments posted: 2
♻️ Duplicate comments (2)
src/cc/local_cc_shards.cpp (2)
2126-2133: Same header requirement applies here.
Depends on the previous include of `<random>`.
1949-1957: Compile error: missing `<random>` for mt19937/uniform_int_distribution.
This TU uses std::mt19937 and std::uniform_int_distribution but doesn’t include the header.
  #include <unordered_map>
+ #include <random>
Also consider capping attempts or logging when nearing saturation to avoid long loops when the cluster has many node groups (optional).
🧹 Nitpick comments (1)
include/sharder.h (1)
233-237: Mirror invariant and document expectations.
Good to assert total_hash_partitions <= total_range_buckets here. Also assert total_hash_partitions > 0 for symmetry.
  static inline int32_t MapBucketIdToHashPartitionId(uint16_t bucket_id)
  {
-     assert(total_hash_partitions <= total_range_buckets);
+     assert(total_hash_partitions > 0);
+     assert(total_hash_partitions <= total_range_buckets);
      return bucket_id % total_hash_partitions;
  }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
- include/cc/local_cc_shards.h (1 hunks)
- include/sharder.h (1 hunks)
- include/store/data_store_handler.h (1 hunks)
- include/type.h (1 hunks)
- src/cc/local_cc_shards.cpp (3 hunks)
- src/fault/cc_node.cpp (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- src/fault/cc_node.cpp
- include/type.h
- include/cc/local_cc_shards.h
🔇 Additional comments (4)
src/cc/local_cc_shards.cpp (3)
4377-4380: Hash-partition scan distribution LGTM.
Even split across cores with remainder spread to lowest worker_idx; indices stay < total_hash_partitions. Nice.
4423-4432: Filter by partition id LGTM.
Uses Sharder::MapKeyHashToHashPartitionId; consistent with new modulo-based mapping.
4718-4723: Partition id tagging LGTM.
Tagging FlushRecords with MapKeyHashToHashPartitionId keeps downstream aggregation aligned.
include/store/data_store_handler.h (1)
386-402: Signature changes are correctly implemented; no overrides needed.
The codebase contains only one concrete DataStoreHandler subclass (IntMemoryStore), which does not override the On* methods and therefore correctly inherits the updated base class signatures. All call sites match the new parameter lists.
      return static_cast<int32_t>(hash_code % total_hash_partitions);
  }
Guard against zero partitions before modulo.
Add a lightweight runtime assert to prevent UB if total_hash_partitions is ever misconfigured to 0.
static inline int32_t MapKeyHashToHashPartitionId(uint64_t hash_code)
{
- return static_cast<int32_t>(hash_code % total_hash_partitions);
+ assert(total_hash_partitions > 0);
+ return static_cast<int32_t>(hash_code % total_hash_partitions);
}
🤖 Prompt for AI Agents
In include/sharder.h around lines 225-226, the modulo operation causes undefined
behavior if total_hash_partitions is 0; add a lightweight runtime guard such as
assert(total_hash_partitions != 0) (or an if that logs, throws, or returns a safe
default) immediately before the return to ensure total_hash_partitions is
non-zero and to avoid performing the modulo when misconfigured.
  static inline uint32_t MapHashPartitionIdToBucketId(int32_t partition_id)
  {
      return static_cast<uint32_t>(partition_id % total_range_buckets);
  }
Sign/round‑trip safety: prefer unsigned or assert non‑negative and mirror invariant.
partition_id is int32_t; if negative, modulo then cast-to-uint32_t yields a large value. Either make partition_id unsigned or assert non-negative and mirror the ≤ invariant for consistency.
-static inline uint32_t MapHashPartitionIdToBucketId(int32_t partition_id)
+static inline uint32_t MapHashPartitionIdToBucketId(int32_t partition_id)
{
- return static_cast<uint32_t>(partition_id % total_range_buckets);
+ assert(total_hash_partitions <= total_range_buckets);
+ assert(partition_id >= 0);
+ // Avoid signed modulo surprises.
+ return static_cast<uint32_t>(
+ static_cast<uint32_t>(partition_id) % total_range_buckets);
}
Optionally:
- static inline uint32_t MapHashPartitionIdToBucketId(int32_t partition_id)
+ static inline uint32_t MapHashPartitionIdToBucketId(uint32_t partition_id)
🤖 Prompt for AI Agents
In include/sharder.h around lines 239-242, the function
MapHashPartitionIdToBucketId accepts an int32_t and casts a potentially negative
modulo result to uint32_t which yields large values; either change the parameter
to an unsigned type (uint32_t) and perform static modulo with
total_range_buckets to mirror the ≤ invariant, or keep int32_t but assert
partition_id >= 0 at the top and explicitly cast before modulo
(static_cast<uint32_t>(partition_id) % total_range_buckets); update callers or
document the invariant accordingly so the function only ever receives
non-negative partition IDs.
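The wrap-around described in this comment can be reproduced with a short sketch. Both functions are hypothetical illustrations, and `total_range_buckets = 1024` is an assumed value inferred from the PR title. Since C++11 the remainder of a signed division takes the sign of the dividend, so `-1 % 1024` is `-1`, and converting that to `uint32_t` gives `0xFFFFFFFF`.

```cpp
#include <cstdint>

namespace sketch
{
constexpr uint16_t total_range_buckets = 1024;  // assumed value

// Mirrors the unguarded form: a negative id yields a huge bucket index.
inline uint32_t UncheckedMap(int32_t partition_id)
{
    return static_cast<uint32_t>(partition_id % total_range_buckets);
}

// Guarded variant: rejects negative ids instead of wrapping.
// Returns false and leaves *bucket_id untouched when the id is invalid.
inline bool TryMap(int32_t partition_id, uint32_t *bucket_id)
{
    if (partition_id < 0)
    {
        return false;
    }
    *bucket_id = static_cast<uint32_t>(partition_id) % total_range_buckets;
    return true;
}
}  // namespace sketch
```

Returning a status is only one option; the review itself suggests an `assert(partition_id >= 0)` or an unsigned parameter type, either of which keeps the function signature simpler when callers are trusted.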
c539c16 to 7eb86b4
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/cc/local_cc_shards.cpp (1)
4423-4432: Restrict filter to this core’s partitions to avoid overscanning
Current filter uses a contiguous [min,max] range, which includes partitions not assigned to this core in the round‑robin layout. This can overscan and skew memory budgeting per pass. Constrain by core modulus as well.
Apply:
- std::function<bool(size_t)> filter_lambda =
-     [min_partition_id_this_scan,
-      max_partition_id_this_scan,
-      &filter_func =
-          data_sync_task->filter_lambda_](const size_t hash_code)
- {
-     int32_t part_id = Sharder::MapKeyHashToHashPartitionId(hash_code);
-     return static_cast<size_t>(part_id) >= min_partition_id_this_scan &&
-            static_cast<size_t>(part_id) <= max_partition_id_this_scan &&
-            (!filter_func || filter_func(hash_code));
- };
+ std::function<bool(size_t)> filter_lambda =
+     [min_partition_id_this_scan,
+      max_partition_id_this_scan,
+      core_number,
+      worker_idx,
+      &filter_func = data_sync_task->filter_lambda_](const size_t hash_code)
+ {
+     const size_t part_id =
+         static_cast<size_t>(Sharder::MapKeyHashToHashPartitionId(hash_code));
+     const bool in_window =
+         part_id >= min_partition_id_this_scan && part_id <= max_partition_id_this_scan;
+     const bool belongs_to_core = (part_id % core_number) == worker_idx;
+     return in_window && belongs_to_core &&
+            (!filter_func || filter_func(hash_code));
+ };
Note: the 0x3FF pre-mod mask used elsewhere for range paths is intentional and should remain unchanged. Based on learnings.
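A small sketch of the two pieces this comment combines: the per-worker partition count and the window-plus-ownership predicate. It assumes the round-robin layout the comment describes (partition `p` belongs to worker `p % core_number`) and the `0x400` partition count quoted in this review; `PartitionsForWorker` and `InScanWindowForCore` are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>

namespace sketch
{
constexpr std::size_t total_hash_partitions = 0x400;  // assumed value

// Even split across cores; the remainder goes to the lowest worker indexes.
inline std::size_t PartitionsForWorker(std::size_t core_number,
                                       std::size_t worker_idx)
{
    assert(core_number > 0 && worker_idx < core_number);
    return total_hash_partitions / core_number +
           (worker_idx < total_hash_partitions % core_number ? 1 : 0);
}

// True only for partitions inside the scan window that this worker owns.
inline bool InScanWindowForCore(std::size_t part_id,
                                std::size_t min_part,
                                std::size_t max_part,
                                std::size_t core_number,
                                std::size_t worker_idx)
{
    const bool in_window = part_id >= min_part && part_id <= max_part;
    const bool belongs_to_core = (part_id % core_number) == worker_idx;
    return in_window && belongs_to_core;
}
}  // namespace sketch
```

With 6 cores, workers 0 to 3 own 171 partitions and workers 4 and 5 own 170, which sums to 1024. A window-only filter over `[0, 100]` would accept all 101 ids for every worker, while the combined predicate accepts only the ids congruent to the worker index.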
♻️ Duplicate comments (2)
include/sharder.h (1)
239-242: Guard against negative `partition_id` before modulo
If `partition_id` ever arrives negative (e.g., malformed input or upstream bug), C++'s signed modulo keeps it negative and the subsequent cast to `uint32_t` wraps to a very large value (`0xFFFFFFFF` for -1), producing an invalid bucket index. Mirror the invariant explicitly before casting.
- return static_cast<uint32_t>(partition_id % total_range_buckets);
+ assert(partition_id >= 0);
+ return static_cast<uint32_t>(
+     static_cast<uint32_t>(partition_id) % total_range_buckets);
src/cc/local_cc_shards.cpp (1)
1949-1957: Add missing include and tighten distribution bound types
std::mt19937/std::uniform_int_distribution are used but `<random>` isn’t included; this TU won’t compile if the header is not pulled in indirectly. Also, make the upper bound type-safe.
Apply:
Add header near other STL includes:
#include <random>
Make the bound explicit to avoid narrowing if total_range_buckets grows:
- std::uniform_int_distribution<uint16_t> dist(0,
-                                              total_range_buckets - 1);
+ std::uniform_int_distribution<uint16_t> dist(
+     static_cast<uint16_t>(0),
+     static_cast<uint16_t>(total_range_buckets - 1));
Optionally, guard the domain:
static_assert(total_range_buckets <= std::numeric_limits<uint16_t>::max(), "bucket id exceeds uint16_t");
🧹 Nitpick comments (2)
src/cc/local_cc_shards.cpp (2)
1982-1985: Avoid shadowing parameter ng_id
Local variable NodeGroupId ng_id shadows the function parameter ng_id; rename to owner_ng for clarity.
- NodeGroupId ng_id = it == rand_num_to_ng.end()
-                         ? rand_num_to_ng.begin()->second
-                         : it->second;
+ NodeGroupId owner_ng = it == rand_num_to_ng.end()
+                            ? rand_num_to_ng.begin()->second
+                            : it->second;
- auto insert_res = ng_bucket_infos.try_emplace(
-     bucket_id, std::make_unique<BucketInfo>(ng_id, version));
+ auto insert_res = ng_bucket_infos.try_emplace(
+     bucket_id, std::make_unique<BucketInfo>(owner_ng, version));
- if (insert_res.second) { insert_res.first->second->Set(ng_id, version); }
+ if (insert_res.second) { insert_res.first->second->Set(owner_ng, version); }
2126-2133: DRY: extract a tiny helper for NG-seeded RNG
Same RNG seeding logic appears in InitRangeBuckets and GenerateBucketMigrationPlan. Consider a small static helper (e.g., MakeNgRng(uint32_t ng)) to reduce duplication.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
- include/cc/local_cc_shards.h (1 hunks)
- include/sharder.h (1 hunks)
- include/store/data_store_handler.h (1 hunks)
- include/type.h (1 hunks)
- src/cc/local_cc_shards.cpp (3 hunks)
- src/fault/cc_node.cpp (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- include/type.h
🧰 Additional context used
🧠 Learnings (5)
📚 Learning: 2025-10-21T06:46:53.700Z
Learnt from: lokax
Repo: eloqdata/tx_service PR: 149
File: src/remote/cc_stream_receiver.cpp:1066-1075
Timestamp: 2025-10-21T06:46:53.700Z
Learning: In src/remote/cc_stream_receiver.cpp, for ScanNextRequest handling, BucketIds() on RemoteScanNextBatch should never be empty—this is an expected invariant of the scan protocol.
Applied to files:
include/sharder.h
🔇 Additional comments (4)
src/cc/local_cc_shards.cpp (1)
4377-4379: No changes needed; original review comment is based on incorrect assumptions about the codebase.
The code at lines 4377-4379 is mathematically correct. The review comment incorrectly assumes that `MapKeyHashToHashPartitionId` relies on bitmasking and that `total_hash_partitions` requires a power-of-two runtime check. In reality:
- `total_hash_partitions = 0x400` is a compile-time constant (include/type.h:610), so runtime enforcement is unnecessary
- `MapKeyHashToHashPartitionId` uses modulo (`hash % total_hash_partitions`), not bitmasking; modulo works correctly with any divisor
- The round-robin distribution formula `total_hash_partitions / core_number + (worker_idx < total_hash_partitions % core_number)` is mathematically sound regardless of whether the numerator is a power of two
Likely an incorrect or invalid review comment.
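To make the round-robin claim concrete, here is a minimal sketch of the per-worker partition count. `PartitionsForWorker` and `TotalAssigned` are hypothetical helper names used only for illustration; the constant is redeclared locally with the value the comment cites from include/type.h.

```cpp
#include <cassert>
#include <cstddef>

// Value cited in the comment above; redeclared here so the sketch stands alone.
constexpr std::size_t total_hash_partitions = 0x400;  // 1024

// Number of hash partitions handled by worker `worker_idx` when the
// partitions are dealt round-robin to `core_number` workers.
inline std::size_t PartitionsForWorker(std::size_t worker_idx,
                                       std::size_t core_number)
{
    assert(core_number > 0 && "core_number must be > 0");
    // Every worker gets the quotient; the first (remainder) workers get one more.
    return total_hash_partitions / core_number +
           (worker_idx < total_hash_partitions % core_number ? 1 : 0);
}

// Sum over all workers; should equal total_hash_partitions for any core count.
inline std::size_t TotalAssigned(std::size_t core_number)
{
    std::size_t sum = 0;
    for (std::size_t w = 0; w < core_number; ++w)
    {
        sum += PartitionsForWorker(w, core_number);
    }
    return sum;
}
```

With 3 cores, worker 0 gets 342 partitions and workers 1 and 2 get 341 each, which sums to 1024. The same holds for any positive core count, power of two or not.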
src/fault/cc_node.cpp (3)
355-356: OnLeaderStart now threads ng_id context correctly.
The call site matches the new `OnLeaderStart(uint32_t ng_id, ...)` signature, ensuring the store handler knows which node-group is escalating to leader.
498-498: Leader stop path wired to ng_id-aware handler.
Passing `ng_id_` alongside `term` keeps the shutdown path consistent with the multi-shard handler API.
977-979: Follower subscription forwards ng_id as required.
`OnStartFollowing` now receives the node-group context, aligning the standby transition with the updated store handler contract.
      virtual void OnStartFollowing(uint32_t ng_id,
                                    uint32_t leader_node_id,
                                    int64_t term,
                                    int64_t standby_term,
                                    bool resubscribe)
      {
      }

    - virtual bool OnLeaderStart(uint32_t *next_leader_node)
    + virtual bool OnLeaderStart(uint32_t ng_id, uint32_t *next_leader_node)
      {
          return true;
      }

    - virtual bool OnLeaderStop(int64_t term)
    + virtual bool OnLeaderStop(uint32_t ng_id, int64_t term)
      {
          return true;
      }
🧩 Analysis chain
API signature change: ensure all overrides and call sites updated
OnStartFollowing/OnLeaderStart/OnLeaderStop now include ng_id. Confirm all derived DataStoreHandler implementations and invocations are updated, or builds will break.
Additionally, update method docs to mention ng_id purpose.
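One way to let the compiler catch stale overrides is to mark every derived implementation `override`. The sketch below uses a simplified stand-in for `DataStoreHandler` (only the three hooks, with the signatures shown in the diff) and a hypothetical `RecordingStoreHandler` that is not part of the PR.

```cpp
#include <cassert>
#include <cstdint>

// Simplified stand-in for DataStoreHandler; only the three hooks from the
// diff are reproduced, with the same default bodies.
class DataStoreHandler
{
public:
    virtual ~DataStoreHandler() = default;

    virtual void OnStartFollowing(uint32_t ng_id, uint32_t leader_node_id,
                                  int64_t term, int64_t standby_term,
                                  bool resubscribe)
    {
    }
    virtual bool OnLeaderStart(uint32_t ng_id, uint32_t *next_leader_node)
    {
        return true;
    }
    virtual bool OnLeaderStop(uint32_t ng_id, int64_t term)
    {
        return true;
    }
};

// Hypothetical handler (not from the PR) that records the node group it was
// called for. With `override`, a stale signature such as
// OnLeaderStop(int64_t term) fails to compile instead of silently becoming
// a new, never-called method.
class RecordingStoreHandler : public DataStoreHandler
{
public:
    bool OnLeaderStart(uint32_t ng_id, uint32_t *next_leader_node) override
    {
        last_started_ng = ng_id;
        return true;
    }
    bool OnLeaderStop(uint32_t ng_id, int64_t term) override
    {
        last_stopped_ng = ng_id;
        return true;
    }

    uint32_t last_started_ng = UINT32_MAX;
    uint32_t last_stopped_ng = UINT32_MAX;
};

// Dispatch through the base-class interface, as a caller holding a
// DataStoreHandler reference would.
inline void CheckNgAwareDispatch()
{
    RecordingStoreHandler handler;
    DataStoreHandler &base = handler;
    uint32_t next_leader = 0;
    assert(base.OnLeaderStart(7, &next_leader));
    assert(base.OnLeaderStop(7, 3));
    assert(handler.last_started_ng == 7);
    assert(handler.last_stopped_ng == 7);
}
```

Without `override`, a derived method that keeps the old parameter list still compiles but merely hides the base virtual, so the default no-op body runs instead.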
🏁 Script executed:
#!/bin/bash
# Find overrides and mismatches for the three methods.
rg -nP 'class\s+\w+\s*:\s*public\s+.*DataStoreHandler'
echo "---- Potential outdated overrides ----"
rg -nP '(OnStartFollowing\s*\(|OnLeaderStart\s*\(|OnLeaderStop\s*\()' -C2
# Spot signatures still missing ng_id as the first arg.
rg -nP 'OnStartFollowing\s*\(\s*(uint32_t\s+leader_node_id|int64_t\s+term)' -C2
rg -nP 'OnLeaderStart\s*\(\s*(uint32_t\s*\*\s*next_leader_node)\s*\)' -C2
rg -nP 'OnLeaderStop\s*\(\s*int64_t\s+term\)' -C2
Length of output: 12248
The web search returned Android-specific results unrelated to the codebase. Let me search the codebase directly for IntMemoryStore implementations.
🏁 Script executed:
#!/bin/bash
# Find IntMemoryStore implementation and check if it overrides these methods
fd -e h -e hpp -e cpp 'int_mem_store' | head -20
echo "---- Searching IntMemoryStore for method overrides ----"
rg -A 30 'class IntMemoryStore' --type=cpp --type=h
Length of output: 2142
Critical signature mismatches confirmed across multiple implementations; builds will break
Four methods have signature mismatches between base class and derived implementations:
- CcNode::OnLeaderStart (include/fault/cc_node.h:91, src/fault/cc_node.cpp:257): Missing `ng_id` parameter. Current: `(int64_t term, ...)` should be `(uint32_t ng_id, int64_t term, ...)`
- CcNode::OnLeaderStop (include/fault/cc_node.h:95, src/fault/cc_node.cpp:472): Missing `ng_id` parameter. Current: `(int64_t term)` should be `(uint32_t ng_id, int64_t term)`
- CcNode::OnStartFollowing (include/fault/cc_node.h:102, src/fault/cc_node.cpp:692): Completely wrong signature. Current: `(uint32_t node_id, int64_t term, bool resubscribe)` should be `(uint32_t ng_id, uint32_t leader_node_id, int64_t term, int64_t standby_term, bool resubscribe)`
- Sharder::OnStartFollowing (include/sharder.h:388, src/sharder.cpp:906): Missing `standby_term` parameter compared to base class.
Additionally, Sharder's calls to CcNode methods (src/sharder.cpp lines 858, 872, 920) pass arguments incompletely or in wrong order, compounding the issue.
CcNode's own calls to DataStoreHandler are correct (src/fault/cc_node.cpp lines 355, 498, 977), confirming the base class signatures are correct and the derived class is outdated.
Update CcNode and Sharder implementations to match base class DataStoreHandler signatures, and fix all call sites accordingly.
🤖 Prompt for AI Agents
In include/store/data_store_handler.h around lines 386-402 the base virtuals
require signatures (OnStartFollowing(uint32_t ng_id, uint32_t leader_node_id,
int64_t term, int64_t standby_term, bool resubscribe), OnLeaderStart(uint32_t
ng_id, int64_t term, uint32_t *next_leader_node), OnLeaderStop(uint32_t ng_id,
int64_t term)); update the derived implementations to match these exact
signatures: modify CcNode declarations/definitions (include/fault/cc_node.h
around line 91 and src/fault/cc_node.cpp at the noted lines ~257, ~472, ~692)
and Sharder (include/sharder.h ~388 and src/sharder.cpp around lines ~858, ~872,
~906, ~920) to add the missing ng_id and standby_term parameters and reorder
parameters to match the base; then fix all call sites in src/sharder.cpp and
src/fault/cc_node.cpp so arguments are passed in the correct order and include
the new standby_term/ng_id values (verify CcNode->DataStoreHandler calls remain
consistent), recompile and run tests to ensure signatures now match across base
and derived classes.
32a8c3e to a6fc398
a6fc398 to e16c240
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/cc/local_cc_shards.cpp (1)
1949-1970: Compile fix: add `<random>` header for std::mt19937 and std::uniform_int_distribution
The file uses `std::mt19937` (lines 1950, 2127) and `std::uniform_int_distribution` (lines 1951, 2128) but does not include the required `<random>` header, causing a compile error.
Add the include between `<optional>` and `<shared_mutex>`:
     #include <optional>
    +#include <random>
     #include <shared_mutex>
♻️ Duplicate comments (1)
include/sharder.h (1)
239-242: Address sign-safety concern for partition_id parameter.
The function accepts `int32_t partition_id`, but if a caller passes a negative value, the modulo operation followed by cast to `uint32_t` yields a large positive value rather than an error. This could silently produce incorrect bucket mappings.
Consider one of these solutions:
Option 1 (Preferred): Change parameter to unsigned
    -static inline uint32_t MapHashPartitionIdToBucketId(int32_t partition_id)
    +static inline uint32_t MapHashPartitionIdToBucketId(uint32_t partition_id)
     {
         return static_cast<uint32_t>(partition_id % total_range_buckets);
     }
Option 2: Assert non-negative and handle cast explicitly
     static inline uint32_t MapHashPartitionIdToBucketId(int32_t partition_id)
     {
    +    assert(partition_id >= 0);
    +    assert(total_hash_partitions <= total_range_buckets);
         return static_cast<uint32_t>(partition_id % total_range_buckets);
     }
Based on past review comments.
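The sign-safety concern can be reproduced with a small stand-alone sketch. It assumes `total_range_buckets` is declared as `uint16_t` (the declared type is not shown in this thread), so the modulo is carried out in `int`; `MapSigned` and `MapUnsigned` are hypothetical names for the current form and Option 1.

```cpp
#include <cassert>
#include <cstdint>

// Assumption: a uint16_t constant, which is promoted to int inside the modulo.
constexpr uint16_t total_range_buckets = 0x400;  // 1024

// Mirrors the signed form under review: a negative input gives a negative
// remainder, which the cast then wraps to a huge unsigned value.
inline uint32_t MapSigned(int32_t partition_id)
{
    return static_cast<uint32_t>(partition_id % total_range_buckets);
}

// Option 1 from the review: with an unsigned parameter the result always
// stays inside [0, total_range_buckets).
inline uint32_t MapUnsigned(uint32_t partition_id)
{
    return partition_id % total_range_buckets;
}

// Self-check contrasting the two behaviours.
inline void CheckBucketMapping()
{
    assert(MapSigned(5) == 5);
    assert(MapSigned(-1) >= total_range_buckets);  // out of range
    assert(MapUnsigned(static_cast<uint32_t>(-1)) < total_range_buckets);
}
```

Under these assumptions `MapSigned(-1)` evaluates to 4294967295 on a platform with 32-bit `int`, which is far outside the bucket range, while the unsigned form cannot leave it.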
🧹 Nitpick comments (5)
include/sharder.h (2)
225-226: Add defensive guard against zero before modulo operation.
Although `total_hash_partitions` is a compile-time constant (0x400), adding a runtime assertion provides defensive programming and makes the invariant explicit.
Apply this diff:
     static inline int32_t MapKeyHashToHashPartitionId(uint64_t hash_code)
     {
    +    assert(total_hash_partitions > 0);
         return static_cast<int32_t>(hash_code % total_hash_partitions);
     }
Based on past review comments.
233-237: Add zero-guard assertion for consistency.
While the assertion `total_hash_partitions <= total_range_buckets` is good, consider also asserting `total_hash_partitions > 0` before the modulo operation to guard against potential misconfiguration and maintain consistency with the suggested pattern for `MapKeyHashToHashPartitionId`.
Apply this diff:
     static inline int32_t MapBucketIdToHashPartitionId(uint16_t bucket_id)
     {
         assert(total_hash_partitions <= total_range_buckets);
    +    assert(total_hash_partitions > 0);
         return bucket_id % total_hash_partitions;
     }
src/cc/local_cc_shards.cpp (3)
1972-1991: Ring-mapping robustness: avoid param shadowing and advance across multiple thresholds
- Shadows function parameter `ng_id`; rename local to `owner_ng` for clarity.
- Use `while` (not single `if`) when stepping to the next virtual node; future adjustments (e.g., dedup relaxing) won’t surprise us.
    -    auto it = rand_num_to_ng.begin();
    +    auto it = rand_num_to_ng.begin();
         for (uint16_t bucket_id = 0; bucket_id < total_range_buckets; bucket_id++)
         {
             // The buckets larger than the last random number belongs to the
             // first virtual node on the ring.
    -        if (it != rand_num_to_ng.end() && bucket_id >= it->first)
    -        {
    -            it++;
    -        }
    -        NodeGroupId ng_id = it == rand_num_to_ng.end()
    -                                ? rand_num_to_ng.begin()->second
    -                                : it->second;
    +        while (it != rand_num_to_ng.end() && bucket_id >= it->first) {
    +            ++it;
    +        }
    +        NodeGroupId owner_ng = it == rand_num_to_ng.end()
    +                                   ? rand_num_to_ng.begin()->second
    +                                   : it->second;
    -        auto insert_res = ng_bucket_infos.try_emplace(
    -            bucket_id, std::make_unique<BucketInfo>(ng_id, version));
    +        auto insert_res = ng_bucket_infos.try_emplace(
    +            bucket_id, std::make_unique<BucketInfo>(owner_ng, version));
             if (insert_res.second)
             {
    -            insert_res.first->second->Set(ng_id, version);
    +            insert_res.first->second->Set(owner_ng, version);
             }
         }
Optional: add a guard to reflect the 16‑bit bucket id domain:
    +    static_assert(total_range_buckets <= std::numeric_limits<uint16_t>::max(),
    +                  "total_range_buckets must fit in uint16_t");
2127-2147: DRY the ring construction (shared logic in two places)
The “64 vnodes per NG” ring build is duplicated here and in InitRangeBuckets. Extract a small helper (e.g., BuildVirtualRing(node_groups)) returning std::map<uint16_t, NodeGroupId>, then reuse in both call sites. Reduces drift and eases testing.
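A minimal sketch of what such a helper could look like, under stated assumptions: `NodeGroupId` is taken to be a 32-bit integer alias, collisions are handled by redrawing (the actual collision handling is not shown in this thread), and `AssignBuckets` uses `upper_bound` as a stand-in for the incremental iterator walk in the loop above. None of these names are confirmed to exist in the PR.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <random>
#include <set>
#include <vector>

using NodeGroupId = uint32_t;                    // assumed alias
constexpr uint16_t total_range_buckets = 0x400;  // 1024
constexpr int kVirtualNodesPerGroup = 64;        // "64 vnodes per NG"

// Hypothetical shared helper: place 64 virtual nodes per node group on the
// ring, seeding one std::mt19937 per group with the group id.
inline std::map<uint16_t, NodeGroupId> BuildVirtualRing(
    const std::set<NodeGroupId> &node_groups)
{
    // Redrawing on collision only terminates if every virtual node can still
    // find a free slot.
    assert(node_groups.size() * kVirtualNodesPerGroup <= total_range_buckets);
    std::map<uint16_t, NodeGroupId> ring;
    for (NodeGroupId ng : node_groups)
    {
        std::mt19937 rng(ng);
        std::uniform_int_distribution<uint16_t> dist(0, total_range_buckets - 1);
        for (int placed = 0; placed < kVirtualNodesPerGroup;)
        {
            // Assumed collision policy: skip occupied slots and draw again.
            if (ring.emplace(dist(rng), ng).second)
            {
                ++placed;
            }
        }
    }
    return ring;
}

// Each bucket is owned by the first virtual node strictly after it, wrapping
// around to the first node on the ring.
inline std::vector<NodeGroupId> AssignBuckets(
    const std::map<uint16_t, NodeGroupId> &ring)
{
    assert(!ring.empty());
    std::vector<NodeGroupId> owner(total_range_buckets);
    for (uint16_t bucket = 0; bucket < total_range_buckets; ++bucket)
    {
        auto it = ring.upper_bound(bucket);
        owner[bucket] = (it == ring.end()) ? ring.begin()->second : it->second;
    }
    return owner;
}
```

Both call sites would then call the same function, so the seed scheme and collision policy cannot drift apart. Note that `std::uniform_int_distribution` is not guaranteed to produce the same sequence across standard library implementations, so the ring is reproducible only within one toolchain.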
2161-2176: Same nits as above: avoid `ng_id` shadowing and use `while`
Mirror the InitRangeBuckets fixes here for consistency and readability.
🚧 Files skipped from review as they are similar to previous changes (3)
- include/cc/local_cc_shards.h
- src/fault/cc_node.cpp
- include/store/data_store_handler.h
🔇 Additional comments (1)
include/type.h (1)
596-598: LGTM!
The bucket count has been correctly updated to 1024 (0x400) to match the hash partition count, and the new `total_hash_partitions` constant provides clear separation between bucket and partition concepts. The comment accurately reflects the change.
      auto core_number = cc_shards_.size();
    - constexpr size_t partition_number = 1024;
      auto partition_number_this_core =
    -     partition_number / core_number +
    -     (worker_idx < partition_number % core_number);
    +     total_hash_partitions / core_number +
    +     (worker_idx < total_hash_partitions % core_number);
      std::vector<size_t> partition_ids;
🧩 Analysis chain
Safety: guard against zero cores before dividing
Add an assert to prevent a potential div‑by‑zero if misconfigured with 0 cores.
- auto core_number = cc_shards_.size();
+ auto core_number = cc_shards_.size();
+ assert(core_number > 0 && "cc_shards_.size() must be > 0");
auto partition_number_this_core =
total_hash_partitions / core_number +
      (worker_idx < total_hash_partitions % core_number);
Please confirm core_num is validated > 0 at config load time.
I'll verify whether this division-by-zero concern is valid by examining the codebase.
🏁 Script executed:
#!/bin/bash
# Step 1: Look at the context around lines 4376-4380
echo "=== Context around lines 4376-4380 ==="
sed -n '4370,4385p' src/cc/local_cc_shards.cpp
echo ""
echo "=== Search for cc_shards_ initialization in LocalCcShards ==="
rg -n "cc_shards_" include/cc/local_cc_shards.h -A 2 -B 2 | head -50
echo ""
echo "=== Search for constructor and initialization ==="
rg -n "LocalCcShards\(" include/cc/local_cc_shards.h -A 10 | head -40
echo ""
echo "=== Search for config validation at load time ==="
rg -n "core.*num\|cc_shards.*size\|validate.*core\|config.*load" src/cc/local_cc_shards.cpp -B 2 -A 2 | head -60
Length of output: 3648
🏁 Script executed:
#!/bin/bash
# Find the constructor implementation in the .cpp file
echo "=== LocalCcShards constructor in .cpp file ==="
rg -n "LocalCcShards::LocalCcShards" src/cc/local_cc_shards.cpp -A 100 | head -150
echo ""
echo "=== cc_shards_ member variable declaration ==="
rg -n "std::vector.*cc_shards_\|unique_ptr.*cc_shards_" include/cc/local_cc_shards.h
echo ""
echo "=== Search for where cc_shards_ is populated/resized ==="
rg -n "cc_shards_.*resize\|cc_shards_.*push\|cc_shards_.*emplace" src/cc/local_cc_shards.cpp -B 3 -A 3
echo ""
echo "=== Search for validation of core_number or cc_shards size ==="
rg -n "assert.*cc_shards\|assert.*core\|if.*cc_shards.*size\|if.*core.*num" src/cc/local_cc_shards.cpp -B 2 -A 2 | head -40
Length of output: 4587
🏁 Script executed:
#!/bin/bash
# Search more broadly for cc_shards_ in header
echo "=== Search for cc_shards_ member variable in header ==="
rg -n "cc_shards_" include/cc/local_cc_shards.h | tail -20
echo ""
echo "=== Look for where cc_shards_ is assigned in constructor body ==="
rg -n "cc_shards_\s*=" src/cc/local_cc_shards.cpp -B 2 -A 5 | head -80
echo ""
echo "=== Check for make_unique or initialization of cc_shards_ ==="
rg -n "make_unique\|new CcShard" src/cc/local_cc_shards.cpp -B 3 -A 3 | head -60
echo ""
echo "=== Search entire constructor for cc_shards and core_num ==="
sed -n '71,500p' src/cc/local_cc_shards.cpp | grep -n "cc_shards\|core_num" | head -30
Length of output: 2076
🏁 Script executed:
#!/bin/bash
# Get full constructor body around cc_shards_ initialization
echo "=== Constructor body lines 100-150 from .cpp ==="
sed -n '100,150p' src/cc/local_cc_shards.cpp
echo ""
echo "=== Search for core_num validation in entire file ==="
rg -n "core_num\s*>\s*0\|core_num\s*!=\s*0\|core_num\s*[><]=\|assert.*core_num" src/cc/local_cc_shards.cpp
echo ""
echo "=== Search for config validation for core_num elsewhere ==="
rg -n "core_num.*validate\|validate.*core\|core_num.*check" . --type=cpp --type=h 2>/dev/null | head -20
Length of output: 2142
Add assert to guard against zero cores causing division-by-zero
The division-by-zero risk at lines 4377-4378 is valid. Investigation confirms core_num is never validated to be > 0 at config load time. If misconfigured to 0, cc_shards_ would be empty, causing undefined behavior. The suggested assert provides necessary runtime protection.
auto core_number = cc_shards_.size();
+ assert(core_number > 0 && "cc_shards_.size() must be > 0");
auto partition_number_this_core =
total_hash_partitions / core_number +
      (worker_idx < total_hash_partitions % core_number);
While ideally core_num should be validated at config load time, adding this assert is essential for safety at the division point.
🤖 Prompt for AI Agents
In src/cc/local_cc_shards.cpp around lines 4376 to 4380, the code computes
partition_number_this_core by dividing by core_number which can be zero if
cc_shards_ is empty; add a runtime guard (e.g. an assert or precondition) to
ensure cc_shards_.size() > 0 (or core_number > 0) immediately before the
division to prevent division-by-zero and fail fast with a clear message; if
assertions are not desired in production, replace with an explicit check that
logs an error and returns/throws appropriately.
e16c240 to bb5ab20
Actionable comments posted: 0
♻️ Duplicate comments (5)
include/store/data_store_handler.h (1)
386-402: Critical signature mismatches remain unresolved in derived classes.
The signature updates in the base class are not yet reflected in derived implementations (`CcNode` and `Sharder`), causing parameter mismatches. This will lead to build failures or incorrect runtime behavior.
include/sharder.h (2)
225-226: Add defensive guard against zero partitions.
Although `total_hash_partitions` is currently a compile-time constant (0x0400), adding a lightweight assertion prevents undefined behavior if the constant is ever misconfigured.
     static inline int32_t MapKeyHashToHashPartitionId(uint64_t hash_code)
     {
    +    assert(total_hash_partitions > 0);
         return static_cast<int32_t>(hash_code % total_hash_partitions);
     }
239-242: Handle negative partition_id to prevent incorrect bucket mapping.
If `partition_id` is negative, the signed modulo operation followed by cast to `uint32_t` produces a large incorrect value. Either change the parameter type to `uint32_t` or add an assertion to validate non-negativity.
     static inline uint32_t MapHashPartitionIdToBucketId(int32_t partition_id)
     {
    +    assert(partition_id >= 0);
    +    assert(total_hash_partitions <= total_range_buckets);
         return static_cast<uint32_t>(partition_id % total_range_buckets);
     }
Alternatively, change to unsigned:
    -static inline uint32_t MapHashPartitionIdToBucketId(int32_t partition_id)
    +static inline uint32_t MapHashPartitionIdToBucketId(uint32_t partition_id)
     {
    +    assert(total_hash_partitions <= total_range_buckets);
         return static_cast<uint32_t>(partition_id % total_range_buckets);
     }
src/cc/local_cc_shards.cpp (2)
1945-1969: Add `<random>` include for `std::mt19937` / `std::uniform_int_distribution` and avoid code duplication
This block correctly replaces `rand()` with a thread‑safe, deterministic RNG, but:
- `std::mt19937` and `std::uniform_int_distribution` require `<random>`; without it this TU is ill‑formed in standard C++. Please add the header.
     #include <tuple>
     #include <unordered_map>
    +#include <random>
- The “64 virtual nodes per node group” RNG logic is duplicated (with the same seed scheme) in `GenerateBucketMigrationPlan`; consider extracting a small helper (e.g. `BuildNodeGroupRing(const std::set<NodeGroupId>&)` returning the `std::map<uint16_t, NodeGroupId>`) to keep behavior in sync and simplify future changes.

---

`4378-4399`: **Guard division by `core_number` to prevent potential divide-by-zero in hash-partition data sync**

`core_number` comes from `cc_shards_.size()`. If configuration ever yields `core_num == 0` (and thus `cc_shards_` empty), `total_hash_partitions / core_number` and `total_hash_partitions % core_number` are undefined behavior.

Add a precondition check here (and ideally validate `core_num > 0` at config load) to fail fast:

```diff
-    auto core_number = cc_shards_.size();
-    auto partition_number_this_core =
+    auto core_number = cc_shards_.size();
+    assert(core_number > 0 && "cc_shards_.size() must be > 0");
+    auto partition_number_this_core =
         total_hash_partitions / core_number +
         (worker_idx < total_hash_partitions % core_number);
```

This keeps the new `total_hash_partitions`‑based distribution while protecting against misconfiguration.
🧹 Nitpick comments (1)
src/cc/local_cc_shards.cpp (1)
2123-2147: Share the per-node-group RNG ring construction between init and migration paths
This block mirrors the RNG logic in `InitRangeBuckets` (same seed, same distribution, same collision handling). To reduce drift risk and simplify reasoning about bucket assignment, factor this into a shared helper that both call sites use, e.g.:
    -    std::map<uint16_t, NodeGroupId> rand_num_to_ng;
    -    // use ng id as seed to generate random numbers
    -    for (auto ng : new_node_groups)
    -    {
    -        std::mt19937 rng(ng);
    -        std::uniform_int_distribution<uint16_t> dist(
    -            0, total_range_buckets - 1);
    -        ...
    -    }
    +    std::map<uint16_t, NodeGroupId> rand_num_to_ng =
    +        BuildBucketRingForNodeGroups(new_node_groups);
This keeps the assignment algorithm identical for initialization and migration without duplicating the RNG details.
🚧 Files skipped from review as they are similar to previous changes (1)
- src/fault/cc_node.cpp
🔇 Additional comments (2)
include/type.h (1)
596-598: LGTM!
The bucket and hash partition count reduction from 16384 to 1024 is correctly implemented. Both constants are set to the same power-of-2 value (0x0400 = 1024), which aligns with the PR objectives and supports efficient bitwise operations.
include/sharder.h (1)
233-237: LGTM!
The function rename from `MapBucketIdToKvPartitionId` to `MapBucketIdToHashPartitionId` better reflects its purpose. The assertion ensures the invariant that hash partitions don't exceed bucket count.
bb5ab20 to 826118b
Actionable comments posted: 0
♻️ Duplicate comments (4)
include/store/data_store_handler.h (1)
386-402: Virtual API updated to be ng-aware: ensure all overrides are synced
The three datastore lifecycle hooks now include `ng_id` (and `standby_term` for `OnStartFollowing`), while retaining no-op / `true` defaults. That’s consistent with how the call sites were updated in `CcNode`.
Since these are `virtual` methods on a central base class, please double‑check that all derived `DataStoreHandler` implementations and any direct calls are updated to match the new signatures, especially the removal of `term` from `OnLeaderStart`. Also consider marking unused parameters in the default bodies (e.g., `uint32_t /*ng_id*/`) if your build treats them as warnings.
include/sharder.h (1)
223-241: Document/enforce non‑negative partition IDs in `MapHashPartitionIdToBucketId`.
`MapKeyHashToHashPartitionId` / `MapBucketIdToHashPartitionId` imply partition IDs live in `[0, total_hash_partitions)`, but `MapHashPartitionIdToBucketId(int32_t partition_id)` will silently turn negative IDs into very large bucket IDs via `static_cast<uint32_t>(partition_id % total_range_buckets)`.
Consider tightening this API to reflect the invariant:
- Either change the parameter to `uint32_t` (or a dedicated `PartitionId` typedef) and keep the modulo, or
- Keep `int32_t` but add runtime asserts up front, e.g.:
    static inline uint32_t MapHashPartitionIdToBucketId(int32_t partition_id)
    {
        assert(partition_id >= 0);
        assert(static_cast<uint32_t>(partition_id) < static_cast<uint32_t>(total_hash_partitions));
        return static_cast<uint32_t>(partition_id) % total_range_buckets;
    }
This documents assumptions and prevents accidental negative inputs from mapping to out‑of‑range bucket IDs.
src/cc/local_cc_shards.cpp (2)
1949-1969: Include `<random>` for `std::mt19937` / `std::uniform_int_distribution`.
`InitRangeBuckets` now uses `std::mt19937` and `std::uniform_int_distribution<uint16_t>` but this translation unit does not include `<random>`. Relying on transitive includes is fragile and can break compilation.
Add the missing header alongside the other standard headers near the top of this file:
     #include <tuple>
     #include <unordered_map>
    +#include <random>
This also covers the identical RNG usage in `GenerateBucketMigrationPlan`.
4378-4399: Guard against zero `core_number` before dividing by it.
Here `core_number = cc_shards_.size()` is used as a divisor when computing `partition_number_this_core`:
    auto core_number = cc_shards_.size();
    auto partition_number_this_core =
        total_hash_partitions / core_number +
        (worker_idx < total_hash_partitions % core_number);
If `cc_shards_` is ever empty (e.g., misconfigured `core_num`), this becomes a division‑by‑zero and `partition_number_this_core` is undefined.
Add a fast precondition right after computing `core_number`:
    auto core_number = cc_shards_.size();
    assert(core_number > 0 && "cc_shards_.size() must be > 0");
(or handle the error in a way consistent with your config validation), then keep the existing logic.
🧹 Nitpick comments (1)
src/cc/local_cc_shards.cpp (1)
2123-2147: RNG change looks good; consider extracting shared helper for ring construction.
The new `std::mt19937` + `std::uniform_int_distribution<uint16_t>(0, total_range_buckets - 1)` usage in `GenerateBucketMigrationPlan` mirrors `InitRangeBuckets`, giving deterministic, thread‑safe virtual‑node placement per node group.
To avoid divergence between initialization and migration logic, consider extracting the “virtual node ring” construction (the 64 random positions per node group and the `rand_num_to_ng` map) into a shared helper function used by both call sites.
🚧 Files skipped from review as they are similar to previous changes (2)
- include/type.h
- include/cc/local_cc_shards.h
🧰 Additional context used
🧠 Learnings (5)
📓 Common learnings
Learnt from: lzxddz
Repo: eloqdata/tx_service PR: 199
File: include/cc/local_cc_shards.h:233-234
Timestamp: 2025-11-11T07:10:40.346Z
Learning: In the LocalCcShards class in include/cc/local_cc_shards.h, the EnqueueCcRequest methods use `shard_code & 0x3FF` followed by `% cc_shards_.size()` to distribute work across processor cores for load balancing. This is intentional and separate from partition ID calculation. The 0x3FF mask creates a consistent distribution range (0-1023) before modulo by actual core count.
📚 Learning: 2025-11-11T07:10:40.346Z
Learnt from: lzxddz
Repo: eloqdata/tx_service PR: 199
File: include/cc/local_cc_shards.h:233-234
Timestamp: 2025-11-11T07:10:40.346Z
Learning: In the LocalCcShards class in include/cc/local_cc_shards.h, the EnqueueCcRequest methods use `shard_code & 0x3FF` followed by `% cc_shards_.size()` to distribute work across processor cores for load balancing. This is intentional and separate from partition ID calculation. The 0x3FF mask creates a consistent distribution range (0-1023) before modulo by actual core count.
Applied to files:
src/cc/local_cc_shards.cppinclude/sharder.h
📚 Learning: 2025-10-09T03:56:58.811Z
Learnt from: thweetkomputer
Repo: eloqdata/tx_service PR: 150
File: include/cc/local_cc_shards.h:626-631
Timestamp: 2025-10-09T03:56:58.811Z
Learning: For the LocalCcShards class in include/cc/local_cc_shards.h: Writer locks (unique_lock) should continue using the original meta_data_mux_ (std::shared_mutex) rather than fast_meta_data_mux_ (FastMetaDataMutex) at this stage. Only reader locks may use the FastMetaDataMutex wrapper.
Applied to files:
src/cc/local_cc_shards.cpp
📚 Learning: 2025-10-20T04:30:07.884Z
Learnt from: liunyl
Repo: eloqdata/tx_service PR: 149
File: include/cc/cc_request.h:1876-1927
Timestamp: 2025-10-20T04:30:07.884Z
Learning: ScanNextBatchCc in include/cc/cc_request.h is used only for hash-partition scans; range-partition scans are handled by ScanSliceCc.
Applied to files:
src/cc/local_cc_shards.cpp
📚 Learning: 2025-10-21T06:46:53.700Z
Learnt from: lokax
Repo: eloqdata/tx_service PR: 149
File: src/remote/cc_stream_receiver.cpp:1066-1075
Timestamp: 2025-10-21T06:46:53.700Z
Learning: In src/remote/cc_stream_receiver.cpp, for ScanNextRequest handling, BucketIds() on RemoteScanNextBatch should never be empty—this is an expected invariant of the scan protocol.
Applied to files:
include/sharder.h
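The core-selection learning above (mask `shard_code` with `0x3FF`, then take the result modulo the core count) can be sketched as a standalone function. This is a minimal illustration only: `CoreIndexFor` and its parameter types are hypothetical names chosen for the example, and the real logic lives inside the `EnqueueCcRequest` methods of `LocalCcShards`.

```cpp
#include <cstddef>
#include <cstdint>

// Hypothetical helper (not part of tx_service): mirrors the two-step mapping
// described in the learning. Assumes core_count > 0.
inline std::size_t CoreIndexFor(std::uint32_t shard_code, std::size_t core_count)
{
    // Step 1: keep the low 10 bits, giving a residual in the range 0-1023.
    const std::uint32_t residual = shard_code & 0x3FF;
    // Step 2: fold the residual onto the cores that actually exist.
    return residual % core_count;
}
```

Because the mask is applied first, two shard codes that differ only above bit 9 always land on the same core, regardless of how many cores the node has.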
🧬 Code graph analysis (2)
src/fault/cc_node.cpp (2)
- include/tx_request.h (1): ng_id_ (1203-1219)
- include/table_statistics.h (1): ng_id_ (516-516)
include/store/data_store_handler.h (3)
- src/fault/cc_node.cpp (6): OnStartFollowing (692-716), OnStartFollowing (692-692), OnLeaderStart (257-470), OnLeaderStart (257-260), OnLeaderStop (472-521), OnLeaderStop (472-472)
- src/remote/cc_node_service.cpp (6): OnStartFollowing (125-145), OnStartFollowing (125-129), OnLeaderStart (60-111), OnLeaderStart (60-63), OnLeaderStop (113-123), OnLeaderStop (113-116)
- src/sharder.cpp (6): OnStartFollowing (906-921), OnStartFollowing (906-909), OnLeaderStart (843-859), OnLeaderStart (843-847), OnLeaderStop (861-873), OnLeaderStop (861-861)
🔇 Additional comments (2)
src/fault/cc_node.cpp (2)
353-361: Forwarding `ng_id_` into `DataStoreHandler` leader callbacks looks correct

Both `OnLeaderStart` (Line 355) and `OnLeaderStop` (Line 498) now pass `ng_id_` as the first argument, matching the updated `DataStoreHandler` signatures and preserving existing leader/term logic in `CcNode`. No issues from a correctness or threading perspective.

Also applies to: 496-503

975-979: `OnStartFollowing` call now aligned with new datastore API

The call to `store_hd->OnStartFollowing` (Lines 977-979) correctly forwards `(ng_id_, leader_node_id, primary_term, standby_term, resubscribe)`, matching the new `DataStoreHandler::OnStartFollowing` signature and using the same `standby_term` that is stored in `Sharder`. This keeps store behavior consistent with CC/raft state.
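To illustrate why passing `ng_id` first matters for multi-shard support, here is a rough sketch of a handler that tracks its role per node group. Everything in it is an assumption made for illustration: `StoreHandlerSketch` and `RoleTracker` are hypothetical classes, and the parameter types are guesses. Only the argument order follows the calls discussed in the comments above; the actual `DataStoreHandler` declarations are in include/store/data_store_handler.h.

```cpp
#include <cstdint>
#include <map>
#include <string>

// Hypothetical, simplified stand-in for DataStoreHandler. Parameter types
// are assumptions; only the argument order follows the review comments.
class StoreHandlerSketch
{
public:
    virtual ~StoreHandlerSketch() = default;
    virtual void OnLeaderStart(std::uint32_t ng_id,
                               std::uint32_t next_leader_node) = 0;
    virtual void OnLeaderStop(std::uint32_t ng_id, std::int64_t term) = 0;
    virtual void OnStartFollowing(std::uint32_t ng_id,
                                  std::uint32_t leader_node_id,
                                  std::int64_t primary_term,
                                  std::int64_t standby_term,
                                  bool resubscribe) = 0;
};

// Hypothetical implementation: records the latest role for each node group,
// which is only possible because every callback carries ng_id.
class RoleTracker : public StoreHandlerSketch
{
public:
    void OnLeaderStart(std::uint32_t ng_id, std::uint32_t) override
    {
        role_[ng_id] = "leader";
    }
    void OnLeaderStop(std::uint32_t ng_id, std::int64_t) override
    {
        role_[ng_id] = "stopped";
    }
    void OnStartFollowing(std::uint32_t ng_id, std::uint32_t, std::int64_t,
                          std::int64_t, bool) override
    {
        role_[ng_id] = "follower";
    }
    // Returns "unknown" for node groups that have not received a callback.
    std::string RoleOf(std::uint32_t ng_id) const
    {
        auto it = role_.find(ng_id);
        return it == role_.end() ? "unknown" : it->second;
    }

private:
    std::map<std::uint32_t, std::string> role_;
};
```

Without the `ng_id` argument, a single handler serving several shards would have no way to tell which node group a lifecycle event belongs to.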
Add total_hash_partitions and set it to 1024. Replace srand/rand with std::mt19937 to calculate the distribution of buckets in InitRangeBuckets(), because srand/rand is not thread-safe.
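A minimal sketch of the idea behind this commit, assuming one engine per node group: a local `std::mt19937` keeps its state in the object itself, so concurrent callers do not touch the hidden global state that `srand()`/`rand()` share. `DrawBucketPositions` is a hypothetical helper, and seeding the engine with the node group id is an assumption made here for reproducibility, not necessarily what `InitRangeBuckets()` does.

```cpp
#include <cstddef>
#include <cstdint>
#include <random>
#include <vector>

// Hypothetical helper (not the tx_service implementation): draws `count`
// pseudo-random 32-bit values for one node group.
inline std::vector<std::uint32_t> DrawBucketPositions(std::uint32_t ng_id,
                                                      std::size_t count)
{
    // Local engine seeded with ng_id (an assumption for this sketch).
    // All generator state lives in `rng`, so nothing is shared across threads.
    std::mt19937 rng(ng_id);
    std::vector<std::uint32_t> positions;
    positions.reserve(count);
    for (std::size_t i = 0; i < count; ++i)
    {
        positions.push_back(static_cast<std::uint32_t>(rng()));
    }
    return positions;
}
```

With this shape, every node that computes the distribution for the same node group gets the same sequence, and different node groups get independent sequences.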
826118b to fb2a4a4
Replace `srand()`/`rand()` with `std::mt19937` to generate the distribution of buckets in `InitRangeBuckets()`, because `srand()`/`rand()` is not thread-safe.

Summary by CodeRabbit