
Refactor scan next batch for hash partition #131

Merged: lokax merged 6 commits into hash-part-scan from scan-next-batch-cc on Sep 26, 2025

Conversation

lokax (Collaborator) commented Sep 12, 2025

Here are some reminders before you submit the pull request

  • Add tests for the change
  • Document changes
  • Reference the link of issue using fixes eloqdb/tx_service#issue_id
  • Reference the link of RFC if exists
  • Pass ./mtr --suite=mono_main,mono_multi,mono_basic

Summary by CodeRabbit

  • New Features

    • Range-bounded scans with explicit start/end keys and inclusivity.
    • Bucket-based scan planning with per-bucket progress across shards/cores.
    • Clearer error messaging: “ERR invalid cursor.”
  • Performance

    • More efficient hash-partitioned scanning with merged per-shard in-memory/KV caches.
    • Batch fetching of multiple buckets and bulk task submission to the worker pool.
  • Refactor

    • Replaced legacy scanner with a new hash-partitioned scanner and unified blocking model.
    • Streamlined scan control flow and cache handling; reduced verbose logging.

liunyl (Contributor) commented Sep 17, 2025

@coderabbitai review

coderabbitai Bot commented Sep 17, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

coderabbitai Bot commented Sep 17, 2025

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Pre-merge checks and finishing touches

❌ Failed checks (2 warnings)

  • Description Check (⚠️ Warning)
    Explanation: The pull request description currently contains only the repository reminder checklist (the template) and does not include a summary of what changed, the rationale, a list of affected files or public API/signature changes, test plan or results, documentation updates, or links to related issues/RFCs, so it is incomplete for a proper code review.
    Resolution: Please update the PR description to include a concise summary of the changes and motivation, enumerate key modified files and any public API/signature changes, provide testing instructions and CI/test status (or attach results), note documentation or migration steps, and add links to related issue(s) or RFCs before requesting final review.
  • Docstring Coverage (⚠️ Warning)
    Explanation: Docstring coverage is 3.82% which is insufficient. The required threshold is 80.00%.
    Resolution: You can run @coderabbitai generate docstrings to improve docstring coverage.

✅ Passed checks (1 passed)

  • Title Check (✅ Passed)
    Explanation: The title "Refactor scan next batch for hash partition" is concise, focused, and accurately reflects the primary change set shown in the diff (ScanNextBatch refactor and hash-partition scanning updates), so it clearly communicates the main intent to a teammate scanning history.

Comment @coderabbitai help to get the list of available commands and usage tips.

coderabbitai Bot left a comment

Actionable comments posted: 9

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (8)
src/remote/remote_cc_handler.cpp (1)

597-599: Don't pass a non‑null‑terminated StringView to protobuf setter.

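set_table_name_str(tbl_name.StringView().data()) drops the view's length: the buffer behind .data() is not guaranteed to be null‑terminated, so the setter may read past the end of the view (UB) or stop early at an embedded NUL. Use the owning string instead.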

Apply this diff:

-    scan_slice->set_table_name_str(tbl_name.StringView().data());
+    scan_slice->set_table_name_str(tbl_name.String());
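
To illustrate why passing .data() alone is risky, here is a minimal sketch. It assumes the view behaves like std::string_view; CopyFromCString and CopyFromView are hypothetical helpers standing in for a `const char *` setter and a length-aware setter, and are not part of the protobuf API or this repository.

```cpp
#include <cassert>
#include <string>
#include <string_view>

// Hypothetical stand-in for a setter that takes `const char *` and therefore
// reads up to the first '\0', ignoring the view's length.
inline std::string CopyFromCString(const char *s)
{
    return std::string(s);
}

// Length-aware alternative: copies exactly sv.size() bytes.
inline std::string CopyFromView(std::string_view sv)
{
    return std::string(sv.data(), sv.size());
}
```

With a view covering only the first six bytes of "orders_archive", the length-aware copy yields "orders", while the C-string path runs on to the terminator of the backing buffer. If the backing buffer had no terminator at all, the C-string path would be undefined behavior.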
src/remote/cc_stream_receiver.cpp (2)

1124-1155: Term is never set after commenting out cache population.

term stays -1, then cc_node_terms_[ng_id] = term; records an invalid term. If per‑node‑group term is needed later, extract it without rebuilding caches.

Apply this diff to compute term from the first tuple of each core:

-                /*
-                ScanCache *shard_cache = scanner.AddShard(shard_code);
-
-                for (int idx = 0; idx < cache_msg.scan_tuple_size(); ++idx)
-                {
-                    const ScanTuple_msg &tuple_msg = cache_msg.scan_tuple(idx);
-                    assert(tuple_msg.cce_addr().core_id() ==
-                           (uint32_t) core_id);
-
-                    term = tuple_msg.cce_addr().term();
-                    ...
-                }
-                        */
+                // Derive term without populating caches.
+                if (term < 0 && cache_msg.scan_tuple_size() > 0) {
+                    term = cache_msg.scan_tuple(0).cce_addr().term();
+                }

1817-1825: Inverted error handling for KickoutDataResponse.

if (!cc_resp.error_code()) treats success as error.

Apply this diff:

-        if (!cc_resp.error_code())
-        {
-            hd_res->SetRemoteError(
-                ToLocalType::ConvertCcErrorCode(cc_resp.error_code()));
-        }
-        else
-        {
-            hd_res->SetRemoteFinished();
-        }
+        if (cc_resp.error_code() != 0)
+        {
+            hd_res->SetRemoteError(
+                ToLocalType::ConvertCcErrorCode(cc_resp.error_code()));
+        }
+        else
+        {
+            hd_res->SetRemoteFinished();
+        }
include/cc/scan.h (1)

149-160: Avoid dual-ownership leaks when switching between ptr/object record storage

Calling SetRecord(shared_ptr<>) then SetRecord(unique_ptr<>) (or vice versa) leaves the other holder non-null, keeping memory alive unnecessarily. Reset the opposite holder when switching.

Apply this diff:

-    void SetRecord(std::shared_ptr<ValueT> ptr)
-    {
-        is_ptr_ = true;
-        rec_ptr_ = ptr;
-    }
+    void SetRecord(std::shared_ptr<ValueT> ptr)
+    {
+        rec_obj_.reset();
+        is_ptr_ = true;
+        rec_ptr_ = std::move(ptr);
+    }
@@
-    void SetRecord(std::unique_ptr<ValueT> rec_obj)
-    {
-        is_ptr_ = false;
-        rec_obj_ = std::move(rec_obj);
-    }
+    void SetRecord(std::unique_ptr<ValueT> rec_obj)
+    {
+        rec_ptr_.reset();
+        is_ptr_ = false;
+        rec_obj_ = std::move(rec_obj);
+    }
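
The effect of resetting the opposite holder can be checked in isolation. The sketch below uses a hypothetical RecordHolder class that mirrors only the three members named in the diff (is_ptr_, rec_ptr_, rec_obj_); the Record() accessor is an assumption added for the demonstration.

```cpp
#include <cassert>
#include <memory>
#include <utility>

// Hypothetical reduction of the tuple's record storage.
template <typename ValueT>
class RecordHolder
{
public:
    void SetRecord(std::shared_ptr<ValueT> ptr)
    {
        rec_obj_.reset();  // drop the previously owned object, if any
        is_ptr_ = true;
        rec_ptr_ = std::move(ptr);
    }

    void SetRecord(std::unique_ptr<ValueT> rec_obj)
    {
        rec_ptr_.reset();  // release our share of the previous record
        is_ptr_ = false;
        rec_obj_ = std::move(rec_obj);
    }

    // Assumed accessor: returns whichever holder is currently active.
    const ValueT *Record() const
    {
        return is_ptr_ ? rec_ptr_.get() : rec_obj_.get();
    }

private:
    bool is_ptr_{false};
    std::shared_ptr<ValueT> rec_ptr_;
    std::unique_ptr<ValueT> rec_obj_;
};
```

After switching from the shared_ptr overload to the unique_ptr overload, the caller's shared_ptr is the only remaining owner, so the old record is freed as soon as the caller lets go of it.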
src/cc/local_cc_handler.cpp (1)

1291-1313: Remote path disabled → leaked ref/scan hang.

Else-branch only increases remote ref; remote call is commented out, so the request can never finish.

Short-term guard to keep liveness until remote path is implemented:

-    else
-    {
-        hd_res.IncreaseRemoteRef();
-        /*
-        ... old remote path commented ...
-        */
-    }
+    else
+    {
+        // TODO: implement remote hash-part ScanNextBatch dispatch
+        hd_res.SetError(CcErrorCode::DATA_NOT_ON_LOCAL_NODE);
+        return;
+    }

If remote is expected, wire a proper remote call with the new start/end range semantics instead of the old blocked_cache API.

src/tx_execution.cpp (1)

1077-1086: Fix compile-time errors and typos.

These will fail to build and must be corrected.

Apply the following diffs:

--- a/src/tx_execution.cpp
+++ b/src/tx_execution.cpp
@@ -1077,7 +1077,7 @@ void TransactionExecution::ProcessTxRequest(ReloadCacheTxRequest &req)
     TX_TRACE_ACTION_WITH_CONTEXT(
         this,
-        &fi_req,
+        &req,
         [this]() -> std::string
         {
             return std::string("\"tx_number\":")
@@ -1123,7 +1123,7 @@ void TransactionExecution::ProcessTxRequest(InvalidateTableCacheTxRequest &req)
         {
             return std::string("\"tx_number\":")
-                .append(std::string(this->TxNumber()))
+                .append(std::to_string(this->TxNumber()))
                 .append("\"tx_term\":")
                 .append(std::to_string(this->tx_term_));
         });
@@ -6380,7 +6380,7 @@ void TransactionExecution::PostProcess(ReleaseScanExtraLockOp &unlock_op)
     TX_TRACE_ACTION_WITH_CONTEXT(
         this,
-        &lock_op,
+        &unlock_op,
         [this]() -> std::string
         {
             return std::string("\"tx_number\":")
@@ -6439,8 +6439,8 @@ void TransactionExecution::PostProcess(KickoutDataOp &kickout_data_all_op)
             return std::string("\"tx_number\":")
                 .append(std::to_string(this->TxNumber()))
                 .append("\"tx_term\":")
-                .append(std::to_string(this->tx_term_))
-        });
+                .append(std::to_string(this->tx_term_));
+        });
@@ -7419,8 +7419,8 @@ void TransactionExecution::PostProcess(KickoutDataAllOp &kickout_data_all_op)
             return std::string("\"tx_number\":")
                 .append(std::to_string(this->TxNumber()))
                 .append("\"tx_term\":")
-                .append(std::to_string(this->tx_term_))
-        });
+                .append(std::to_string(this->tx_term_));
+        });
@@ -7479,7 +7479,7 @@ void TransactionExecution::Process(BatchReadOperation &batch_read_op)
     {
         // Check leader term for hash partitioned table since we will
         // access bucket infos that are bind to ng term.
-        void_resp_->FinishError(TxErrorCode::TRANSACTION_NODE_NOT_LEADER);
+        void_resp_->FinishError(TxErrorCode::TX_NODE_NOT_LEADER);
         void_resp_ = nullptr;
         return;
     }
@@ -7554,7 +7554,7 @@ void TransactionExecution::PostProcess(BatchReadOperation &batch_read_op)
     TX_TRACE_ACTION_WITH_CONTEXT(
         this,
-        &read,
+        &batch_read_op,
         [this]() -> std::string
         {
             return std::string("\"tx_number\":")

Also applies to: 1120-1129, 6380-6388, 6439-6443, 7419-7426, 7477-7484, 7554-7563

include/cc/cc_request.h (2)

1807-1870: Avoid storing a shallow copy of end_key; use an owned clone

Reset() saves end_key_ via GetShallowCopy(). If the caller’s TxKey goes out of scope, end_key_ dangles across async execution. Clone to own the bytes.

Apply this diff:

-        end_key_ = end_key.GetShallowCopy();
+        end_key_ = end_key.Clone();
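
The lifetime difference between a shallow copy and a clone can be sketched with standard types. KeyView, OwnedKey, and CloneKey below are hypothetical stand-ins; the real TxKey API may differ, and the sketch only assumes that a shallow copy borrows the caller's bytes while a clone copies them.

```cpp
#include <cassert>
#include <string>
#include <string_view>

// Hypothetical non-owning key: valid only while the caller's buffer lives.
struct KeyView
{
    std::string_view bytes;
};

// Hypothetical owning key: carries its own copy of the bytes.
struct OwnedKey
{
    std::string bytes;
};

// Copies the viewed bytes so the result outlives the caller's buffer.
inline OwnedKey CloneKey(const KeyView &key)
{
    return OwnedKey{std::string(key.bytes)};
}
```

A request that stores the OwnedKey can be executed asynchronously after the caller's buffer is gone. Storing the KeyView in the same situation would leave a dangling pointer.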

1807-1870: Per-core maps are mutated concurrently → data-race risk

wait_for_fetch_bucket_cnt_ and blocking_info_ (absl::flat_hash_map) are written/read by multiple cores without synchronization. Even disjoint-key writes on the same container are not thread-safe. Prefer per-core arrays or guard access with a mutex.

Minimal containment (mutex-guarded access) example:

@@
-    absl::flat_hash_map<uint16_t, size_t> wait_for_fetch_bucket_cnt_;
-    absl::flat_hash_map<uint16_t, ScanBlockingInfo> blocking_info_;
+    // Protects wait_for_fetch_bucket_cnt_ and blocking_info_ accesses.
+    bthread::Mutex blocking_mux_;
+    absl::flat_hash_map<uint16_t, size_t> wait_for_fetch_bucket_cnt_;
+    absl::flat_hash_map<uint16_t, ScanBlockingInfo> blocking_info_;

Then wrap all getters/setters below with the mutex (see next comments for concrete diffs).

Also applies to: 2075-2092
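
As an alternative to the mutex, the "per-core arrays" option could look like the sketch below. It assumes the core count is known when the request is reset and that each core only touches its own index; PerCoreCounters is a hypothetical name, not a type from this repository.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical per-core counter table. The vector is sized once and never
// resized afterwards, so each core works on a distinct element. Concurrent
// writes to different elements of a std::vector do not race, whereas
// inserting into or rehashing a shared hash map would.
class PerCoreCounters
{
public:
    explicit PerCoreCounters(std::size_t core_cnt) : counts_(core_cnt, 0)
    {
    }

    void Increase(uint16_t core_id)
    {
        ++counts_[core_id];
    }

    void Decrease(uint16_t core_id)
    {
        --counts_[core_id];
    }

    std::size_t Get(uint16_t core_id) const
    {
        return counts_[core_id];
    }

private:
    std::vector<std::size_t> counts_;
};
```

This avoids lock traffic on the hot path. If adjacent counters turn out to be contended, padding each slot to a cache line is a possible follow-up, at the cost of more memory.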

🧹 Nitpick comments (36)
include/error_messages.h (1)

216-216: Message style + symmetry with CcErrorCode

  • Message text is inconsistent with others. Prefer a neutral sentence.
  • CcErrorCode::INVALID_CURSOR exists but lacks a cc_error_messages entry.

Apply:

-    {TxErrorCode::INVALID_CURSOR, "ERR invalid cursor"},
+    {TxErrorCode::INVALID_CURSOR, "Invalid cursor."},

And add a CC mapping (near other entries):

   {CcErrorCode::UPDATE_SEQUENCE_TABLE_FAIL, "UPDATE_SEQUENCE_TABLE_FAIL"},
+  {CcErrorCode::INVALID_CURSOR, "INVALID_CURSOR"},
include/cc/cc_req_misc.h (1)

649-653: New is_drained_ flag: clarify lifecycle and reset

Ensure Reset() (or constructor path) always sets is_drained_ = false, and document whether it’s written/read only on the TxProcessor thread (else make it atomic).

If needed:

 void Reset(...){
   ...
   bucket_data_items_.clear();
+  is_drained_ = false;
   ...
 }
include/data_sync_task.h (2)

74-75: err_code_ visibility/locking

err_code_ is written under status_->mux_, but readers may access it without the lock. Either (a) make it atomic, or (b) enforce accessor methods that lock.

Option (a):

-    CcErrorCode err_code_{CcErrorCode::NO_ERROR};
+    std::atomic<CcErrorCode> err_code_{CcErrorCode::NO_ERROR};

Then store/load with memory_order as needed.
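
A minimal sketch of option (a), assuming the error code is a small enum. SyncErr and SyncStatus are hypothetical names used only for illustration; the real field is CcErrorCode err_code_.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical error enum standing in for CcErrorCode.
enum class SyncErr : uint8_t
{
    None = 0,
    StoreFailed = 1
};

class SyncStatus
{
public:
    // Writers publish the code with release semantics.
    void SetErrorCode(SyncErr code)
    {
        err_code_.store(code, std::memory_order_release);
    }

    // Readers no longer need the mutex; acquire pairs with the release store.
    SyncErr ErrorCode() const
    {
        return err_code_.load(std::memory_order_acquire);
    }

private:
    std::atomic<SyncErr> err_code_{SyncErr::None};
};
```

Routing all access through the two accessors keeps the memory-order choice in one place, which is the same goal as option (b) without requiring callers to hold the lock.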


160-164: SetErrorCode locking contract

If err_code_ remains non-atomic, document that callers must read it with status_->mux_ held; otherwise convert to atomic as suggested above.

src/tx_worker_pool.cpp (2)

97-98: Avoid extra copies; move-insert the batch

Insert with move iterators to prevent copying std::function.

-    work_queue_.insert(work_queue_.end(), work.begin(), work.end());
-    work_queue_cv_.notify_one();
+    const bool multiple = work.size() > 1;
+    work_queue_.insert(work_queue_.end(),
+                       std::make_move_iterator(work.begin()),
+                       std::make_move_iterator(work.end()));
+    if (multiple) {
+        work_queue_cv_.notify_all();
+    } else {
+        work_queue_cv_.notify_one();
+    }

41-68: Worker wake-up policy for bulk enqueues

When enqueuing many tasks, waking a single worker may underutilize the pool. The notify_all() path above balances latency vs. overhead.
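
The move-insert and wake-up policy can be sketched as a self-contained queue. WorkQueue, BulkSubmit, TryPop, and Size are hypothetical names; the real TxWorkerPool has worker threads and a different interface, so this only shows the enqueue side.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <iterator>
#include <mutex>
#include <utility>
#include <vector>

class WorkQueue
{
public:
    using Task = std::function<void()>;

    // Takes the batch by value so its elements can be moved into the queue.
    void BulkSubmit(std::vector<Task> work)
    {
        if (work.empty())
        {
            return;
        }
        const bool multiple = work.size() > 1;
        {
            std::lock_guard<std::mutex> lk(mux_);
            queue_.insert(queue_.end(),
                          std::make_move_iterator(work.begin()),
                          std::make_move_iterator(work.end()));
        }
        // Wake every waiter for a batch, a single waiter for one task.
        if (multiple)
        {
            cv_.notify_all();
        }
        else
        {
            cv_.notify_one();
        }
    }

    bool TryPop(Task &out)
    {
        std::lock_guard<std::mutex> lk(mux_);
        if (queue_.empty())
        {
            return false;
        }
        out = std::move(queue_.front());
        queue_.pop_front();
        return true;
    }

    std::size_t Size()
    {
        std::lock_guard<std::mutex> lk(mux_);
        return queue_.size();
    }

private:
    std::mutex mux_;
    std::condition_variable cv_;
    std::deque<Task> queue_;
};
```

Notifying after the lock is released is a design choice made here so that woken workers do not immediately block on the mutex; the diff above does not say where the lock is held, so treat that detail as an assumption.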

include/store/data_store_handler.h (1)

189-195: Batch FetchBucketData signature: pass by const&

Passing the vector by value adds an avoidable copy; prefer const& (or absl::Span) unless the implementation needs ownership.

-    virtual DataStoreOpStatus FetchBucketData(
-        std::vector<FetchBucketDataCc *> fetch_bucket_data_ccs)
+    virtual DataStoreOpStatus FetchBucketData(
+        const std::vector<FetchBucketDataCc *> &fetch_bucket_data_ccs)

Also consider making this pure virtual if all backends must implement it; otherwise leave the stub.

include/tx_operation_result.h (3)

700-705: Avoid copying large nested maps from CurrentPosition()

Return a const reference to prevent copying potentially large state.

-    absl::flat_hash_map<NodeGroupId,
-                        absl::flat_hash_map<uint16_t, BucketScanProgress>>
-    CurrentPosition()
+    const absl::flat_hash_map<NodeGroupId,
+                              absl::flat_hash_map<uint16_t, BucketScanProgress>> &
+    CurrentPosition() const
     {
-        return current_position_;
+        return current_position_;
     }

656-660: Buckets(NodeGroupId): guard against missing key

Using at() throws; returning nullptr when absent is often friendlier for plan consumers.

-    std::vector<uint16_t> *Buckets(NodeGroupId node_group_id)
-    {
-        return &buckets_->at(node_group_id);
-    }
+    std::vector<uint16_t> *Buckets(NodeGroupId node_group_id)
+    {
+        auto it = buckets_->find(node_group_id);
+        return it == buckets_->end() ? nullptr : &it->second;
+    }
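
A small sketch of the find-based lookup, using std::unordered_map as a stand-in for the container behind buckets_. ScanPlan is a hypothetical reduction of the plan type, and NodeGroupId is assumed to be a 32-bit integer here.

```cpp
#include <cassert>
#include <cstdint>
#include <unordered_map>
#include <utility>
#include <vector>

using NodeGroupId = uint32_t;  // assumed width for this sketch

class ScanPlan
{
public:
    using BucketMap = std::unordered_map<NodeGroupId, std::vector<uint16_t>>;

    explicit ScanPlan(BucketMap buckets) : buckets_(std::move(buckets))
    {
    }

    // Returns nullptr instead of throwing when the node group is absent.
    std::vector<uint16_t> *Buckets(NodeGroupId node_group_id)
    {
        auto it = buckets_.find(node_group_id);
        return it == buckets_.end() ? nullptr : &it->second;
    }

private:
    BucketMap buckets_;
};
```

Callers then have to check the pointer before use, which makes the "node group not in plan" case explicit at each call site.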

708-715: Comment drift

The comment “// <pause key, is_drained>” no longer matches the BucketScanProgress layout. Update or remove.

include/cc/cc_shard.h (1)

742-754: Clarify contract and mutability for batched bucket fetch input.

absl::flat_hash_map<uint16_t, bool> &bucket_ids suggests in/out flags (e.g., drained). If the callee mutates the bools, document semantics; if not, make the map const&. Also note flat_hash_map’s non‑deterministic iteration; if result ordering matters downstream, preserve a stable order alongside the map.

Possible signatures:

  • Mutating: absl::flat_hash_map<uint16_t, bool>& bucket_ids /* in/out: drained */
  • Non‑mutating: absl::Span<const uint16_t> bucket_ids
include/cc/scan.h (1)

188-189: Typo in friend declaration: HashParitionCcScanner → HashPartitionCcScanner

Please fix the identifier or confirm the intended class name; current spelling looks accidental and risks mismatches.

Apply this diff if the class is indeed “HashPartitionCcScanner”:

-    friend class HashParitionCcScanner;
+    friend class HashPartitionCcScanner;
src/cc/cc_req_misc.cpp (1)

952-968: Type safety and counter symmetry in FetchBucketDataCc::Execute

Static-casting requester_ assumes it’s always ScanNextBatchCc. Add an assert to catch a null requester (static_cast itself cannot verify the dynamic type). Also double-check that Increase/DecreaseWaitForFetchBucketCnt calls are strictly paired to avoid underflow.

Apply this minimal guard:

-    ScanNextBatchCc *req = static_cast<ScanNextBatchCc *>(requester_);
+    auto *req = static_cast<ScanNextBatchCc *>(requester_);
+    assert(req != nullptr);
include/tx_request.h (2)

411-415: Make PlanSize const and noexcept (API polish).

Apply:

-    size_t PlanSize()
+    [[nodiscard]] size_t PlanSize() const noexcept
     {
         return bucket_groups_.size();
     }

436-454: Header logging in Debug(): add include or move to .cpp; also make const.

Using LOG in a header without including its definition can break builds and forces a transitive dependency.

Minimal fix in place:

-    void Debug()
+    void Debug() const
     {
         size_t cnt = 0;
         for (auto &group : bucket_groups_)
         {
             for (auto &[node_group_id, buckets] : group)
             {
                 cnt += buckets.size();
             }
         }

         LOG(INFO) << "==yf: cluster config version = "
                   << cluster_config_version_
                   << ", prev pause index = " << prev_pause_idx_
                   << ", group size = " << bucket_groups_.size()
                   << ", position size = " << pause_position_.size()
                   << ", cnt = " << cnt;
     }

Additionally, either:

  • Add the include in this header (quick):
 #include "absl/container/flat_hash_map.h"
+#include <glog/logging.h>
  • Or move Debug() to a .cpp to avoid header-level logging dependency (preferred).
src/cc/local_cc_handler.cpp (4)

939-939: Remove stale commented code.

Drop the commented SetDrainCacheMode line or add a TODO explaining why it stays.


1198-1198: Remove stale commented code.

Same here for local open.


1237-1239: Null-safety on current_scan_plan_.

Guard against null before dereference to avoid crash on miswired callers.

Apply:

-        BucketScanPlan *plan = hd_res.Value().current_scan_plan_;
+        BucketScanPlan *plan = hd_res.Value().current_scan_plan_;
+        assert(plan != nullptr);
         assert(plan->Buckets().count(node_group_id) > 0);

1240-1258: Core/bucket init: verify core index mapping.

Assumes ShardBucketIdToCoreIdx(bucket_id) always maps into the pre-emplaced cores; if topology changes or buckets are empty, at() will throw.

Suggested defensive tweak:

-                bucket_scan_progress->at(target_core)
-                    .scan_buckets_.emplace(bucket_id, false);
+                auto it = bucket_scan_progress->find(target_core);
+                if (it != bucket_scan_progress->end()) {
+                    it->second.scan_buckets_.emplace(bucket_id, false);
+                } else {
+                    // unexpected mapping; skip or log
+                }

Also consider seeding per-bucket cursor/inclusivity if start_inclusive should affect the first key on each core.

include/cc/template_cc_map.h (6)

2761-2765: Make shard_code composition explicit and guard typed_cache access.

Avoid accidental overflow and add basic sanity checks before dereferencing.

-        uint32_t shard_code = (ng_id << 10) + shard_->core_id_;
+        uint32_t shard_code =
+            (static_cast<uint32_t>(ng_id) << 10) | (shard_->core_id_ & 0x3FF);
         TemplateScanCache<KeyT, ValueT> *typed_cache =
             static_cast<TemplateScanCache<KeyT, ValueT> *>(
                 req.GetLocalMemoryCache(shard_code));
+        assert(typed_cache != nullptr && typed_cache->Scanner() != nullptr);
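
The bit-packing form can be captured in a pair of helpers. The 10-bit core field matches the `<< 10` and `0x3FF` in the diff; the helper names PackShardCode, NodeGroupOf, and CoreOf are hypothetical.

```cpp
#include <cassert>
#include <cstdint>

constexpr uint32_t kCoreBits = 10;
constexpr uint32_t kCoreMask = (1u << kCoreBits) - 1;  // 0x3FF

// Packs the node group into the high bits and the core id into the low 10.
constexpr uint32_t PackShardCode(uint32_t ng_id, uint16_t core_id)
{
    return (ng_id << kCoreBits) | (static_cast<uint32_t>(core_id) & kCoreMask);
}

constexpr uint32_t NodeGroupOf(uint32_t shard_code)
{
    return shard_code >> kCoreBits;
}

constexpr uint16_t CoreOf(uint32_t shard_code)
{
    return static_cast<uint16_t>(shard_code & kCoreMask);
}
```

With addition, an out-of-range core id such as 1029 would carry into the node-group bits and silently change the group. With the mask, the node group stays intact, although the core id is still truncated, so an assert on the range may be worth adding as well.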

2789-2801: Avoid magic numbers in FetchBucketData.

false and 16 should be named constants or derived from config/req to avoid accidental divergence across call sites.


2902-2903: Silence unused counters or guard under debug.

debug_loop_cnt and add_cache_cnt are never read.

-        size_t debug_loop_cnt = 0;
-        size_t add_cache_cnt = 0;
+        [[maybe_unused]] size_t debug_loop_cnt = 0;
+        [[maybe_unused]] size_t add_cache_cnt = 0;

12126-12134: Avoid decoding VersionedRecord payload without using it.

In the VersionedRecord branch, the record is deserialized but never stored in scan_tuple, wasting CPU. Either set the record on tuple when IsRequireRecs() or skip decoding.

-                if constexpr (VersionedRecord)
-                {
-                    ValueT val;
-                    val.Deserialize(item.rec_str_.data(), rec_offset);
-                }
+                if constexpr (VersionedRecord)
+                {
+                    if (req->IsRequireRecs()) {
+                        auto payload = std::make_shared<ValueT>();
+                        payload->Deserialize(item.rec_str_.data(), rec_offset);
+                        scan_tuple->SetRecord(payload);
+                    }
+                }

12102-12106: Unify shard_code composition with bit‑packing form.

Match the earlier fix and avoid addition.

-    uint32_t shard_code = (req->NodeGroupId() << 10) + shard.core_id_;
+    uint32_t shard_code =
+        (static_cast<uint32_t>(req->NodeGroupId()) << 10) | (shard.core_id_ & 0x3FF);

12158-12159: Prefer consistent Enqueue overload.

Elsewhere Enqueue takes (core_id, req); here Enqueue(requester) is used. Align for clarity and to avoid unintended queue selection.

src/tx_execution.cpp (6)

2431-2433: Guard command-id bump after ScanOpen (hash partition).

Unconditionally bumping command_id_ can double-advance vs. finished local opens. Prefer the existing helper.

-        // immediately forward again.
-        command_id_.fetch_add(1, std::memory_order_relaxed);
+        // Immediately forward again (consistent with other ops).
+        AdvanceCommand();

Please confirm there’s no other place in this path that already advances the command id on a synchronous completion.


2673-2685: Resetting scanner on plan index change — verify lifecycle.

Closing and re-blocking the scanner on plan rollover is reasonable; ensure no leaked cache state or dangling iterators when a remote response lands late. Consider asserting scanner.Status() != Open before Close() for safety.


2704-2728: Add remote-request metrics for hash ScanNext dispatch.

Range path tracks in-flight remote count and durations; hash path doesn’t. Mirror the metrics for parity.

-            auto &ng_scan_buckets =
+            auto &ng_scan_buckets =
                 scan_next.hd_result_.Value().current_scan_plan_->Buckets();
             scan_next.ResetResultForHashPart(ng_scan_buckets.size());
 
             // Reset all caches, we need to scan next batch data
             scanner.ResetCaches();
+
+            if (metrics::enable_remote_request_metrics && !ng_scan_buckets.empty()) {
+                auto meter = tx_processor_->GetMeter();
+                meter->Collect(metrics::NAME_IN_FLIGHT_REMOTE_REQUEST_COUNT,
+                               metrics::Value::IncDecValue::Increment,
+                               "scan_next");
+                if (is_collecting_duration_round_) {
+                    scan_next.op_start_ = metrics::Clock::now();
+                }
+            }

5803-5805: Remove or gate commented drain-cache code.

The SetDrainCacheMode(true) stub is commented out. Either implement or delete to avoid confusion.


5906-5917: Use the function parameter consistently (avoid mixing member and param).

Process/PostProcess(ReloadCacheOperation&) mix reload_cache_op_ (member) with reload_cache_op (param). Use the param for clarity.

-        this,
-        &reload_cache_op_,
+        this,
+        &reload_cache_op,
@@
-    reload_cache_op_.is_running_ = true;
+    reload_cache_op.is_running_ = true;
-        this,
-        &reload_cache_op_,
+        this,
+        &reload_cache_op,

Also applies to: 5935-5943


2691-2699: Delete dead/commented local path or add rationale.

The ScanNextBatchLocal block is fully commented. Remove or add a TODO with owner/date to prevent drift.

include/cc/cc_request.h (3)

1882-1947: Finalize logic is sound; tighten memory ordering (optional)

err_ uses relaxed loads/stores; this works because the surrounding fetch_sub(acq_rel) already orders them. To make the intent explicit, write with release and read with acquire.

Example:

-        err_.compare_exchange_strong(expected,
-                                     err_code,
-                                     std::memory_order_relaxed,
-                                     std::memory_order_relaxed);
+        err_.compare_exchange_strong(expected,
+                                     err_code,
+                                     std::memory_order_release,
+                                     std::memory_order_relaxed);
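
A "first error wins" slot with the tightened ordering might look like the sketch below. ScanErr and ErrSlot are hypothetical names; the sketch assumes err_ starts at a no-error value and is only ever set through compare-exchange.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical error enum for illustration.
enum class ScanErr
{
    None,
    LockConflict,
    Timeout
};

class ErrSlot
{
public:
    // Records the error only if none has been recorded yet.
    // Success publishes with release; failure needs no ordering.
    bool TrySet(ScanErr err_code)
    {
        ScanErr expected = ScanErr::None;
        return err_.compare_exchange_strong(expected,
                                            err_code,
                                            std::memory_order_release,
                                            std::memory_order_relaxed);
    }

    // Acquire pairs with the release in TrySet.
    ScanErr Get() const
    {
        return err_.load(std::memory_order_acquire);
    }

private:
    std::atomic<ScanErr> err_{ScanErr::None};
};
```

The second TrySet call fails and leaves the first error in place, which is the behavior the finalize logic relies on when several cores report errors.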

1974-1994: Guard shared state access with a mutex

These helpers read/write maps shared across cores. Add a lock to avoid UB.

Apply this diff:

 uint64_t BlockingCceLockAddr(uint16_t core_id)
 {
-        assert(blocking_info_.count(core_id) > 0);
-        return blocking_info_[core_id].cce_lock_addr_;
+        BAIDU_SCOPED_LOCK(blocking_mux_);
+        assert(blocking_info_.count(core_id) > 0);
+        return blocking_info_[core_id].cce_lock_addr_;
 }
@@
 std::pair<ScanBlockingType, ScanType> BlockingPair(uint16_t core_id)
 {
-        assert(blocking_info_.count(core_id) > 0);
-        return {blocking_info_[core_id].type_,
-                blocking_info_[core_id].scan_type_};
+        BAIDU_SCOPED_LOCK(blocking_mux_);
+        assert(blocking_info_.count(core_id) > 0);
+        return {blocking_info_[core_id].type_,
+                blocking_info_[core_id].scan_type_};
 }
@@
 void SetBlockingInfo(uint16_t core_id,
                      uint64_t cce_lock_addr,
                      ScanType scan_type,
                      ScanBlockingType blocking_type)
 {
-        assert(blocking_info_.count(core_id) > 0);
-        blocking_info_[core_id] = {cce_lock_addr, scan_type, blocking_type};
+        BAIDU_SCOPED_LOCK(blocking_mux_);
+        assert(blocking_info_.count(core_id) > 0);
+        blocking_info_[core_id] = {cce_lock_addr, scan_type, blocking_type};
 }
@@
 size_t WaitForFetchBucketCnt(uint16_t core_id)
 {
-        assert(wait_for_fetch_bucket_cnt_.count(core_id) > 0);
-        return wait_for_fetch_bucket_cnt_[core_id];
+        BAIDU_SCOPED_LOCK(blocking_mux_);
+        assert(wait_for_fetch_bucket_cnt_.count(core_id) > 0);
+        return wait_for_fetch_bucket_cnt_[core_id];
 }
@@
 void DecreaseWaitForFetchBucketCnt(uint16_t core_id)
 {
-        assert(wait_for_fetch_bucket_cnt_.count(core_id) > 0);
-        wait_for_fetch_bucket_cnt_[core_id]--;
+        BAIDU_SCOPED_LOCK(blocking_mux_);
+        assert(wait_for_fetch_bucket_cnt_.count(core_id) > 0);
+        wait_for_fetch_bucket_cnt_[core_id]--;
 }
@@
 void IncreaseWaitForFetchBucketCnt(uint16_t core_id)
 {
-        assert(wait_for_fetch_bucket_cnt_.count(core_id) > 0);
-        wait_for_fetch_bucket_cnt_[core_id]++;
+        BAIDU_SCOPED_LOCK(blocking_mux_);
+        assert(wait_for_fetch_bucket_cnt_.count(core_id) > 0);
+        wait_for_fetch_bucket_cnt_[core_id]++;
 }
@@
 bool IsWaitForFetchBucket(uint16_t core_id)
 {
-        assert(blocking_info_.count(core_id) > 0);
-        return blocking_info_[core_id].type_ ==
-               ScanBlockingType::BlockOnFetchBucket;
+        BAIDU_SCOPED_LOCK(blocking_mux_);
+        assert(blocking_info_.count(core_id) > 0);
+        return blocking_info_[core_id].type_ ==
+               ScanBlockingType::BlockOnFetchBucket;
 }
@@
 void SetIsWaitForFetchBucket(uint16_t core_id)
 {
-        assert(blocking_info_.count(core_id) > 0);
-        blocking_info_[core_id].type_ = ScanBlockingType::BlockOnFetchBucket;
+        BAIDU_SCOPED_LOCK(blocking_mux_);
+        assert(blocking_info_.count(core_id) > 0);
+        blocking_info_[core_id].type_ = ScanBlockingType::BlockOnFetchBucket;
 }

Also applies to: 2011-2040


2042-2061: Do not expose mutable internal progress maps

BucketIds() returns a non-const reference to internal state; external mutation can corrupt merge logic. Return const&, and add a separate MutableBucketIds() only where mutation is required.

Apply this diff:

-    absl::flat_hash_map<uint16_t, bool> &BucketIds(uint16_t core_id)
+    const absl::flat_hash_map<uint16_t, bool> &BucketIds(uint16_t core_id) const
     {
         return bucket_scan_progress_->at(core_id).scan_buckets_;
     }

If mutation is needed inside handlers, expose a dedicated, narrowly-scoped mutator API instead.

include/cc/ccm_scanner.h (2)

545-552: Typo in class name: HashParitionCcScanner → HashPartitionCcScanner

Correct spelling prevents long-lived footguns in type names. If renaming is invasive, add a using alias to maintain source compatibility.

Example:

-template <typename KeyT, typename ValueT>
-class HashParitionCcScanner : public CcScanner
+template <typename KeyT, typename ValueT>
+class HashPartitionCcScanner : public CcScanner
 {
   ...
 };
+template <typename KeyT, typename ValueT>
+using HashParitionCcScanner = HashPartitionCcScanner<KeyT, ValueT>;

663-679: ResetCaches should hold the mutex

You lock elsewhere when touching shard_caches_/index_chains_. Do the same here to avoid races when merges are in-flight.

Apply this diff:

-    void ResetCaches() override
+    void ResetCaches() override
     {
-        for (auto &[shard_code, cache] : shard_caches_)
+        std::unique_lock<std::mutex> lock(mutex_);
+        for (auto &[shard_code, cache] : shard_caches_)
         {
             cache->memory_cache_.Reset();
             for (auto &[bucket_id, kv_cache] : cache->kv_caches_)
             {
                 kv_cache.Reset();
             }
         }

-        for (auto &[shard_code, index] : index_chains_)
+        for (auto &[shard_code, index] : index_chains_)
         {
             index->Reset();
         }

         current_iter_ = {};
         init_ = false;
     }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b933955 and f248d19.

📒 Files selected for processing (24)
  • include/cc/cc_handler.h (1 hunks)
  • include/cc/cc_req_misc.h (1 hunks)
  • include/cc/cc_request.h (6 hunks)
  • include/cc/cc_shard.h (1 hunks)
  • include/cc/ccm_scanner.h (13 hunks)
  • include/cc/local_cc_handler.h (1 hunks)
  • include/cc/scan.h (2 hunks)
  • include/cc/template_cc_map.h (17 hunks)
  • include/data_sync_task.h (1 hunks)
  • include/error_messages.h (1 hunks)
  • include/store/data_store_handler.h (1 hunks)
  • include/tx_operation.h (2 hunks)
  • include/tx_operation_result.h (1 hunks)
  • include/tx_request.h (3 hunks)
  • include/tx_worker_pool.h (1 hunks)
  • src/cc/cc_req_misc.cpp (3 hunks)
  • src/cc/cc_shard.cpp (1 hunks)
  • src/cc/local_cc_handler.cpp (5 hunks)
  • src/eloq_basic_catalog_factory.cpp (1 hunks)
  • src/remote/cc_stream_receiver.cpp (2 hunks)
  • src/remote/remote_cc_handler.cpp (1 hunks)
  • src/tx_execution.cpp (14 hunks)
  • src/tx_operation.cpp (0 hunks)
  • src/tx_worker_pool.cpp (1 hunks)
💤 Files with no reviewable changes (1)
  • src/tx_operation.cpp
🔇 Additional comments (27)
include/tx_operation_result.h (1)

519-583: Incorrect — default ctor not required for BucketScanProgress

No code uses operator[] to create BucketScanProgress values; accesses to absl::flat_hash_map<..., BucketScanProgress> use .at() or are explicitly populated. The only operator[] hit is on BucketScanProgress::scan_buckets_ (map<uint16_t, bool>), which is safe. See include/cc/cc_request.h:1907, include/cc/template_cc_map.h:12149–12151, src/cc/local_cc_handler.cpp:1240–1244.

Likely an incorrect or invalid review comment.

src/eloq_basic_catalog_factory.cpp (1)

148-151: No mismatch — HashParitionCcScanner is declared and used consistently.
The class is declared as HashParitionCcScanner in include/cc/ccm_scanner.h and referenced the same in include/cc/scan.h and src/eloq_basic_catalog_factory.cpp; there is no build-breaking name mismatch. If you intended the spelling HashPartitionCcScanner, rename consistently across those files.

Likely an incorrect or invalid review comment.

src/remote/remote_cc_handler.cpp (1)

629-640: Incorrect — no null-guard needed for scanner.Cache(core_id)

All CcScanner implementations in include/cc/ccm_scanner.h return a valid ScanCache* for indices < ShardCount() (e.g., RangePartitionedCcmScanner::Cache() returns &scans_[shard_code]; HashParitionCcScanner::Cache() returns &shard_cache->memory_cache_), so the proposed null-check is unnecessary.

Likely an incorrect or invalid review comment.

include/cc/local_cc_handler.h (1)

235-243: Signature change: ensure all call sites updated and semantics documented

New start/end key params are inserted mid-parameter list. Verify all overrides/implementations and external callers compile cleanly and that inclusivity semantics are clearly documented (start_inclusive/end_inclusive).

src/cc/cc_req_misc.cpp (2)

923-926: Init drained state

Initializing is_drained_ in Reset is correct.


977-981: Enqueue self on finish (good change)

Routing completion through this request before waking the requester is appropriate for the new wait-counter flow.

include/tx_request.h (3)

395-407: PickPlan logic LGTM.

Only resume when prev_pause_idx_ matches current_idx; otherwise start fresh. Clear and direct.
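
The resume rule described above can be illustrated with a minimal sketch. Only prev_pause_idx_ and current_idx come from the review; the ResumeState struct, the saved_offset_ field, and PickStartOffset are hypothetical names used for illustration and are not the PR's actual types.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical resume state. Only prev_pause_idx_ is named in the review;
// saved_offset_ is an assumed field holding progress inside the paused plan.
struct ResumeState
{
    std::size_t prev_pause_idx_;  // index of the plan where the last batch paused
    std::size_t saved_offset_;    // assumed: how far that plan had progressed
};

// Returns the offset to start from for the plan at current_idx.
// Resumes only if this is the plan that was paused; otherwise starts fresh.
std::size_t PickStartOffset(const ResumeState &state, std::size_t current_idx)
{
    if (state.prev_pause_idx_ == current_idx)
    {
        return state.saved_offset_;
    }
    return 0;
}
```

With a state paused at plan 2 and offset 7, picking plan 2 continues from 7, while picking any other plan starts from 0.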


455-462: Member layout/readability LGTM.


681-681: Default-initialize bucket_scan_plan_.

Good defensive default; avoids UB on read before set.

include/cc/cc_handler.h (1)

354-358: Document range params and verify overrides/forwarding

  • Add short param docs for start_key / start_inclusive / end_key / end_inclusive in include/cc/cc_handler.h: state they define the scan interval, the boolean flags control endpoint inclusion, the pointer overload uses nullptr for unbounded ranges, and logical bounds are in scan order (ScanDirection controls traversal); note callers may invert inclusivity when chaining slices (see src/tx_execution.cpp calls that pass !inclusive).
  • Verified overrides/impls: LocalCcHandler declares/implements both overloads and matches the new signatures (include/cc/local_cc_handler.h, src/cc/local_cc_handler.cpp). Remote exposes equivalent pointer-based APIs (include/remote/remote_cc_handler.h, src/remote/remote_cc_handler.cpp) and LocalCcHandler forwards to remote_hd_.ScanNext (src/cc/local_cc_handler.cpp) — ensure forwarding preserves parameter ordering/flags.
  • Action: Add the docs to include/cc/cc_handler.h (and mirror in include/remote/remote_cc_handler.h), then run a grep over ScanNextBatch/ScanNext call sites (e.g., src/tx_execution.cpp) to confirm callers follow the documented semantics.
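
A minimal sketch of the range semantics the requested docs would describe, assuming keys compare like std::string and that a null pointer means the bound is absent. InScanRange is a hypothetical helper written for illustration; it is not part of cc_handler.h.

```cpp
#include <cassert>
#include <string>

// Returns true if key lies inside the scan interval.
// A null start_key or end_key means that side is unbounded; the inclusive
// flags decide whether a key equal to the bound is part of the interval.
bool InScanRange(const std::string &key,
                 const std::string *start_key,
                 bool start_inclusive,
                 const std::string *end_key,
                 bool end_inclusive)
{
    if (start_key != nullptr)
    {
        int cmp = key.compare(*start_key);
        if (cmp < 0 || (cmp == 0 && !start_inclusive))
        {
            return false;
        }
    }
    if (end_key != nullptr)
    {
        int cmp = key.compare(*end_key);
        if (cmp > 0 || (cmp == 0 && !end_inclusive))
        {
            return false;
        }
    }
    return true;
}
```

Under this reading, a caller that resumes from the last key it already received would pass that key as start_key with start_inclusive set to false, which matches the inverted-inclusivity calls noted for src/tx_execution.cpp.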
include/cc/template_cc_map.h (4)

2982-2996: Deleted rows are always kept; verify intent.

keep_deleted is hardcoded true in AddScanTuple, which can surface tombstones to clients. Confirm this matches API semantics for ScanNextBatch.


2727-2730: Term-mismatch now returns NG_TERM_CHANGED; confirm callers handle it.

Upstream paths previously checking NOT_LEADER may now miss this code path.


12149-12152: Bounds-check bucket index when marking progress.

Ensure scan_buckets_ is sized for bucket_id to avoid OOB writes during backfill.


12113-12120:

```shell
#!/bin/bash
set -euo pipefail

echo "PWD: $(pwd)"

files=(
  "include/cc/scan.h"
  "include/tx_key.h"
  "include/catalog_key_record.h"
  "include/eloq_string_key_record.h"
  "include/range_bucket_key_record.h"
  "include/cc/template_cc_map.h"
  "include/cc/ccm_scanner.h"
)

for f in "${files[@]}"; do
  if [ -f "$f" ]; then
    echo
    echo "---- $f ----"
    sed -n '1,300p' "$f" || true
  else
    echo
    echo "---- $f NOT FOUND ----"
  fi
done

echo
echo "---- rg SetPackedKey matches ----"
rg -n --hidden -S "SetPackedKey\(" || true

echo
echo "---- Show SetPackedKey functions (context) ----"
for f in $(rg -n --hidden -S "SetPackedKey\(" --files || true); do
  echo
  echo "== $f =="
  rg -n -C3 "SetPackedKey\(" "$f" || true
  sed -n '1,240p' "$f" || true
done

echo
echo "---- TemplateScanTuple definition (from include/cc/scan.h) ----"
rg -n --hidden -n -C3 "struct TemplateScanTuple" include/cc/scan.h || true
sed -n '1,240p' include/cc/scan.h || true

echo
echo "Done."
```

src/tx_execution.cpp (3)

2529-2566: Bucket-plan generation and scanner state initialization look good.

Setting Blocked and building per-NG/core bucket groups is clean, and scans_.try_emplace now persisting [start,end] with inclusivity is a welcome improvement. No issues spotted.

Please confirm total_range_buckets and ScanState::max_bucket_count_per_core are bounded and stable across reconfig to avoid plan churn mid-scan.

Also applies to: 2610-2615


2977-3061: Hash scan tuple processing and read-set integration: LGTM.

scanner.Init(), lock-type deduction, Unknown -> read_ts=0 handling, and OCC abort path are sound.


3571-3585: Verify MemoryShardCache* release semantics in ScanClose.

Switching to MemoryShardCacheLastTuples/TrailingTuples is fine; please confirm:

  • They cover all channels and only include tuples that actually acquired locks.
  • Dedup vs. rw_set_ remains correct to avoid double PostRead.

Also applies to: 3621-3630

include/cc/cc_request.h (2)

1567-1575: Centralize scan blocking enum looks good

Using a shared ScanBlockingType across requests removes duplication and aligns semantics.


1872-1881: New cache accessors are fine

GetLocalMemoryCache/GetLocalKvCache match the new scanner API surface.

include/cc/ccm_scanner.h (8)

31-31: Includes updated appropriately

Adding absl flat_hash_map, sharder, type headers is appropriate for the new APIs.

Also applies to: 34-34, 37-37


51-52: Enum trailing comma: fine

Harmless formatting tweak; keeps diffs clean for future additions.


162-168: Nice cache-trimming helpers

RemoveLast(size_t) and RemoveLast(const KeyT&) simplify tail pruning and dedupe workflows.

Also applies to: 329-341


369-433: Base scanner API expansion is coherent

KvCache, Merge, Init, ShardCount, CommitAtCore/FinalizeCommit setters align with the new per-shard/per-bucket model.


726-733: DecodeKey override: LGTM

Deserializes via key_schema_ correctly.


734-757: Iteration state is clear; ensure Init() happens after all merges

Init/Current/MoveNext traverse index_chains_. Guarantee they run after all per-core Merge() calls to avoid racing on partially built chains.

Would you confirm Init() is invoked only on/after the last finisher (e.g., gated by unfinished_core_cnt_ == 0)?

Also applies to: 758-779, 781-811


1008-1011: Init() no-op for range scanner: OK

RangePartitionedCcmScanner keeps existing lifecycle; no issues.


1145-1149: Return type change to uint32_t is fine

ShardCount() now matches other scanners’ signatures.

Comment thread include/cc/ccm_scanner.h Outdated
Comment thread include/cc/template_cc_map.h Outdated
Comment thread include/tx_operation.h
Comment thread include/tx_worker_pool.h Outdated
Comment thread src/cc/cc_shard.cpp
Comment on lines +1609 to 1644
ScanNextBatchCc *scan_next_batch_cc =
static_cast<ScanNextBatchCc *>(requester);
std::vector<FetchBucketDataCc *> requests;
requests.reserve(bucket_ids.size());
for (const auto &[bucket, kv_is_drained] : bucket_ids)
{
if (kv_is_drained)
{
continue;
}

// increase counter
scan_next_batch_cc->IncreaseWaitForFetchBucketCnt(core_id_);
FetchBucketDataCc *fetch_bucket_data_cc =
fetch_bucket_data_cc_pool_.NextRequest();
fetch_bucket_data_cc->Reset(table_name,
table_schema,
node_group_id,
node_group_term,
ccs,
bucket,
std::move(start_key),
start_key_inclusive,
batch_size,
requester,
backfill_func);
requests.push_back(fetch_bucket_data_cc);
}

if (requests.size() > 0)
{
return local_shards_.store_hd_->FetchBucketData(requests);
}

return store::DataStoreHandler::DataStoreOpStatus::Success;
}

@coderabbitai coderabbitai Bot Sep 17, 2025

⚠️ Potential issue

start_key moved repeatedly across buckets; later requests see a moved-from key

start_key is std::move’d for the first bucket and then reused, yielding empty keys for subsequent requests. Clone/copy per-bucket.

Apply this diff:

-        fetch_bucket_data_cc->Reset(table_name,
+        fetch_bucket_data_cc->Reset(table_name,
                                     table_schema,
                                     node_group_id,
                                     node_group_term,
                                     ccs,
                                     bucket,
-                                    std::move(start_key),
+                                    start_key.Clone(),
                                     start_key_inclusive,
                                     batch_size,
                                     requester,
                                     backfill_func);
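
A minimal, self-contained sketch of the per-bucket copy pattern, using std::string as a stand-in for the key type. FakeFetchRequest and BuildRequests are hypothetical names for illustration only; the real code would call start_key.Clone() on its own key type as in the diff above.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a pooled fetch request (not FetchBucketDataCc).
struct FakeFetchRequest
{
    int bucket_id;
    std::string start_key;
};

// Builds one request per bucket. Each iteration makes a fresh copy of
// start_key and moves only that copy, so the caller's key is never consumed.
std::vector<FakeFetchRequest> BuildRequests(const std::vector<int> &buckets,
                                            const std::string &start_key)
{
    std::vector<FakeFetchRequest> requests;
    requests.reserve(buckets.size());
    for (int bucket : buckets)
    {
        std::string key_copy = start_key;  // plays the role of Clone()
        requests.push_back({bucket, std::move(key_copy)});
    }
    return requests;
}
```

Taking the key by const reference also makes an accidental std::move of the original a no-op copy rather than a silent steal, which is one way to guard the loop against this class of bug.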
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
ScanNextBatchCc *scan_next_batch_cc =
static_cast<ScanNextBatchCc *>(requester);
std::vector<FetchBucketDataCc *> requests;
requests.reserve(bucket_ids.size());
for (const auto &[bucket, kv_is_drained] : bucket_ids)
{
if (kv_is_drained)
{
continue;
}
// increase counter
scan_next_batch_cc->IncreaseWaitForFetchBucketCnt(core_id_);
FetchBucketDataCc *fetch_bucket_data_cc =
fetch_bucket_data_cc_pool_.NextRequest();
fetch_bucket_data_cc->Reset(table_name,
table_schema,
node_group_id,
node_group_term,
ccs,
bucket,
start_key.Clone(),
start_key_inclusive,
batch_size,
requester,
backfill_func);
requests.push_back(fetch_bucket_data_cc);
}
if (requests.size() > 0)
{
return local_shards_.store_hd_->FetchBucketData(requests);
}
return store::DataStoreHandler::DataStoreOpStatus::Success;
}
🤖 Prompt for AI Agents
In src/cc/cc_shard.cpp around lines 1609 to 1644, start_key is std::move'd into
Reset for the first bucket and then reused for subsequent buckets leaving later
starts empty; fix by making a per-iteration copy of start_key and move that copy
into FetchBucketDataCc::Reset (or simply pass the start_key by value instead of
std::move'ing the original), ensuring each FetchBucketDataCc gets its own copy
of the key rather than consuming the original.

⚠️ Potential issue

Handle DataStore Retry to avoid stuck wait counters and pooled-request leaks

If FetchBucketData returns Retry, the per-bucket wait counters remain incremented and requests are never freed. Roll back counters and free requests on Retry.

Apply this diff:

-    if (requests.size() > 0)
-    {
-        return local_shards_.store_hd_->FetchBucketData(requests);
-    }
+    if (!requests.empty())
+    {
+        auto status = local_shards_.store_hd_->FetchBucketData(requests);
+        if (status == store::DataStoreHandler::DataStoreOpStatus::Retry)
+        {
+            for (auto *r : requests)
+            {
+                scan_next_batch_cc->DecreaseWaitForFetchBucketCnt(core_id_);
+                r->Free();
+            }
+        }
+        return status;
+    }
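
The rollback idea can be shown in isolation with a small sketch. Everything here is a hypothetical stand-in (OpStatus, WaitCounter, PooledRequest, SubmitBatch); it does not use the real DataStoreHandler or request pool, and it assumes the store hands back the whole batch untouched when it answers Retry.

```cpp
#include <cassert>
#include <vector>

// Hypothetical stand-ins for the status enum, wait counter, and pooled request.
enum class OpStatus { Success, Retry, Error };

struct WaitCounter
{
    int pending;  // number of outstanding fetches the requester waits for
};

struct PooledRequest
{
    bool in_use;  // true while the request is checked out of its pool
};

// Submits the batch through submit(). On Retry, undoes one counter increment
// per request and marks each request as returned to its pool.
template <typename SubmitFn>
OpStatus SubmitBatch(std::vector<PooledRequest *> &requests,
                     WaitCounter &counter,
                     SubmitFn submit)
{
    if (requests.empty())
    {
        return OpStatus::Success;
    }
    OpStatus status = submit(requests);
    if (status == OpStatus::Retry)
    {
        for (PooledRequest *r : requests)
        {
            --counter.pending;
            r->in_use = false;
        }
        requests.clear();
    }
    return status;
}
```

The sketch only covers the Retry path; whether Error needs the same rollback depends on who owns the requests after a failed submission, which the review does not state.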
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
ScanNextBatchCc *scan_next_batch_cc =
static_cast<ScanNextBatchCc *>(requester);
std::vector<FetchBucketDataCc *> requests;
requests.reserve(bucket_ids.size());
for (const auto &[bucket, kv_is_drained] : bucket_ids)
{
if (kv_is_drained)
{
continue;
}
// increase counter
scan_next_batch_cc->IncreaseWaitForFetchBucketCnt(core_id_);
FetchBucketDataCc *fetch_bucket_data_cc =
fetch_bucket_data_cc_pool_.NextRequest();
fetch_bucket_data_cc->Reset(table_name,
table_schema,
node_group_id,
node_group_term,
ccs,
bucket,
std::move(start_key),
start_key_inclusive,
batch_size,
requester,
backfill_func);
requests.push_back(fetch_bucket_data_cc);
}
if (!requests.empty())
{
auto status = local_shards_.store_hd_->FetchBucketData(requests);
if (status == store::DataStoreHandler::DataStoreOpStatus::Retry)
{
for (auto *r : requests)
{
scan_next_batch_cc->DecreaseWaitForFetchBucketCnt(core_id_);
r->Free();
}
}
return status;
}
return store::DataStoreHandler::DataStoreOpStatus::Success;
}
🤖 Prompt for AI Agents
In src/cc/cc_shard.cpp around lines 1609 to 1644, the code currently calls
local_shards_.store_hd_->FetchBucketData(requests) and returns immediately; if
FetchBucketData returns Retry the per-bucket wait counters remain incremented
and the pooled requests are leaked. Change the code to capture the returned
DataStoreOpStatus; if it equals Retry, iterate over the requests and for each
call the pool's FreeRequest(request) and call
scan_next_batch_cc->DecreaseWaitForFetchBucketCnt(core_id_) to roll back the
earlier IncreaseWaitForFetchBucketCnt, then return Retry; otherwise return the
original status (or Success) as before.

Contributor

double check if DataStoreOpStatus::Retry is used anywhere. If not, remove it to avoid confusion

🧩 Analysis chain

@liunyl, let me check if DataStoreOpStatus::Retry is actually used anywhere in the codebase.


🏁 Script executed:

#!/bin/bash
# Search for DataStoreOpStatus enum definition
echo "=== DataStoreOpStatus enum definition ==="
rg -n "enum.*DataStoreOpStatus" --type=cpp --type=h

echo -e "\n=== All DataStoreOpStatus::Retry usages ==="
rg -n "DataStoreOpStatus::Retry" --type=cpp --type=h

echo -e "\n=== DataStoreHandler::DataStoreOpStatus::Retry usages ==="  
rg -n "DataStoreHandler::DataStoreOpStatus::Retry" --type=cpp --type=h

echo -e "\n=== Any Retry enum value in DataStore context ==="
rg -n -A3 -B3 "Retry" --type=cpp --type=h -g "*data_store*"

echo -e "\n=== Check return statements with Retry in DataStore files ==="
rg -n "return.*Retry" --type=cpp --type=h -g "*data_store*"

Length of output: 2115


🏁 Script executed:

#!/bin/bash
# Let's look at the specific enum definition and see the full context
echo "=== Full DataStoreOpStatus enum definition ==="
rg -n -A5 -B2 "enum struct DataStoreOpStatus" include/store/data_store_handler.h

echo -e "\n=== Check all functions that return DataStoreOpStatus::Retry ==="
rg -n -A3 -B3 "DataStoreOpStatus::Retry" --type=cpp --type=h

echo -e "\n=== Check if any DataStore methods actually return Retry ==="
rg -n "return.*DataStoreOpStatus::Retry" --type=cpp --type=h

echo -e "\n=== Look for methods in DataStoreHandler that might return Retry ==="
ast-grep --pattern 'return $$$DataStoreOpStatus::Retry$$$'

Length of output: 4151


🏁 Script executed:

#!/bin/bash
# Let's check what DataStore methods are defined and if any actually return Retry
echo "=== DataStoreHandler virtual methods that return DataStoreOpStatus ==="
rg -n -A10 "virtual.*DataStoreOpStatus" include/store/data_store_handler.h

echo -e "\n=== Check implementations of DataStore methods ==="
fd "data_store" --type=f --extension=cpp --exec rg -l "DataStoreOpStatus::Retry" {}

echo -e "\n=== Look for actual DataStore implementations ==="
rg -n "class.*DataStoreHandler" --type=cpp --type=h

echo -e "\n=== Check if any actual DataStore implementation returns Retry ==="
rg -n -B5 -A5 "return.*Retry" --type=cpp --type=h -g "*data_store*" -g "*store*"

Length of output: 1944


🏁 Script executed:

#!/bin/bash
# Let's check the actual DataStore implementations to see if they ever return Retry
echo "=== Check IntMemoryStore implementation ==="
rg -n -A5 -B5 "DataStoreOpStatus" include/store/int_mem_store.h src/store/int_mem_store.cpp

echo -e "\n=== Look for other DataStore implementation files ==="
fd -e cpp -e h | xargs grep -l "class.*DataStoreHandler\|: public DataStoreHandler\|: DataStoreHandler"

echo -e "\n=== Check if any concrete implementations return Retry ==="
rg -n "return DataStoreOpStatus::Retry\|return.*Retry" --type=cpp --type=h

echo -e "\n=== Check what the default implementations return ==="
rg -n -A3 -B1 "return DataStoreOpStatus::" include/store/data_store_handler.h

Length of output: 995


🏁 Script executed:

#!/bin/bash
# Let's search for all files that might contain DataStore implementations
echo "=== Find all DataStore related files ==="
find . -name "*.cpp" -o -name "*.h" | xargs grep -l "DataStoreOpStatus" | head -10

echo -e "\n=== Check if any real implementation actually returns Retry ==="
rg -n "return.*Success\|return.*Error\|return.*Retry" --type=cpp --type=h | grep -E "(DataStore|FetchRecord|FetchBucket|LoadRange)"

echo -e "\n=== Look at the usage patterns - how is Retry actually used ==="
rg -n -B3 -A3 "== DataStoreOpStatus::Retry\|case.*Retry:" --type=cpp --type=h

echo -e "\n=== Check summary: where Retry appears ==="
echo "Files that reference DataStoreOpStatus::Retry:"
rg -l "DataStoreOpStatus::Retry" --type=cpp --type=h

Length of output: 528


@liunyl, I've checked the codebase for DataStoreOpStatus::Retry usage. Here's what I found:

Current Usage:

  • Defined: In include/store/data_store_handler.h as part of the DataStoreOpStatus enum
  • Used in 4 files: src/cc/cc_shard.cpp, include/cc/object_cc_map.h, include/cc/template_cc_map.h, and src/cc/range_slice.cpp

Key Findings:

  1. No actual DataStore implementations return Retry - The virtual methods in DataStoreHandler only return Success or Error by default
  2. Only returns of Retry are in client code - Specifically in src/cc/cc_shard.cpp lines 1511 and 1549, where it's returned when channel establishment fails (not a DataStore operation failure)
  3. Used for handling transient failures - The case statements in range_slice.cpp and checks in template files suggest it's meant for retryable operations

Conclusion:
DataStoreOpStatus::Retry appears to be dead code in terms of actual DataStore implementations, but it is used in client-side logic for handling network/channel failures. The two returns of Retry in cc_shard.cpp happen when GetCcNodeServiceChannel() fails, which is a network issue, not a storage issue.

Recommendation:
You can safely remove DataStoreOpStatus::Retry if you:

  1. Change the two returns in cc_shard.cpp to return Error instead
  2. Remove the case statements handling Retry in the other files
  3. Update the enum to only have Success = 0, Error

This would simplify the API and eliminate the confusion around when Retry should be used.

Comment thread src/cc/local_cc_handler.cpp
Comment thread src/tx_worker_pool.cpp Outdated
Comment thread include/cc/cc_req_misc.h Outdated
@lokax lokax force-pushed the scan-next-batch-cc branch 5 times, most recently from c1012f8 to 5837e90 Compare September 23, 2025 10:00
@liunyl

liunyl commented Sep 23, 2025

Contributor

@coderabbitai review

@coderabbitai

coderabbitai Bot commented Sep 23, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@lokax lokax force-pushed the scan-next-batch-cc branch from 5837e90 to 927a681 Compare September 23, 2025 10:05
Comment thread include/cc/scan.h
@lokax lokax linked an issue Sep 23, 2025 that may be closed by this pull request
@lokax lokax merged commit 7ec291f into hash-part-scan Sep 26, 2025
4 checks passed
@coderabbitai coderabbitai Bot mentioned this pull request Oct 20, 2025
5 tasks

Development

Successfully merging this pull request may close these issues.

  • Improve hash partition scan performance
  • [Feature]: Refactor ScanNextBatchCc
  • [Feature]: Refactor CcScanner and ScanCache

2 participants