support remote scan next batch#149
Conversation
|
Important Review skippedAuto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the WalkthroughThis pull request refactors the distributed scanning subsystem by removing local-path batch scanning APIs, restructuring scan progress tracking with explicit bucket-level state, adding direct execution paths via Changes
Sequence Diagram(s)sequenceDiagram
actor Client
participant LocalCcHandler
participant BucketScanPlan
participant CcShard
participant FetchBucketDataCc
participant CcMap
rect rgb(200, 220, 255)
Note over Client,CcMap: New unified ScanNextBatch flow (local path)
Client->>LocalCcHandler: ScanNextBatch(..., is_local=true)
LocalCcHandler->>LocalCcHandler: Check leadership, determine is_local
LocalCcHandler->>BucketScanPlan: new BucketScanPlan(pause_position, ...)
BucketScanPlan->>BucketScanPlan: Initialize per-bucket progress
LocalCcHandler->>CcShard: For each bucket, FetchBucketData(is_local=true, ...)
CcShard->>CcShard: Cast requester to ScanNextBatchCc
CcShard->>CcShard: IncreaseWaitForFetchBucketCnt(core_id_)
CcShard->>FetchBucketDataCc: Reset(..., is_local=true, ...)
FetchBucketDataCc->>CcMap: Execute (backfill)
CcMap-->>FetchBucketDataCc: results
rect rgb(255, 200, 200)
Note over FetchBucketDataCc,CcMap: Error handling branches on is_local
end
FetchBucketDataCc->>ScanNextBatchCc: Enqueue (if is_local=true)
end
rect rgb(200, 220, 255)
Note over Client,CcMap: New unified ScanNextBatch flow (remote path)
Client->>LocalCcHandler: ScanNextBatch(..., is_local=false, remote_node)
LocalCcHandler->>BucketScanPlan: new BucketScanPlan(pause_position, ...)
LocalCcHandler->>RemoteCcHandler: ScanNext(table_name, start_key, end_key, bucket_ids, progress, ...)
RemoteCcHandler->>RemoteCcHandler: Populate ScanNextRequest with table metadata
RemoteCcHandler->>RemoteCcHandler: Branch on bucket_scan_progress empty/non-empty
alt First plan (progress empty)
RemoteCcHandler->>RemoteCcHandler: Create BucketScanInfoMsg with start_key, bucket_ids
else Resume (progress non-empty)
RemoteCcHandler->>RemoteCcHandler: Populate BucketScanProgressMap per-core
end
RemoteCcHandler->>Network: Send ScanNextRequest
end
sequenceDiagram
participant RemoteScanNextBatch
participant CcShard
participant CcMap
rect rgb(200, 220, 255)
Note over RemoteScanNextBatch,CcMap: New Execute(CcShard&) override
RemoteScanNextBatch->>RemoteScanNextBatch: Execute(ccs) override
RemoteScanNextBatch->>RemoteScanNextBatch: Validate term (leader vs standby)
RemoteScanNextBatch->>RemoteScanNextBatch: GetCcm via table catalog
alt Catalog hit
RemoteScanNextBatch->>CcMap: GetCcmExecutor(term_check, ...)
rect rgb(200, 255, 200)
Note over RemoteScanNextBatch,CcMap: Delegate to cc map execution
end
RemoteScanNextBatch->>CcMap: Execute()
CcMap-->>RemoteScanNextBatch: result
else Catalog miss or drop
rect rgb(255, 200, 200)
Note over RemoteScanNextBatch: Return error (CATALOG_NOT_FOUND, etc.)
end
end
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes Rationale: This PR spans ~25 files with substantial heterogeneous changes across the scanning subsystem—removing legacy APIs, introducing new proto message structures, refactoring cache internals with renamed containers and new iterator support, transitioning remote scan tracking from pointers to explicit bucket progress, and adding dual local/remote logic branches throughout. The interconnected nature of scanner, handler, request, and proto changes, combined with moderate logic density in Execute overrides and control flow updates, necessitates careful cross-file reasoning. Possibly related PRs
Poem
Pre-merge checks and finishing touches❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
7ec291f to
fc3b582
Compare
7c7076b to
14ca731
Compare
14ca731 to
b13bb2b
Compare
7952363 to
6247426
Compare
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Actionable comments posted: 15
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (7)
include/cc/ccm_scanner.h (2)
655-667: Overzealous assert when KV cache missingAsserting DataStoreHandler==nullptr on a missing KV cache is brittle. Returning nullptr is sufficient; let callers branch on it.
Apply:
- else - { - assert(Sharder::Instance().GetDataStoreHandler() == nullptr); - return nullptr; - } + else + { + return nullptr; + }
309-316: Critical behavior mismatch: RangePartitionedCcmScanner uses SetPackedKey but DecodeKey expects DeserializeThe code at line 309 uses
SetPackedKeyin the genericTemplateScanCache::AddScanTuplemethod, which is shared by both hash and range partitions. However,RangePartitionedCcmScanner::DecodeKey(line 1325) usesDeserializeto reconstruct keys, creating a contract violation.
- HashPartitionCcScanner: SetPackedKey in DecodeKey (line 909) + SetPackedKey in AddScanTuple (line 309) ✓ consistent
- RangePartitionedCcmScanner: Deserialize in DecodeKey (line 1329) + SetPackedKey in AddScanTuple (line 309) ✗ inconsistent
Range-partitioned keys stored as packed via SetPackedKey cannot be properly deserialized later.
RangePartitionedCcmScannermust either:
- Override
AddScanTupleto useDeserializefor range-partitioned keys, or- Store keys differently and adjust
DecodeKeyaccordinglyVerify producers provide appropriately formatted keys (packed for hash, serialized for range) and that all code paths respect this contract.
src/remote/cc_stream_receiver.cpp (1)
1053-1063: Do not crash on ScanOpenResponse; handle gracefully.Same availability risk: replace assert with logging + error propagation to the waiting handler to avoid remote hangs.
- assert(false && "Unimplemented"); + LOG(ERROR) << "ScanOpenResponse is unsupported; dropping."; + msg_pool_.enqueue(std::move(msg));src/cc/cc_shard.cpp (1)
1596-1614: Unsafe static_cast downcasts confirmed; fix required.The review is correct. Your code at lines 1624-1637 uses unsafe
static_cast<ScanNextBatchCc *>andstatic_cast<remote::RemoteScanNextBatch *>to downcast fromCcRequestBase *requester. SinceCcRequestBaseis polymorphic (has a virtual destructor), these downcasts silently succeed at compile time but have no runtime type checking. If an unexpected type reaches this code, you'll get undefined behavior.The issue is confirmed:
CcRequestBase(include/cc/cc_req_base.h:37) is polymorphic withvirtual ~CcRequestBase()- Both
ScanNextBatchCcandRemoteScanNextBatchinherit fromCcRequestBaseviaTemplatedCcRequestIncreaseWaitForFetchBucketCnt()is non-virtual in both derived classes, not declared in the baseEither: (1) use
dynamic_cast+ runtime assertions to catch contract violations in debug builds, or (2) add a virtualIncreaseWaitForFetchBucketCnt()toCcRequestBaseso the call can be made polymorphically without downcasting. Option 2 is preferred to avoid RTTI overhead.src/tx_execution.cpp (1)
2553-2608: Fix infinite loop whenread_local_is enabledFiltering out non-local buckets means
seen_bucket_cntnever reachesSharder::Instance().TotalRangeBuckets(), so thewhile (seen_bucket_cnt < total_bucket_cnt)loop spins forever. This wedges every read-local hash scan. Recomputetotal_bucket_cntfrom the buckets that actually remain ingroupedbefore entering the loop.- size_t plan_bucket_cnt_soft_limit = 256; - size_t total_bucket_cnt = Sharder::Instance().TotalRangeBuckets(); + size_t plan_bucket_cnt_soft_limit = 256; + size_t total_bucket_cnt = 0; + for (const auto &[ignored_ng, core_map] : grouped) + { + for (const auto &[ignored_core_idx, bucket_vec] : core_map) + { + total_bucket_cnt += bucket_vec.size(); + } + }src/cc/local_cc_handler.cpp (1)
1087-1156: Guard against null Scan plan before dereferencing.You dereference hd_res.Value().current_scan_plan_ without a null check. If a caller forgets to attach a plan, this will crash before the assert on Buckets(). Add a fast guard and surface a typed error.
Apply this diff:
- BucketScanPlan *plan = hd_res.Value().current_scan_plan_; + BucketScanPlan *plan = hd_res.Value().current_scan_plan_; + if (UNLIKELY(plan == nullptr)) { + hd_res.SetError(CcErrorCode::INVALID_SCAN_PLAN); + return; + }Also consider converting the assert on plan->Buckets().count(node_group_id) into an error at runtime in release builds.
include/remote/remote_cc_handler.h (1)
162-187: Add direct include for absl::flat_hash_map to avoid fragile transitive dependency.The file uses
absl::flat_hash_mapin the publicScanNextsignature but relies on a transitive include fromtx_operation_result.h. Direct includes should be explicit for all types used in public interfaces.BucketScanProgressvisibility is not a concern sincetx_operation_result.his already included.Apply this diff:
#include <memory> #include <vector> +#include "absl/container/flat_hash_map.h"
🧹 Nitpick comments (14)
src/tx_worker_pool.cpp (1)
91-91: Consider accepting the parameter by rvalue reference for efficiency.The
workparameter is passed by value, which copies the entire vector and all contained function objects. Consider usingstd::vector<std::function<void(size_t)>>&& workto enable move semantics and avoid unnecessary allocations.Apply this diff:
-void TxWorkerPool::BulkSubmitWork(std::vector<std::function<void(size_t)>> work) +void TxWorkerPool::BulkSubmitWork(std::vector<std::function<void(size_t)>>&& work)include/tx_worker_pool.h (1)
46-46: Consider accepting the parameter by rvalue reference for efficiency.The
workparameter is passed by value, which copies the entire vector and all contained function objects. Consider usingstd::vector<std::function<void(size_t)>>&& workto enable move semantics and avoid unnecessary allocations.Apply this diff:
- void BulkSubmitWork(std::vector<std::function<void(size_t)>> work); + void BulkSubmitWork(std::vector<std::function<void(size_t)>>&& work);include/tx_operation_result.h (2)
246-252: Guard against overfilling memory_cache_hash_codes_Add a capacity check to keep hash codes aligned with memory tuples.
Apply this diff:
void AddHashCode(size_t hash_code) { - memory_cache_hash_codes_.push_back(hash_code); + if (memory_cache_hash_codes_.size() < capacity_) { + memory_cache_hash_codes_.push_back(hash_code); + } else { + assert(false && "memory_cache_hash_codes_ exceeds capacity"); + } assert(memory_cache_hash_codes_.size() == static_cast<size_t>( shard_cache_->memory_scan_cache().scan_tuple_size())); }
793-821: pause_position handling: copy vs reference semanticsResuming by copying BucketScanProgress entries is fine, but confirm the caller supplies a fully-populated map (all node groups) and accepts the copy cost. If large, consider passing a const reference and moving selectively.
include/cc/ccm_scanner.h (2)
602-617: Rename ToTalSize() -> TotalSize() (and call site)Nit but improves readability; also mark const.
Apply:
- size_t ToTalSize() + size_t TotalSize() const { size_t size = 0; if (memory_cache_) { size += memory_cache_->Size(); } for (const auto &[bucket_id, kv_cache] : kv_caches_) { size += kv_cache->Size(); } return size; }And:
- shard_code_and_sizes->emplace_back(shard_code, cache->ToTalSize()); + shard_code_and_sizes->emplace_back(shard_code, cache->TotalSize());Also applies to: 880-881
1178-1179: Remove unused member cache_offset_Not referenced; drop it.
Apply:
- size_t cache_offset_{0};include/sharder.h (1)
230-234: Rename to TotalRangeBuckets is finePublic API rename is OK. If external modules still call the old name, consider a temporary inline alias to ease migration.
include/cc/cc_req_misc.h (1)
614-631: Name the locality explicitly; consider an enum instead of a bool.Passing a bare bool is_local obscures call intent. Prefer an enum (e.g., ScanOrigin::Local/Remote) or at least document it in the signature.
Apply if you opt for an enum:
- CcShard *ccs, - bool is_local, + CcShard *ccs, + ScanOrigin origin,And map to a bool internally if needed.
src/cc/cc_req_misc.cpp (2)
905-947: Add defensive checks for requester type vs is_local_.Reset() stores is_local_ and later statically casts requester_ based on it. If a mismatch ever occurs, this is UB.
Apply this diff to assert intent:
void FetchBucketDataCc::Reset( ... - ccs_ = ccs; - is_local_ = is_local; + ccs_ = ccs; + is_local_ = is_local; + // Defensive: ensure requester matches locality to prevent invalid casts later. + if (is_local_) { + assert(dynamic_cast<ScanNextBatchCc*>(requester) != nullptr && + "requester must be ScanNextBatchCc when is_local=true"); + } else { + assert(dynamic_cast<remote::RemoteScanNextBatch*>(requester) != nullptr && + "requester must be RemoteScanNextBatch when is_local=false"); + }If RTTI is disabled, consider a small enum tag on the requester to avoid RTTI.
971-1009: Error path looks correct; minor nit on naming and visibility.DecreaseWaitForFetchBucketCnt/IsWaitForFetchBucket/WaitForFetchBucketCnt reads well. Consider making these clearly atomic (or document thread-affinity) to avoid confusion.
No code change required; add a short comment on thread ownership or atomicity of these counters.
src/remote/remote_cc_request.cpp (2)
1169-1210: Accessor robustness: avoid .at(core_id) on progress map.StartKeyType() and StartKeyStr() use .at(core_id) which throws on missing key. Prefer find() and fall back to global_info when absent.
Apply this pattern:
- start_key_case = scan_next.progress() - .progress() - .at(core_id) - .start_key() - .inner_key_case(); + const auto& prog_map = scan_next.progress().progress(); + auto it = prog_map.find(core_id); + if (it != prog_map.end()) { + start_key_case = it->second.start_key().inner_key_case(); + } else { + // Fallback: use global_info if present, else treat as -inf + start_key_case = scan_next.has_global_info() + ? scan_next.global_info().start_key().inner_key_case() + : RemoteTxKey::InnerKeyCase::kNegInf; + }Mirror the same for StartKeyStr().
1047-1168: Reset: good coverage of new fields; minor nits.
- Good: carry term, node_group_id, table metadata, ckpt flag, and build shard cache map per core.
- Nit: The assert(scan_next.direction()) will fail once backward scans are supported; convert to a runtime default (e.g., default to forward if field missing/false) until direction is fully wired.
include/remote/remote_cc_request.h (2)
503-516: Tighten atomic memory ordering for error path.SetErrorCode()/ErrorCode() use relaxed ordering. Given finalization uses unfinished_core_cnt_ with acq_rel, pair the final read with acquire (see SetFinish fix). For SetErrorCode, relaxed is fine, but document “first-wins” semantics.
Optionally add comments:
- err_.compare_exchange_strong(expected, - err_code, - std::memory_order_relaxed, - std::memory_order_relaxed); + // First error wins; relaxed is sufficient since SetFinish does an acquire load. + err_.compare_exchange_strong(expected, err_code, + std::memory_order_relaxed, + std::memory_order_relaxed);And prefer acquire in ErrorCode() when used outside SetFinish:
- return err_.load(std::memory_order_relaxed); + return err_.load(std::memory_order_acquire);
577-586: Const-correctness for accessors.Make pure getters const to improve API clarity and thread-safety expectations.
Apply this diff:
-int64_t NodeGroupTerm() +int64_t NodeGroupTerm() const { return ng_term_; }-const TxKey *EndKey() +const TxKey *EndKey() const { return &end_key_; }- CcErrorCode ErrorCode() + CcErrorCode ErrorCode() const { - return err_.load(std::memory_order_relaxed); + return err_.load(std::memory_order_acquire); }Optionally:
- bool IsWaitForFetchBucket(uint16_t core_id) + bool IsWaitForFetchBucket(uint16_t core_id) const { /* unchanged body using at(core_id) */ } - bool IsRequireRecs() + bool IsRequireRecs() const { return is_require_recs_; }Also applies to: 512-516, 489-493, 641-646, 701-704
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (25)
include/cc/cc_handler.h(0 hunks)include/cc/cc_req_misc.h(2 hunks)include/cc/cc_request.h(2 hunks)include/cc/cc_shard.h(1 hunks)include/cc/ccm_scanner.h(27 hunks)include/cc/local_cc_handler.h(0 hunks)include/proto/cc_request.proto(3 hunks)include/remote/remote_cc_handler.h(1 hunks)include/remote/remote_cc_request.h(3 hunks)include/sharder.h(1 hunks)include/store/data_store_handler.h(0 hunks)include/store/int_mem_store.h(0 hunks)include/tx_operation.h(1 hunks)include/tx_operation_result.h(6 hunks)include/tx_request.h(1 hunks)include/tx_worker_pool.h(1 hunks)src/cc/cc_entry.cpp(1 hunks)src/cc/cc_req_misc.cpp(3 hunks)src/cc/cc_shard.cpp(3 hunks)src/cc/local_cc_handler.cpp(3 hunks)src/remote/cc_stream_receiver.cpp(3 hunks)src/remote/remote_cc_handler.cpp(2 hunks)src/remote/remote_cc_request.cpp(9 hunks)src/tx_execution.cpp(7 hunks)src/tx_worker_pool.cpp(1 hunks)
💤 Files with no reviewable changes (4)
- include/cc/cc_handler.h
- include/cc/local_cc_handler.h
- include/store/data_store_handler.h
- include/store/int_mem_store.h
🔇 Additional comments (14)
src/cc/cc_entry.cpp (1)
163-169: Cosmetic formatting improvement in ClearLocks.The blank line insertion between read intent deletion and queued request cleanup improves readability by visually grouping related lock cleanup operations. No functional changes; the logic and control flow remain intact.
include/tx_request.h (1)
400-402: Remove the verification concern — no dangling reference issue exists.The
BucketScanPlanconstructor receivespause_positionas a reference parameter but does not store it. Instead, the constructor iterates throughpause_positionduring initialization and copies/moves its data into the owned member variablecurrent_position_. Thepause_positionreference is scoped to the constructor and is not retained, so there is no dangling reference risk.include/tx_operation_result.h (3)
217-227: RemoteScanCache: constructors/capacity init look goodCtor + reserve are consistent with default capacity; no issues.
234-239: Capacity/Full and memory cache accessor are consistentTuple-count-based capacity checks and mutable accessor look fine.
Also applies to: 241-245
299-327: min_key overwrite bug confirmed—applies to both memory and KV cache iterationThe bug is real: min_key is overwritten per bucket in the loop (lines 318-328) without comparison, selecting the last-iterated key instead of the actual minimum. The suggested fix correctly adds
if (min_key == nullptr || last_tuple.key() < *min_key)to perform proper comparison.The "Also applies to: 341-354" refers to usage lines within the same method where the incorrect min_key is consumed by RemoveLast() calls.
The kv_is_drained pre-population concern is also valid—the code uses
.at(bucket_id)at lines 319 and 321, which throws if the key doesn't exist. The caller must initialize this map with all bucket IDs before invoking Merge(). However, the exact initialization pattern in calling code could not be located in the codebase search results.include/cc/ccm_scanner.h (3)
74-84: ScanCache capacity_ adoption is correctCapacity now consistently gates Status/Reset/Full. Good cleanup.
Also applies to: 105-106, 124-125, 141-143
906-911: DecodeKey uses SetPackedKey — matches AddScanTuple changeConsistent with packed-key path. See verification script above.
913-941: ShardCache iterator driving Current/MoveNext looks solidInit skips empties; MoveNext walks memory then KV per shard then advances shards. LGTM.
Confirm no concurrent mutation of shard_caches_ while iterating.
Also applies to: 950-991
include/tx_operation.h (1)
496-501: Propagating ccm_scanner_ into hd_result_ is correctEnsures downstream users can read scanner state without relying on slice_hd_result_. Good.
include/cc/cc_shard.h (1)
743-761: All call sites correctly updated; is_local parameter properly integrated throughoutVerification confirms: The two main
FetchBucketDatacall sites (template_cc_map.h lines 2389, 3035) explicitly passis_local(true and false respectively). TheFetchBucketDataCcstruct stores the flag inis_local_member field,Reset()correctly assigns it, and the simpler overload carries consistency through the struct. No missing updates detected.src/cc/local_cc_handler.cpp (2)
954-957: Good: make ScanOpen non-blocking and defer actual work to ScanNext.Setting finished here simplifies the state machine and avoids early scanning side effects.
1159-1184: Review comment is accurate; all proposed changes are correct and necessary.Verification confirms:
- First
ScanNextoverload (line 162) lacks direction parameter and hardcodesdirection=true(line 557) ✓CcScanner::Direction()method exists (include/cc/ccm_scanner.h:464) ✓ScanDirection::Forwardenum exists (include/cc/scan.h:34) ✓- Call site at line 1160 correctly should pass
scanner.Direction() == ScanDirection::Forward✓- Second
ScanNextoverload (line 188) already correctly extracts direction fromccm_scanner_and requires no changes ✓All three coordinated changes in the review comment are accurate and address the bug completely for the affected overload.
include/remote/remote_cc_request.h (2)
472-473: Execute(CcShard&) override: ensure no blocking I/O and bounded work.Given this is called on request threads, ensure it never performs blocking waits or unbounded merges. If heavy merge is needed, offload to worker/bthread and return.
Can you confirm the implementation offloads long-running steps (bucket fetch, merges) and uses timeouts for remote calls?
489-493: No compilation issues exist. ng_term_ is properly declared in TemplatedCcRequest, the base class.RemoteScanNextBatch inherits from
TemplatedCcRequest<RemoteScanNextBatch, Void>, which is whereng_term_is declared as a protected member (include/cc/cc_request.h:335). TheNodeGroupTerm()method correctly accesses this inherited member. The code compiles without issues.
| // Even if ScanNext operation fail, we should also move scan | ||
| // result into read set to release acquired lock. | ||
| { | ||
| uint32_t node_group_id = scan_next_res.node_group_id(); | ||
| CcScanner *scanner = hd_res->Value().ccm_scanner_; | ||
| hd_res->Value().current_scan_plan_->UpdateNodeGroupTerm( | ||
| node_group_id, scan_next_res.term()); | ||
| auto *ng_bucket_scan_progress = |
There was a problem hiding this comment.
Comment contradicts behavior.
The comment says to move scan result even on failure, but the code only executes on success. Align the comment or implement the intended fallback.
🤖 Prompt for AI Agents
In src/remote/cc_stream_receiver.cpp around lines 1124-1131, the inline comment
claims we should move the scan result into the read set even if ScanNext fails,
but the code only does this on success; either update the comment to reflect the
current behavior or implement the intended fallback: add an else/failure path
that still extracts node_group_id, uses hd_res->Value().ccm_scanner_ and
current_scan_plan_ to call UpdateNodeGroupTerm and move the scan result into the
read set (releasing any acquired lock), while guarding null pointers and logging
the failure case; ensure the fallback mirrors the success-path cleanup so locks
are always released.
c733733 to
59b71fa
Compare
c66c35b to
c388cfe
Compare
dd9cf54 to
8a97681
Compare
01366e5 to
731cb1c
Compare
731cb1c to
88f8b8b
Compare
Here are some reminders before you submit the pull request
fixes eloqdb/tx_service#issue_id./mtr --suite=mono_main,mono_multi,mono_basicSummary by CodeRabbit
Refactor
Chores