add compaction queue, fix CkptTsCc hang bug #483

Merged
thweetkomputer merged 4 commits into main from update-zc on May 25, 2026

Conversation

thweetkomputer (Collaborator) commented on May 22, 2026

Here are some reminders before you submit the pull request

  • Add tests for the change
  • Document changes
  • Reference the link of issue using fixes eloqdb/tx_service#issue_id
  • Reference the link of RFC if exists
  • Pass ./mtr --suite=mono_main,mono_multi,mono_basic

Summary by CodeRabbit

  • New Features
    • Retrieve approximate key counts from the data store for operational monitoring.
    • Trigger manual data store compaction to optimize storage and reclaim space.
  • Refactor
    • Background compaction worker added for asynchronous compaction and improved reliability.
    • Checkpoint completion wait mechanism made more robust for better synchronization.

coderabbitai (Bot) commented on May 22, 2026

Caution

Review failed

Pull request was closed or merged during review

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 10b0bda8-8c21-4c47-a6af-5270007990d7

📥 Commits

Reviewing files that changed from the base of the PR and between 86a460e and fe167b1.

📒 Files selected for processing (1)
  • tx_service/include/cc/cc_request.h

Walkthrough

Adds distributed approximate key-count and compaction APIs (contracts → proto → service → RocksDB → client aggregation) and refactors CkptTsCc synchronization to use an atomic completion counter with a polling Wait().

Changes

Store Key Count and Compaction

| Layer | File(s) | Summary |
| --- | --- | --- |
| DataStoreHandler base contracts | tx_service/include/store/data_store_handler.h | DataStoreHandler adds virtual ApproxStoreKeyCount() and CompactStore() with default stubs. |
| DataStore abstract base | store_handler/eloq_data_store_service/data_store.h | DataStore adds the same two virtual methods with inline defaults returning 0 and false. |
| Proto RPC service and messages | store_handler/eloq_data_store_service/ds_request.proto | DataStoreRpcService defines GetApproxStoreKeyCount and CompactStore RPCs plus request/response messages (shard_id, CommonResult, key_count). |
| DataStoreService RPC handlers | store_handler/eloq_data_store_service/data_store_service.h, .../data_store_service.cpp | Implements GetApproxStoreKeyCount and CompactStore (RPC and shard helpers), validating ownership and shard-open state before delegating to DataStore. |
| RocksDB compaction implementation | store_handler/eloq_data_store_service/rocksdb_data_store_common.h, .../rocksdb_data_store_common.cpp | Adds compact_worker_pool_ and compact_running_; implements ApproxStoreKeyCount() via rocksdb property and CompactStore() submitting async CompactRange() guarded by CAS. |
| DataStoreServiceClient aggregation | store_handler/data_store_service_client.h, .../data_store_service_client.cpp | Aggregates key counts and compaction across local and remote shards, issuing remote RPCs with 60s timeouts and combining results. |
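
The CAS guard described for CompactStore() can be illustrated with a minimal, self-contained sketch. It assumes a single background thread instead of the PR's compact_worker_pool_, and the class name `CompactionGuard` and its methods are hypothetical stand-ins, not the code in rocksdb_data_store_common.cpp. The point is only that one caller wins the false-to-true transition and every other caller is rejected until the running job resets the flag.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical illustration of a CAS-guarded, single-flight background job.
class CompactionGuard
{
public:
    // Returns true if the job was started, false if one is already running.
    bool TryCompact(std::function<void()> compact_fn)
    {
        bool expected = false;
        // Only the caller that flips false -> true may submit work.
        if (!running_.compare_exchange_strong(
                expected, true, std::memory_order_acq_rel))
        {
            return false;
        }
        std::lock_guard<std::mutex> lk(worker_mu_);
        if (worker_.joinable())
        {
            worker_.join();  // reap the previous run, which has finished
        }
        worker_ = std::thread(
            [this, fn = std::move(compact_fn)]
            {
                fn();  // stands in for the actual compaction call
                running_.store(false, std::memory_order_release);
            });
        return true;
    }

    bool Running() const
    {
        return running_.load(std::memory_order_acquire);
    }

    ~CompactionGuard()
    {
        std::lock_guard<std::mutex> lk(worker_mu_);
        if (worker_.joinable())
        {
            worker_.join();
        }
    }

private:
    std::atomic<bool> running_{false};
    std::mutex worker_mu_;  // protects the thread handle only
    std::thread worker_;
};
```

A second TryCompact() issued while the first job is still running returns false immediately rather than queueing, which matches the "guarded by CAS" wording above; whether the real implementation reports that case as success or failure is not stated on this page.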

CkptTsCc Checkpoint Synchronization Refactor

| Layer | File(s) | Summary |
| --- | --- | --- |
| Atomic counter and storage refactor | tx_service/include/cc/cc_request.h | Replace mutex/cv with std::atomic<size_t> unfinish_cnt_, change heap_full_vec_ to std::vector<int8_t>, and use atomic fetch_sub in Execute(). |
| Wait() polling implementation | tx_service/src/cc/cc_request.cpp | Implement CkptTsCc::Wait() to poll unfinish_cnt_ with exponential-backoff sleeps until zero. |
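
As a rough sketch of the counter-plus-polling pattern summarized above: the class below is a hypothetical miniature, not CkptTsCc itself. It assumes std::this_thread::sleep_for for the backoff (a later commit message on this page mentions bthread_usleep for the real code), and the 10 µs starting interval and 1 ms cap are arbitrary illustrative values.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <thread>

// Hypothetical miniature of an atomic completion counter with a polling
// waiter. Names and intervals are assumptions for illustration.
class CompletionCounter
{
public:
    explicit CompletionCounter(std::size_t shard_cnt) : unfinished_(shard_cnt)
    {
    }

    // Called once by each shard after it has written its results.
    void Finish()
    {
        unfinished_.fetch_sub(1, std::memory_order_acq_rel);
    }

    // Polls until every shard has finished, doubling the sleep up to a cap.
    void Wait() const
    {
        std::chrono::microseconds interval{10};
        const std::chrono::microseconds max_interval{1000};
        while (unfinished_.load(std::memory_order_acquire) != 0)
        {
            std::this_thread::sleep_for(interval);
            interval = std::min(interval * 2, max_interval);
        }
    }

private:
    std::atomic<std::size_t> unfinished_;
};
```

The waiter never blocks on a mutex or condition variable, so there is no wake-up that could be routed to a thread that cannot run; the cost is some polling latency bounded by the cap.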

Sequence Diagram

sequenceDiagram
  participant Client as DataStoreServiceClient
  participant Service as DataStoreService
  participant RocksDB as RocksDBDataStoreCommon
  participant Pool as WorkerPool
  Client->>Service: Request per-shard operation
  Service->>RocksDB: Call ApproxStoreKeyCount or CompactStore
  RocksDB->>RocksDB: Check DB open / read property or CAS compact_running
  RocksDB->>Pool: Submit async compaction task
  Pool->>RocksDB: Run db.CompactRange and reset compact_running
  Service-->>Client: Return result / CommonResult

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Suggested reviewers

  • liunyl

Poem

🐰 I hopped through shards both near and far,
Counting keys beneath each data star,
I nudged the rocks to tidy their rooms,
And swapped the locks for atomic booms —
Now compaction hums and checkpoints bloom!

🚥 Pre-merge checks | ✅ 3 | ❌ 2

❌ Failed checks (2 warnings)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Description check | ⚠️ Warning | The PR description contains only a submission checklist with no actual implementation details, rationale, or description of changes. All checkboxes are unchecked, indicating none of the required documentation steps have been completed. | Fill in the description with implementation details explaining the compaction queue feature, CkptTsCc hang fix, affected components, and testing status. Check off or complete the checklist items. |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 2.86% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |
✅ Passed checks (3 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title partially relates to the changeset. It mentions 'CkptTsCc hang bug' (addressed in cc_request.cpp/h) and 'compaction queue', but the changes show two distinct feature additions: store compaction/key counting APIs across multiple files, and a CkptTsCc synchronization refactoring. The title emphasizes the bug fix but under-represents the primary scope of compaction feature additions. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |


@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@store_handler/data_store_service_client.cpp`:
- Around line 618-621: The code currently treats remote failures (checking
cntl.Failed() || resp.result().error_code() != remote::DataStoreError::NO_ERROR)
as an immediate continue/skip which ignores transient owner-change errors;
update the handling in the blocks around these checks (the if on cntl.Failed() and
the similar block near line 660) to call HandleShardingError(cntl, resp,
/*context info as needed*/) and use retry_limit_ to retry the request when
HandleShardingError indicates a shard-map/owner-change transient error, only
falling back to continue/fail after retries are exhausted or when
HandleShardingError deems the error permanent; ensure you reference the same
control objects (cntl, resp) and respect retry_limit_ and existing retry/backoff
semantics so behavior matches other callers that already use
HandleShardingError.

In `@tx_service/include/cc/cc_request.h`:
- Around line 3033-3040: The decrement of unfinish_cnt_ currently uses
fetch_sub(..., std::memory_order_release) which only synchronizes with the final
decrement and can leave earlier shard writes (memory_allocated_vec_,
memory_committed_vec_, heap_full_vec_, total_key_cnt_vec_, dirty_key_cnt_vec_,
standby_msg_seq_id_vec_, subscribed_node_ids_) not visible to the waiter; update
the fetch_sub call on unfinish_cnt_ to use std::memory_order_acq_rel (or
otherwise establish an acquire-release barrier) so each shard's writes performed
before fetch_sub are released and become visible to the thread that does the
corresponding acquire (e.g., the waiter in ckpt_req.Wait()); keep the decrement
on the same unfinish_cnt_ variable and use ccs.LocalCoreId() references
unchanged.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 06e0baf9-af54-4556-bd13-a66f7618f34f

📥 Commits

Reviewing files that changed from the base of the PR and between 64554e4 and 86a460e.

📒 Files selected for processing (11)
  • store_handler/data_store_service_client.cpp
  • store_handler/data_store_service_client.h
  • store_handler/eloq_data_store_service/data_store.h
  • store_handler/eloq_data_store_service/data_store_service.cpp
  • store_handler/eloq_data_store_service/data_store_service.h
  • store_handler/eloq_data_store_service/ds_request.proto
  • store_handler/eloq_data_store_service/rocksdb_data_store_common.cpp
  • store_handler/eloq_data_store_service/rocksdb_data_store_common.h
  • tx_service/include/cc/cc_request.h
  • tx_service/include/store/data_store_handler.h
  • tx_service/src/cc/cc_request.cpp

Comment on lines +618 to +621
if (cntl.Failed() ||
    resp.result().error_code() != remote::DataStoreError::NO_ERROR)
{
    continue;

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Handle owner-change errors with retry before skipping/failing.

At Line 618 and Line 660, remote failures are handled as immediate skip/fail. During topology changes (REQUESTED_NODE_NOT_OWNER), this can produce partial key counts or false compaction failure even though a retry after shard-map refresh would succeed. Reuse HandleShardingError(...) and retry_limit_ here.

🔧 Suggested fix sketch
 for (uint32_t shard_id : shard_ids)
 {
+    bool shard_ok = false;
+    for (int retry = 0; retry <= retry_limit_; ++retry)
+    {
         uint32_t node_index = GetOwnerNodeIndexOfShard(shard_id);
         auto *channel = dss_nodes_[node_index].Channel();
         if (channel == nullptr)
         {
-            continue;
+            break;
         }

         remote::DataStoreRpcService_Stub stub(channel);
         brpc::Controller cntl;
         cntl.set_timeout_ms(60000);

         // ... issue RPC ...

-        if (cntl.Failed() ||
-            resp.result().error_code() != remote::DataStoreError::NO_ERROR)
-        {
-            continue; // or success = false
-        }
+        if (!cntl.Failed() &&
+            resp.result().error_code() == remote::DataStoreError::NO_ERROR)
+        {
+            // apply resp payload
+            shard_ok = true;
+            break;
+        }
+        if (!cntl.Failed() &&
+            resp.result().error_code() ==
+                remote::DataStoreError::REQUESTED_NODE_NOT_OWNER)
+        {
+            HandleShardingError(resp.result());
+        }
+    }
+    if (!shard_ok)
+    {
+        // ApproxStoreKeyCount: log partial-result warning
+        // CompactStore: success = false
+    }
 }

Also applies to: 660-663
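
The retry idea in the fix sketch can also be shown as a compilable toy, detached from brpc. Everything here is hypothetical: `RpcError`, `RpcResult`, and `CallWithOwnerRetry` are illustration-only names, `refresh` merely stands in for HandleShardingError(...), whose real signature does not appear on this page, and treating any error other than not-owner as permanent is an assumption.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>

// Hypothetical error codes; the real ones live in remote::DataStoreError.
enum class RpcError { kNoError, kNotOwner, kOther };

// Hypothetical flattened view of one RPC attempt.
struct RpcResult
{
    bool transport_failed;
    RpcError error;
    std::uint64_t key_count;
};

// Tries `send` up to retry_limit + 1 times. On a not-owner reply it calls
// `refresh` (a stand-in for refreshing the shard map) and retries.
// Returns true and fills *key_count on success.
inline bool CallWithOwnerRetry(int retry_limit,
                               const std::function<RpcResult()> &send,
                               const std::function<void()> &refresh,
                               std::uint64_t *key_count)
{
    for (int attempt = 0; attempt <= retry_limit; ++attempt)
    {
        RpcResult r = send();
        if (!r.transport_failed && r.error == RpcError::kNoError)
        {
            *key_count = r.key_count;
            return true;
        }
        if (!r.transport_failed && r.error == RpcError::kNotOwner)
        {
            refresh();  // transient owner change: refresh, then retry
            continue;
        }
        if (!r.transport_failed)
        {
            return false;  // assumed permanent: do not retry
        }
        // Transport failure: fall through and retry.
    }
    return false;  // retries exhausted
}
```

A caller aggregating key counts would add `*key_count` to its total on true and log a partial-result warning on false, as the comments in the fix sketch suggest.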

Comment thread tx_service/include/cc/cc_request.h Outdated
@thweetkomputer thweetkomputer merged commit f25308c into main May 25, 2026
3 of 4 checks passed
@thweetkomputer thweetkomputer deleted the update-zc branch May 25, 2026 01:58
liunyl added a commit that referenced this pull request Jun 13, 2026
…aiters

Same bug class as the CkptTsCc hang fixed in #483: a bthread::Mutex /
ConditionVariable shared between CcRequest::Execute() (which runs on the
brpc worker main stack via the tx processor module) and a bthread waiter
can deadlock. When the worker main stack sleeps in the mutex's butex
queue as a pthread waiter, an unlock may route the wake to a bthread
bound to that same blocked worker; the bthread is parked in the worker's
bound run queue and can never be scheduled, so both sides hang forever.
Timeouts (wait_for) do not help because no timer remains pending in the
end state.

Convert the affected requests to the same pattern as the CkptTsCc fix:
shard-side completion is tracked with atomics and the waiter polls with
bthread_usleep exponential backoff.

- ActiveTxMaxTsCc, ClearCcNodeGroup, EscalateStandbyCcmCc, ClearTxCc:
  drop the mutex/cv, use an atomic counter (plus a done flag for
  ClearCcNodeGroup so the waiter returns only after the last core's
  cleanup).
- WaitableCc: blocking Wait() now polls; IsFinished/IsError/ErrorCode
  read atomics. mux_ is kept solely for the coroutine yield/resume
  handshake, whose waiter is a dedicated flush-worker thread, so bthread
  owners never touch the butex.
- DbSizeCc / RemoteDbSizeCc: atomic ref counts; Wait() keeps the
  give-up-on-remote-after-2s semantics. remote_ref_cnt_ is decremented
  before total_ref_cnt_ so the waiter never mistakes a pending local
  core for a missing remote response.
- UploadBatchCc: the external coordination state (req_mux/req_cv/
  finished_req_cnt/req_result/ng_term) becomes std::atomic references;
  ValidTermCheck uses a CAS instead of locking a mutex that lives on the
  owner bthread's stack. The CcNodeService::UploadBatch handler and the
  sk_generator upload path (SendIndexes, UploadIndexInternal, and the
  UploadBatchClosure callback, which runs on a bthread) are converted
  accordingly.

Requests with a single in-flight execution and a single waiter
(CheckTxStatusCc, WaitNoNakedBucketRefCc, UploadRangeSlicesCc,
UploadBatchSlicesCc) are structurally safe and left unchanged, as are
paths whose waiter is a dedicated std::thread (UpdateCceCkptTsCc, the
flush-data coroutine path).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
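
The "atomic counter plus a done flag" arrangement mentioned for ClearCcNodeGroup can be sketched as follows. This is an illustration under assumptions, not the project's code: `LastCoreCleanup` is a hypothetical name, the cleanup callback is a placeholder, and std::this_thread::sleep_for replaces bthread_usleep. The core that takes the counter from 1 to 0 runs the cleanup and only afterwards publishes the flag, so a waiter that polls the flag cannot return while cleanup is still in progress.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <functional>
#include <thread>
#include <utility>

// Hypothetical sketch: completion counter plus a separate done flag.
class LastCoreCleanup
{
public:
    LastCoreCleanup(std::size_t core_cnt, std::function<void()> cleanup)
        : unfinished_(core_cnt), cleanup_(std::move(cleanup))
    {
    }

    // Called once per core. fetch_sub returns the previous value, so the
    // caller that observes 1 is the last one to finish.
    void FinishCore()
    {
        if (unfinished_.fetch_sub(1, std::memory_order_acq_rel) == 1)
        {
            cleanup_();
            done_.store(true, std::memory_order_release);
        }
    }

    // Polls the flag rather than the counter, so it returns only after the
    // last core's cleanup has completed.
    void Wait() const
    {
        while (!done_.load(std::memory_order_acquire))
        {
            std::this_thread::sleep_for(std::chrono::microseconds(50));
        }
    }

private:
    std::atomic<std::size_t> unfinished_;
    std::atomic<bool> done_{false};
    std::function<void()> cleanup_;
};
```

Polling the counter alone would let the waiter observe zero in the window between the final decrement and the end of cleanup, which is the gap the extra flag closes.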
liunyl added a commit that referenced this pull request Jun 14, 2026
…aiters (#491)

* fix: avoid bthread mutex deadlock between tx processors and bthread waiters

* fix: address CodeRabbit review comments

- WaitableCc: latch the first error in AbortCcRequest with a CAS and
  stop storing NO_ERROR on every successful Execute, so a later
  success on another core can no longer erase an earlier failure.
- sk_generator: same latch-first-error contract for the shared
  res_code in SendIndexes (channel failure path and the
  UploadBatchClosure callback), mirroring UploadBatchCc::SetError.
- DbSizeCc: back total_obj_sizes_ with real std::atomic<int64_t>
  storage instead of reinterpret_cast on a std::vector<int64_t>
  (undefined behavior per the C++ atomic object model), and make
  term_ atomic since AddRemoteObjSize races IncTerm lock-free.
- Backoff loops: cap the polling interval at max_interval instead of
  stopping one doubling short of it (also fixes the pre-existing
  CkptTsCc::Wait).
- Fix "Waitting" typo in the dbsize timeout warning log.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
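
The "latch the first error with a CAS" contract from the bullets above can be sketched in a few lines. `ErrorLatch` and the `ErrCode` values are hypothetical names for illustration; the real WaitableCc and UploadBatchCc::SetError members are not shown on this page.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical error codes for illustration only.
enum class ErrCode : int { kNoError = 0, kTimeout = 1, kTermMismatch = 2 };

// First-error-wins latch: once an error is stored, later calls cannot
// overwrite or clear it.
class ErrorLatch
{
public:
    // Stores `err` only if no error has been latched yet.
    // Returns true if this call set the error.
    bool SetError(ErrCode err)
    {
        ErrCode expected = ErrCode::kNoError;
        return code_.compare_exchange_strong(expected,
                                             err,
                                             std::memory_order_acq_rel,
                                             std::memory_order_acquire);
    }

    ErrCode Get() const
    {
        return code_.load(std::memory_order_acquire);
    }

    bool IsError() const
    {
        return Get() != ErrCode::kNoError;
    }

private:
    std::atomic<ErrCode> code_{ErrCode::kNoError};
};
```

Success paths simply do not touch the latch, so a success on one core that finishes after a failure on another leaves the failure in place.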

* style: apply clang-format-18

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

* test: add WaitableCc completion-counter regression tests

Guards the atomic completion counter + first-error latching that replaced the
bthread::Mutex + ConditionVariable handshake in the waitable CC requests
(ClearTxCc / ActiveTxMaxTsCc / EscalateStandbyCcmCc / DbSizeCc /
ClearCcNodeGroup share the same atomic-counter + polling Wait()). WaitableCc is
the generic carrier of that pattern.

Covers: concurrent multi-core completion (Wait() returns, no lost decrement),
first-error-wins latching (a later success must not clear it), Reset re-arming,
and a repeated-rounds stress loop. A watchdog aborts with a clear message if
Wait() hangs (the deadlock the fix prevents).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
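
A watchdog of the kind the test description mentions might look like the sketch below. It is an assumption about the general shape, not the PR's test code: `FinishesWithin` is a hypothetical helper, and where a real regression test would log a message and abort on timeout, this version just detaches the stuck thread and returns false.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <future>
#include <memory>
#include <thread>
#include <utility>

// Hypothetical helper: runs `body` on another thread and reports whether it
// finished within `limit`.
inline bool FinishesWithin(std::function<void()> body,
                           std::chrono::milliseconds limit)
{
    // The promise is shared so it outlives this call if the worker is late.
    auto done = std::make_shared<std::promise<void>>();
    std::future<void> fut = done->get_future();
    std::thread worker(
        [done, fn = std::move(body)]
        {
            fn();
            done->set_value();
        });
    if (fut.wait_for(limit) == std::future_status::ready)
    {
        worker.join();
        return true;
    }
    // A real watchdog would print a clear message and call std::abort().
    worker.detach();
    return false;
}
```

Wrapping a blocking Wait() call in such a helper turns a silent hang into a bounded, reportable failure.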

---------

Co-authored-by: Claude Fable 5 <noreply@anthropic.com>