read range info parallelly and batch read catalog#418
Conversation
WalkthroughThis pull request extends FetchTableRanges from single-partition to multi-partition scanning with per-partition state management and parallel ScanNext invocations. It also introduces catalog batch read processing as a distinct code path in transaction execution with special handling for catalog metadata reads. Changes
Sequence DiagramsequenceDiagram
participant Client
participant FetchTableRanges
participant KVPartition1 as KV Partition 1
participant KVPartition2 as KV Partition 2
participant Callback as Aggregation Callback
Client->>FetchTableRanges: Initiate FetchTableRanges
FetchTableRanges->>FetchTableRanges: Compute total_partitions<br/>Init partition_scan_states_
FetchTableRanges->>FetchTableRanges: Set remaining_partitions_
par Parallel Partition Scans
FetchTableRanges->>KVPartition1: ScanNext(partition_id=0, keys, session)
FetchTableRanges->>KVPartition2: ScanNext(partition_id=1, keys, session)
end
KVPartition1->>Callback: Return ranges for partition 0
Callback->>Callback: Acquire finish_mux_<br/>Decrement remaining_partitions_
Callback->>Callback: AppendTableRanges(0, ranges)
KVPartition2->>Callback: Return ranges for partition 1
Callback->>Callback: Acquire finish_mux_<br/>Decrement remaining_partitions_<br/>Check remaining == 0
Callback->>Callback: Merge results<br/>Handle synthetic ranges
Callback->>Client: SetFinish(merged_result)
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
Tip Try Coding Plans. Let us write the prompt for your AI agent so you can ship faster (with fewer bugs). Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
tx_service/include/tx_request.h (1)
1346-1364:⚠️ Potential issue | 🟠 MajorReset
is_catalog_batch_inSet(...)to prevent stale request mode.At Line 1363 you reset
local_cache_checked_, butis_catalog_batch_is never reset/updated inSet(...). If this request object is reused, a previous catalog batch value can leak into later non-catalog reads.💡 Proposed fix
void Set(const TableName *tab_name, uint64_t schema_version, std::vector<ScanBatchTuple> &batch_read_pri, bool is_for_write = false, bool is_for_share = false, bool read_local = false, bool point_read_on_cache_miss = false, - uint64_t corresponding_sk_commit_ts = 0) + uint64_t corresponding_sk_commit_ts = 0, + bool is_catalog_batch = false) { tab_name_ = tab_name; read_batch_ = std::move(batch_read_pri); is_for_write_ = is_for_write; is_for_share_ = is_for_share; read_local_ = read_local; point_read_on_cache_miss_ = point_read_on_cache_miss; corresponding_sk_commit_ts_ = corresponding_sk_commit_ts; schema_version_ = schema_version; local_cache_checked_ = false; + is_catalog_batch_ = is_catalog_batch; }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tx_service/include/tx_request.h` around lines 1346 - 1364, The Set(...) method for the request object (function Set in tx_request.h) resets local_cache_checked_ but fails to reset is_catalog_batch_, allowing a previous catalog batch flag to leak when the object is reused; update Set(...) to explicitly reset or set is_catalog_batch_ (e.g., is_catalog_batch_ = false or set based on tab_name/schema) alongside the other member assignments so each call initializes is_catalog_batch_ correctly.
🧹 Nitpick comments (2)
store_handler/data_store_service_client.cpp (1)
979-1011: Consider bounding scan fan-out to avoid large RPC bursts.At Line 979, this dispatches one
ScanNextper partition immediately. If partition count is high, this can spike RPC pressure and callback contention. A bounded in-flight window (semaphore/worker queue) would keep latency more stable under load.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@store_handler/data_store_service_client.cpp` around lines 979 - 1011, The loop currently issues a ScanNext for every partition unbounded, causing large RPC fan-out; limit concurrent in-flight ScanNext calls by introducing a bounded window (e.g., semaphore or fixed-size worker queue) when iterating total_partitions and dispatching ScanNext on fetch_cc->partition_scan_states_. Use a concurrency cap constant (e.g., max_in_flight_scans) and acquire before calling ScanNext (for symbols: ScanNext, FetchTableRangesCallback, partition_scan_states_, GetShardIdByPartitionId, fetch_cc), and release when the corresponding FetchTableRangesCallback completes or via a future/promise callback so new partitions are dispatched only as slots free up to keep RPC bursts bounded.store_handler/data_store_service_client_closure.cpp (1)
881-894: Consider adding a clarifying comment for the synthetic default range.When all partitions complete with empty results, a synthetic default range is inserted with
kv_part_id=0. While functionally correct (theddl_skip_kv_comment helps), consider adding a brief comment explaining whykv_part_id=0is used as the bucket key for this placeholder.📝 Suggested documentation improvement
if (fetch_range_cc->EmptyRanges()) { // When ddl_skip_kv_ is enabled and the range entry is not // physically ready, initializes the original range from // negative infinity to positive infinity. std::vector<txservice::InitRangeEntry> default_ranges; default_ranges.emplace_back( catalog_factory->NegativeInfKey(), txservice::Sequences::InitialRangePartitionIdOf( fetch_range_cc->table_name_), 1); - // Use kv_part_id 0 for the synthetic default range bucket. + // Use kv_part_id 0 as the bucket key for the synthetic default + // range. This is arbitrary since there's only one default entry. fetch_range_cc->AppendTableRanges(0, std::move(default_ranges)); }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@store_handler/data_store_service_client_closure.cpp` around lines 881 - 894, Add a short clarifying comment immediately above the AppendTableRanges(0, ...) call in the if (fetch_range_cc->EmptyRanges()) block explaining that kv_part_id=0 is chosen as a sentinel/reserved bucket id for the synthetic default range (so it does not collide with real partition ids and ensures downstream bucket lookups treat this as a placeholder), and reference that this behavior is intentional when ddl_skip_kv_ is enabled and the original range is initialized from NegativeInfKey() and txservice::Sequences::InitialRangePartitionIdOf(fetch_range_cc->table_name_); keep the comment concise and colocated with the AppendTableRanges call.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@store_handler/data_store_service_client_closure.cpp`:
- Around line 860-861: Multiple concurrent callbacks can call
fetch_range_cc->AppendTableRanges with the same kv_part_id and race on
partition_ranges_vec_[kv_part_id]; make AppendTableRanges thread-safe by
synchronizing access to partition_ranges_vec_ (e.g., add a mutex for the whole
vector or a per-partition mutex) and acquire the lock inside AppendTableRanges
before pushing/mutating the inner vector, or alternatively switch
partition_ranges_vec_ to a concurrent container; update any callers
(ScanNext/ScanNextClosure paths) to rely on the synchronized AppendTableRanges
so concurrent ScanNext callbacks no longer race.
In `@tx_service/src/tx_execution.cpp`:
- Around line 7590-7597: The catalog branch accesses
batch_read_op.hd_result_vec_[i] without checking its size; add the same safety
used in the non-catalog path by guarding that
batch_read_op.hd_result_vec_.size() is at least N (or adjust the loop to iterate
to std::min(N, batch_read_op.hd_result_vec_.size())) before any indexed access
to hd_result_vec_, and handle the shortfall (e.g., skip catalog-specific logic
or log/return an error) so accessing hd_result_vec_[i] cannot go out-of-bounds;
update the code around read_batch, N, and the loop that uses hd_result_vec_ to
implement this check.
---
Outside diff comments:
In `@tx_service/include/tx_request.h`:
- Around line 1346-1364: The Set(...) method for the request object (function
Set in tx_request.h) resets local_cache_checked_ but fails to reset
is_catalog_batch_, allowing a previous catalog batch flag to leak when the
object is reused; update Set(...) to explicitly reset or set is_catalog_batch_
(e.g., is_catalog_batch_ = false or set based on tab_name/schema) alongside the
other member assignments so each call initializes is_catalog_batch_ correctly.
---
Nitpick comments:
In `@store_handler/data_store_service_client_closure.cpp`:
- Around line 881-894: Add a short clarifying comment immediately above the
AppendTableRanges(0, ...) call in the if (fetch_range_cc->EmptyRanges()) block
explaining that kv_part_id=0 is chosen as a sentinel/reserved bucket id for the
synthetic default range (so it does not collide with real partition ids and
ensures downstream bucket lookups treat this as a placeholder), and reference
that this behavior is intentional when ddl_skip_kv_ is enabled and the original
range is initialized from NegativeInfKey() and
txservice::Sequences::InitialRangePartitionIdOf(fetch_range_cc->table_name_);
keep the comment concise and colocated with the AppendTableRanges call.
In `@store_handler/data_store_service_client.cpp`:
- Around line 979-1011: The loop currently issues a ScanNext for every partition
unbounded, causing large RPC fan-out; limit concurrent in-flight ScanNext calls
by introducing a bounded window (e.g., semaphore or fixed-size worker queue)
when iterating total_partitions and dispatching ScanNext on
fetch_cc->partition_scan_states_. Use a concurrency cap constant (e.g.,
max_in_flight_scans) and acquire before calling ScanNext (for symbols: ScanNext,
FetchTableRangesCallback, partition_scan_states_, GetShardIdByPartitionId,
fetch_cc), and release when the corresponding FetchTableRangesCallback completes
or via a future/promise callback so new partitions are dispatched only as slots
free up to keep RPC bursts bounded.
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
store_handler/data_store_service_client.cppstore_handler/data_store_service_client_closure.cpptx_service/include/cc/cc_req_misc.htx_service/include/tx_request.htx_service/include/tx_util.htx_service/src/cc/cc_req_misc.cpptx_service/src/tx_execution.cpp
💤 Files with no reviewable changes (2)
- tx_service/src/cc/cc_req_misc.cpp
- tx_service/include/tx_util.h
| const int32_t kv_part_id = scan_next_closure->PartitionId(); | ||
| fetch_range_cc->AppendTableRanges(kv_part_id, std::move(range_vec)); |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
# Search for AppendTableRanges definition and usage across the codebase
rg -n 'AppendTableRanges' --type cpp --type h -A 10 -B 2Repository: eloqdata/tx_service
Length of output: 4231
🏁 Script executed:
# Also search for the class/struct that contains AppendTableRanges
rg -n 'class.*fetch_range|struct.*fetch_range' --type cpp --type hRepository: eloqdata/tx_service
Length of output: 45
🏁 Script executed:
# Look at the broader context of the file to understand the callback execution model
head -n 20 store_handler/data_store_service_client_closure.cpp
wc -l store_handler/data_store_service_client_closure.cppRepository: eloqdata/tx_service
Length of output: 1020
🏁 Script executed:
# Search for partition_ranges_vec_ declaration in the header and cpp files
rg -n 'partition_ranges_vec_' --type cpp --type h -B 3 -A 3Repository: eloqdata/tx_service
Length of output: 5133
🏁 Script executed:
# Look for the FetchTableRangesCc class definition to understand data member protection
rg -n 'class FetchTableRangesCc' --type h -A 50Repository: eloqdata/tx_service
Length of output: 45
🏁 Script executed:
# Search for how scan callbacks are created and if multiple callbacks can handle the same partition
rg -n 'partition_scan_states_' --type cpp --type h -B 2 -A 5Repository: eloqdata/tx_service
Length of output: 3015
🏁 Script executed:
# Look for the callback spawning logic and partition assignment
rg -n 'scan_next_closure\|PartitionScanState' --type cpp --type h | head -40Repository: eloqdata/tx_service
Length of output: 45
🏁 Script executed:
# Check if there's any synchronization around partition_ranges_vec_ at the class level
rg -n 'class FetchTableRangesCc' --type h -A 100 | head -120Repository: eloqdata/tx_service
Length of output: 45
🏁 Script executed:
# Search for where callbacks are created and understand the execution model
rg -n 'scan_next_closure\|AsyncScan\|bthread::start_background' --type cpp -B 3 -A 3 | head -60Repository: eloqdata/tx_service
Length of output: 45
🏁 Script executed:
# Look at the structure to understand if partition is locked while scanning
rg -n 'remaining_partitions\|PartitionScanState' --type cpp -B 2 -A 2 | head -50Repository: eloqdata/tx_service
Length of output: 45
🏁 Script executed:
# Check if partition_ranges_vec_ is ever accessed from multiple threads for the same index
grep -n 'for.*partition' store_handler/data_store_service_client_closure.cpp | head -20Repository: eloqdata/tx_service
Length of output: 118
🏁 Script executed:
# Search for where the callbacks are spawned/created to understand partition assignment
rg -n 'for.*total_partitions\|spawn\|start_background' --type cpp store_handler/data_store_service_client.cpp -A 5 | head -80Repository: eloqdata/tx_service
Length of output: 45
🏁 Script executed:
# Look at the complete context around lines 860-870 to understand callback execution model
sed -n '830,920p' store_handler/data_store_service_client_closure.cppRepository: eloqdata/tx_service
Length of output: 3506
🏁 Script executed:
# Search for the callback class definition
rg -n 'class.*Closure\|struct.*Closure' --type h store_handler/ | grep -i scanRepository: eloqdata/tx_service
Length of output: 45
🏁 Script executed:
# Search for how scan callbacks are initiated and the execution pattern
rg -n 'ScanNext\|remaining_partitions_' store_handler/data_store_service_client.cpp -B 3 -A 3 | head -80Repository: eloqdata/tx_service
Length of output: 45
🏁 Script executed:
# Check if a partition can be processed by multiple concurrent callbacks
rg -n 'PartitionScanState\|remaining_partitions_' tx_service/include/cc/cc_req_misc.h -B 5 -A 5Repository: eloqdata/tx_service
Length of output: 45
🏁 Script executed:
# Understand the full class definition
sed -n '200,250p' tx_service/include/cc/cc_req_misc.hRepository: eloqdata/tx_service
Length of output: 1637
🏁 Script executed:
# Look at the initialization and understand how many callbacks per partition
sed -n '975,1010p' store_handler/data_store_service_client.cppRepository: eloqdata/tx_service
Length of output: 1306
🏁 Script executed:
# Check how the callbacks are spawned in a loop to understand partition-to-callback mapping
rg -n 'for.*remaining_partitions\|for.*total_partitions' store_handler/ --type cpp -A 10 | head -50Repository: eloqdata/tx_service
Length of output: 45
🏁 Script executed:
# Search for the callback spawn location and loop structure
rg -n 'ScanNext.*kv_part_id\|for.*kv_part_id' store_handler/ --type cpp -B 2 -A 5 | head -60Repository: eloqdata/tx_service
Length of output: 45
🏁 Script executed:
# Confirm the full pattern - are callbacks spawned sequentially in a loop for each partition?
sed -n '960,1020p' store_handler/data_store_service_client.cppRepository: eloqdata/tx_service
Length of output: 2215
🏁 Script executed:
# Double-check: does the callback handler receive the partition ID and only process that partition?
sed -n '820,870p' store_handler/data_store_service_client_closure.cppRepository: eloqdata/tx_service
Length of output: 1926
Data race: multiple callbacks can concurrently call AppendTableRanges with the same partition ID.
When a callback's batch is not complete, it spawns another ScanNext for the same partition (line 915), creating concurrent callbacks. Both can then call AppendTableRanges with the same kv_part_id, accessing partition_ranges_vec_[kv_part_id] simultaneously without synchronization, causing a data race on the inner vector's push_back.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@store_handler/data_store_service_client_closure.cpp` around lines 860 - 861,
Multiple concurrent callbacks can call fetch_range_cc->AppendTableRanges with
the same kv_part_id and race on partition_ranges_vec_[kv_part_id]; make
AppendTableRanges thread-safe by synchronizing access to partition_ranges_vec_
(e.g., add a mutex for the whole vector or a per-partition mutex) and acquire
the lock inside AppendTableRanges before pushing/mutating the inner vector, or
alternatively switch partition_ranges_vec_ to a concurrent container; update any
callers (ScanNext/ScanNextClosure paths) to rely on the synchronized
AppendTableRanges so concurrent ScanNext callbacks no longer race.
| std::vector<txservice::ScanBatchTuple> &read_batch = | ||
| batch_read_op.batch_read_tx_req_->read_batch_; | ||
| const size_t N = read_batch.size(); | ||
| for (size_t i = 0; i < N; ++i) | ||
| { | ||
| TxKey &key = read_batch[i].key_; | ||
| TxRecord &rec = *read_batch[i].record_; | ||
|
|
There was a problem hiding this comment.
Guard hd_result_vec_ size before indexed access in the catalog batch path.
This branch indexes batch_read_op.hd_result_vec_[i] without the size invariant check that exists in the non-catalog path. If sizes diverge, this can become out-of-bounds access.
💡 Proposed fix
std::vector<txservice::ScanBatchTuple> &read_batch =
batch_read_op.batch_read_tx_req_->read_batch_;
+ assert(batch_read_op.hd_result_vec_.size() == read_batch.size());
const size_t N = read_batch.size();📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| std::vector<txservice::ScanBatchTuple> &read_batch = | |
| batch_read_op.batch_read_tx_req_->read_batch_; | |
| const size_t N = read_batch.size(); | |
| for (size_t i = 0; i < N; ++i) | |
| { | |
| TxKey &key = read_batch[i].key_; | |
| TxRecord &rec = *read_batch[i].record_; | |
| std::vector<txservice::ScanBatchTuple> &read_batch = | |
| batch_read_op.batch_read_tx_req_->read_batch_; | |
| assert(batch_read_op.hd_result_vec_.size() == read_batch.size()); | |
| const size_t N = read_batch.size(); | |
| for (size_t i = 0; i < N; ++i) | |
| { | |
| TxKey &key = read_batch[i].key_; | |
| TxRecord &rec = *read_batch[i].record_; |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tx_service/src/tx_execution.cpp` around lines 7590 - 7597, The catalog branch
accesses batch_read_op.hd_result_vec_[i] without checking its size; add the same
safety used in the non-catalog path by guarding that
batch_read_op.hd_result_vec_.size() is at least N (or adjust the loop to iterate
to std::min(N, batch_read_op.hd_result_vec_.size())) before any indexed access
to hd_result_vec_, and handle the shortfall (e.g., skip catalog-specific logic
or log/return an error) so accessing hd_result_vec_[i] cannot go out-of-bounds;
update the code around read_batch, N, and the loop that uses hd_result_vec_ to
implement this check.
There was a problem hiding this comment.
Pull request overview
This PR improves metadata access performance by (1) fetching table range information from the data store concurrently across KV partitions and (2) adding a catalog-specific batch read path that uses ReadLocal on catalog_ccm_name.
Changes:
- Add
is_catalog_batch_toBatchReadTxRequestand route catalog batch reads through aReadLocal(catalog_ccm_name, ...)fast path. - Refactor
FetchTableRangesto scan all range-slice KV partitions in parallel and coordinate completion via shared state (remaining_partitions_,finish_mux_). - Minor whitespace cleanup in a couple of CC/utility files.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| tx_service/src/tx_execution.cpp | Adds catalog batch-read fast path and adjusts post-processing to treat catalog batches differently. |
| tx_service/src/cc/cc_req_misc.cpp | Whitespace-only cleanup. |
| tx_service/include/tx_util.h | Whitespace-only cleanup. |
| tx_service/include/tx_request.h | Extends BatchReadTxRequest with is_catalog_batch_. |
| tx_service/include/cc/cc_req_misc.h | Updates FetchTableRangesCc to hold per-partition scan state and synchronization fields. |
| store_handler/data_store_service_client_closure.cpp | Updates FetchTableRangesCallback to aggregate concurrent partition scans and finish once all partitions complete. |
| store_handler/data_store_service_client.cpp | Starts FetchTableRanges scans concurrently across all KV partitions and initializes shared completion state. |
Comments suppressed due to low confidence (2)
tx_service/include/tx_request.h:1375
BatchReadTxRequestnow addsis_catalog_batch_, butSet(...)does not reset/initialize this new flag. If aBatchReadTxRequestinstance is reused viaSet, it can retain a staleis_catalog_batch_value and take the wrong execution path. Consider extendingSet(...)to explicitly set/resetis_catalog_batch_(and/or document that the request must not be reused across catalog vs non-catalog batches).
bool point_read_on_cache_miss_;
uint64_t corresponding_sk_commit_ts_;
uint64_t schema_version_;
bool local_cache_checked_;
bool is_catalog_batch_; // true: batch ReadLocal on catalog_ccm_name
tx_service/src/tx_execution.cpp:7608
- In the
is_catalog_batch_fast-path, the loop indexesbatch_read_op.hd_result_vec_[i]without a size check/assert like the non-catalog path has later (assert(hd_result_vec_.size() == read_batch.size())). Ifread_batch_is resized afterReset()(it’s a non-const reference in the request), this can become an out-of-bounds access. Add an early assert/check thathd_result_vec_.size()matchesread_batch.size()before the loop (or iterate usinghd_result_vec_.size()).
batch_read_op.is_running_ = true;
std::vector<txservice::ScanBatchTuple> &read_batch =
batch_read_op.batch_read_tx_req_->read_batch_;
const size_t N = read_batch.size();
for (size_t i = 0; i < N; ++i)
{
TxKey &key = read_batch[i].key_;
TxRecord &rec = *read_batch[i].record_;
bool finished = cc_handler_->ReadLocal(
catalog_ccm_name,
key,
rec,
ReadType::Inside,
tx_number_.load(std::memory_order_relaxed),
tx_term_,
CommandId(),
start_ts_,
batch_read_op.hd_result_vec_[i],
IsolationLevel::RepeatableRead,
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Here are some reminders before you submit the pull request
fixes eloqdb/tx_service#issue_id./mtr --suite=mono_main,mono_multi,mono_basicSummary by CodeRabbit
New Features
Improvements