abort tx if ReadLocalOp errored#350
Conversation
WalkthroughReadLocalOperation::Forward no longer unconditionally aborts on ACQUIRE_KEY_LOCK_FAILED_FOR_RW_CONFLICT. It now scans the transaction read-set and only aborts if a conflicting read exists outside catalog/cluster-config CCMs; otherwise it resets state and retries the operation via txm->Process. Changes
Sequence Diagram(s)sequenceDiagram
participant ReadOp as ReadLocalOperation
participant TxMeta as TransactionMetadata (read-set)
participant TxMgr as TransactionManager (txm)
ReadOp->>TxMeta: inspect read-set entries on ACQUIRE_KEY_LOCK_FAILED_FOR_RW_CONFLICT
alt non-catalog/cluster-config conflict found
ReadOp->>TxMgr: proceed to abort/post-process
else only catalog/cluster-config entries found
ReadOp-->>ReadOp: reset hd_result_, clear execute_immediately_
ReadOp->>TxMgr: txm->Process(this) (retry same operation)
end
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 1 | ❌ 2❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 0
🧹 Nitpick comments (1)
tx_service/src/tx_operation.cpp (1)
304-313: ReadLocalOperation now cleanly hands off all error cases for abort; comment could be slightly clearerRouting all the asserted error codes directly to
txm->PostProcess(*this)with an immediatereturnremoves any retry path inForwardand matches the PR intent to abort the transaction onReadLocalOperrors. The only minor nit is that the// Abort tx.comment may look misleading at a glance, since the actual abort decision is centralized inPostProcess; if you touch this again, consider rewording it to something like “// Hand off to PostProcess; tx will be aborted for this error” to make that relationship explicit.
3a42e7f to
b5eb900
Compare
There was a problem hiding this comment.
Actionable comments posted: 2
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
tx_service/src/tx_operation.cpp
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: thweetkomputer
Repo: eloqdata/tx_service PR: 150
File: include/cc/local_cc_shards.h:626-631
Timestamp: 2025-10-09T03:56:58.811Z
Learning: For the LocalCcShards class in include/cc/local_cc_shards.h: Writer locks (unique_lock) should continue using the original meta_data_mux_ (std::shared_mutex) rather than fast_meta_data_mux_ (FastMetaDataMutex) at this stage. Only reader locks may use the FastMetaDataMutex wrapper.
📚 Learning: 2025-10-09T03:56:58.811Z
Learnt from: thweetkomputer
Repo: eloqdata/tx_service PR: 150
File: include/cc/local_cc_shards.h:626-631
Timestamp: 2025-10-09T03:56:58.811Z
Learning: For the LocalCcShards class in include/cc/local_cc_shards.h: Writer locks (unique_lock) should continue using the original meta_data_mux_ (std::shared_mutex) rather than fast_meta_data_mux_ (FastMetaDataMutex) at this stage. Only reader locks may use the FastMetaDataMutex wrapper.
Applied to files:
tx_service/src/tx_operation.cpp
🔇 Additional comments (1)
tx_service/src/tx_operation.cpp (1)
320-330: Verify CCM allow-list consistency for deadlock avoidanceThe
MetaDataReadSet()checks differ between similar operations: ReadLocalOperation (lines 323–324) only exemptscatalog_ccm_name_svandcluster_config_ccm_name_sv, while LockWriteRangeBucketsOp (lines 635–637) also exemptsrange_bucket_ccm_name_sv.ReadLocalOperation's purpose is to detect when a transaction already holds metadata locks before retrying a range read lock acquisition blocked by DDL. If the transaction has already acquired bucket-related locks, the current code will abort. Confirm whether this exclusion of
range_bucket_ccm_name_svfrom the allow-list is intentional or if it should match the pattern in LockWriteRangeBucketsOp.
| if (hd_result_->ErrorCode() == | ||
| CcErrorCode::ACQUIRE_KEY_LOCK_FAILED_FOR_RW_CONFLICT) | ||
| { | ||
| // If acquire range read lock blocked by DDL, check if tx has | ||
| // already acquired other range read lock. If so we need to | ||
| // abort tx since it might cause dead lock with range split. If | ||
| // this tx has not acquired any range read lock, we can safely | ||
| // retry here. | ||
| const auto &rset = txm->rw_set_.MetaDataReadSet(); | ||
| for (const auto &[cce_addr, read_entry_pair] : rset) | ||
| { | ||
| // Abort tx. | ||
| txm->PostProcess(*this); | ||
| return; | ||
| if (read_entry_pair.second != catalog_ccm_name_sv && | ||
| read_entry_pair.second != cluster_config_ccm_name_sv) | ||
| { | ||
| // Abort tx. | ||
| txm->PostProcess(*this); | ||
| return; | ||
| } | ||
| } | ||
| hd_result_->Value().Reset(); | ||
| hd_result_->Reset(); | ||
| execute_immediately_ = false; | ||
| txm->Process(*this); | ||
| } | ||
| else | ||
| { | ||
| // Abort the tx. | ||
| txm->PostProcess(*this); | ||
| } |
There was a problem hiding this comment.
Potential infinite retry / PR intent mismatch on RW-conflict path
The new branch retries on ACQUIRE_KEY_LOCK_FAILED_FOR_RW_CONFLICT when the tx only touched catalog/cluster-config. That seems to contradict the PR description (“abort tx if ReadLocalOp errored”) and can still loop forever if the blocking condition is persistent/long-lived (DDL stuck, misbehaving node, etc.). Consider adding an explicit bound (retry counter, elapsed-time guard, or backoff via ReRunOp) or updating the PR title/description to reflect the new “conditional abort” behavior.
Possible direction (bounded retry using existing retry/backoff)
if (hd_result_->ErrorCode() ==
CcErrorCode::ACQUIRE_KEY_LOCK_FAILED_FOR_RW_CONFLICT)
{
+ if (retry_num_ <= 0) {
+ txm->PostProcess(*this);
+ return;
+ }
...
- hd_result_->Value().Reset();
- hd_result_->Reset();
- execute_immediately_ = false;
- txm->Process(*this);
+ hd_result_->Value().Reset();
+ hd_result_->Reset();
+ execute_immediately_ = false;
+ ReRunOp(txm); // or equivalent backoff hook for ReadLocalOperation
+ return;
}| const auto &rset = txm->rw_set_.MetaDataReadSet(); | ||
| for (const auto &[cce_addr, read_entry_pair] : rset) | ||
| { | ||
| // Abort tx. | ||
| txm->PostProcess(*this); | ||
| return; | ||
| if (read_entry_pair.second != catalog_ccm_name_sv && | ||
| read_entry_pair.second != cluster_config_ccm_name_sv) | ||
| { | ||
| // Abort tx. | ||
| txm->PostProcess(*this); | ||
| return; | ||
| } | ||
| } | ||
| hd_result_->Value().Reset(); | ||
| hd_result_->Reset(); | ||
| execute_immediately_ = false; | ||
| txm->Process(*this); | ||
| } |
There was a problem hiding this comment.
Retry path likely needs the same “reset running-state + return” pattern used elsewhere
Here you reset hd_result_ and call txm->Process(*this) but (unlike LockWriteRangeBucketsOp::Forward) you don’t reset is_running_ and you don’t return after Process(). If Process() can synchronously drive the op (or assumes !is_running_), this risks re-entrancy or inconsistent scheduling. Aligning with the existing retry pattern would make this safer and more consistent.
Suggested mechanical alignment with existing retry pattern
hd_result_->Value().Reset();
hd_result_->Reset();
+ is_running_ = false;
execute_immediately_ = false;
txm->Process(*this);
+ return;b5eb900 to
544b15d
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In @tx_service/src/tx_operation.cpp:
- Around line 312-335: In the ACQUIRE_KEY_LOCK_FAILED_FOR_RW_CONFLICT branch
inside ReadLocalOperation (where you reset hd_result_ via
hd_result_->Value().Reset(); hd_result_->Reset(); set execute_immediately_ =
false; and call txm->Process(*this)), also set is_running_ = false and add an
explicit return; immediately after txm->Process(*this) so the operation follows
the same restart/state pattern as LockWriteRangeBucketsOp::Forward() and avoids
continuing on with a stale running flag.
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
tx_service/src/tx_operation.cpp
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: thweetkomputer
Repo: eloqdata/tx_service PR: 150
File: include/cc/local_cc_shards.h:626-631
Timestamp: 2025-10-09T03:56:58.811Z
Learning: For the LocalCcShards class in include/cc/local_cc_shards.h: Writer locks (unique_lock) should continue using the original meta_data_mux_ (std::shared_mutex) rather than fast_meta_data_mux_ (FastMetaDataMutex) at this stage. Only reader locks may use the FastMetaDataMutex wrapper.
📚 Learning: 2025-10-09T03:56:58.811Z
Learnt from: thweetkomputer
Repo: eloqdata/tx_service PR: 150
File: include/cc/local_cc_shards.h:626-631
Timestamp: 2025-10-09T03:56:58.811Z
Learning: For the LocalCcShards class in include/cc/local_cc_shards.h: Writer locks (unique_lock) should continue using the original meta_data_mux_ (std::shared_mutex) rather than fast_meta_data_mux_ (FastMetaDataMutex) at this stage. Only reader locks may use the FastMetaDataMutex wrapper.
Applied to files:
tx_service/src/tx_operation.cpp
| if (hd_result_->ErrorCode() == | ||
| CcErrorCode::ACQUIRE_KEY_LOCK_FAILED_FOR_RW_CONFLICT) | ||
| { | ||
| // If acquire range read lock blocked by DDL, check if tx has | ||
| // already acquired other range read lock. If so we need to | ||
| // abort tx since it might cause dead lock with range split. If | ||
| // this tx has not acquired any range read lock, we can safely | ||
| // retry here. | ||
| const auto &rset = txm->rw_set_.MetaDataReadSet(); | ||
| for (const auto &[cce_addr, read_entry_pair] : rset) | ||
| { | ||
| // Abort tx. | ||
| txm->PostProcess(*this); | ||
| return; | ||
| if (read_entry_pair.second != catalog_ccm_name_sv && | ||
| read_entry_pair.second != cluster_config_ccm_name_sv) | ||
| { | ||
| // Abort tx. | ||
| txm->PostProcess(*this); | ||
| return; | ||
| } | ||
| } | ||
| hd_result_->Value().Reset(); | ||
| hd_result_->Reset(); | ||
| execute_immediately_ = false; | ||
| txm->Process(*this); | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# 1. Find the ReadLocalOperation class definition and Forward method
echo "=== ReadLocalOperation class definition ==="
rg -n "class\s+ReadLocalOperation" -A 30 -t cpp -t h
# 2. Look for is_running_ usage in ReadLocalOperation
echo "=== is_running_ usage in ReadLocalOperation ==="
rg -n "is_running_" tx_service/src/tx_operation.cpp -B 2 -A 2 | head -100
# 3. Find LockWriteRangeBucketsOp::Forward to compare the pattern
echo "=== LockWriteRangeBucketsOp::Forward pattern ==="
rg -n "class\s+LockWriteRangeBucketsOp|LockWriteRangeBucketsOp::Forward" -A 50 -t cpp | head -150Repository: eloqdata/tx_service
Length of output: 106
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Find header files in tx_service
echo "=== Looking for operation class definitions in headers ==="
fd -e h -e hpp . tx_service | head -20
# Search for ReadLocalOperation in all files
echo "=== ReadLocalOperation references ==="
rg "ReadLocalOperation" --type-list
# Broader search
echo "=== Searching for ReadLocalOperation ==="
rg "ReadLocalOperation" . --max-count 20
# Search for LockWriteRangeBucketsOp
echo "=== Searching for LockWriteRangeBucketsOp ==="
rg "LockWriteRangeBucketsOp" . --max-count 20Repository: eloqdata/tx_service
Length of output: 1317
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Check if the file exists and get line count
echo "=== File information ==="
wc -l tx_service/src/tx_operation.cpp
# Read around the line range mentioned (312-335)
echo "=== Code around lines 300-350 ==="
sed -n '300,350p' tx_service/src/tx_operation.cpp | cat -n
# Search for class definitions in this file
echo "=== Class definitions in tx_operation.cpp ==="
rg "^class\s+\w+" tx_service/src/tx_operation.cpp -n
# Look for ReadLocalOperation
echo "=== ReadLocalOperation in tx_operation.cpp ==="
rg "ReadLocalOperation" tx_service/src/tx_operation.cpp -n -B 5 -A 5Repository: eloqdata/tx_service
Length of output: 3404
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Find ReadLocalOperation class definition
echo "=== ReadLocalOperation class definition ==="
rg "class ReadLocalOperation" -A 30 -t h
# Find LockWriteRangeBucketsOp::Forward
echo "=== LockWriteRangeBucketsOp::Forward ==="
rg "LockWriteRangeBucketsOp::Forward" tx_service/src/tx_operation.cpp -n -A 80 | head -150
# Check is_running_ usage in ReadLocalOperation
echo "=== is_running_ in ReadLocalOperation ==="
rg "is_running_" tx_service/src/tx_operation.cpp -n -B 2 -A 2 | grep -A 2 -B 2 "ReadLocalOperation"
# Find all is_running_ resets to see the pattern
echo "=== is_running_ reset patterns ==="
rg "is_running_\s*=\s*false" tx_service/src/tx_operation.cpp -n -B 5 -A 1 | head -100Repository: eloqdata/tx_service
Length of output: 106
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Search for ReadLocalOperation in all header files
echo "=== Looking for ReadLocalOperation in headers ==="
find tx_service/include -type f \( -name "*.h" -o -name "*.hpp" \) -exec grep -l "ReadLocalOperation" {} \;
# Try broader search
echo "=== ReadLocalOperation anywhere ==="
rg "ReadLocalOperation" tx_service/ --type cpp --type h | head -30
# Look for forward declarations or includes in tx_operation.cpp
echo "=== Includes in tx_operation.cpp ==="
head -100 tx_service/src/tx_operation.cpp | grep -E "#include|class.*Operation"
# Find LockWriteRangeBucketsOp in the file
echo "=== LockWriteRangeBucketsOp::Forward in tx_operation.cpp ==="
rg "void LockWriteRangeBucketsOp::Forward" tx_service/src/tx_operation.cpp -A 100 | head -150Repository: eloqdata/tx_service
Length of output: 6672
🏁 Script executed:
#!/bin/bash
set -euo pipefestyle
# Check ReadLocalOperation struct definition
echo "=== ReadLocalOperation struct definition ==="
sed -n '/^struct ReadLocalOperation/,/^}/p' tx_service/include/tx_operation.h | head -60
# Check if ReadLocalOperation has is_running_ member
echo "=== is_running_ in ReadLocalOperation ==="
rg "struct ReadLocalOperation" -A 40 tx_service/include/tx_operation.h
# Check LockWriteRangeBucketsOp for comparison
echo "=== LockWriteRangeBucketsOp struct definition ==="
rg "struct LockWriteRangeBucketsOp" -A 40 tx_service/include/tx_operation.h | head -60
# Verify the exact differences in allowlist
echo "=== Confirming allowlist in both operations ==="
rg "range_bucket_ccm_name_sv|catalog_ccm_name_sv|cluster_config_ccm_name_sv" tx_service/src/tx_operation.cpp -B 3 -A 3 | head -80Repository: eloqdata/tx_service
Length of output: 119
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Check ReadLocalOperation struct definition
echo "=== ReadLocalOperation struct definition ==="
sed -n '/^struct ReadLocalOperation/,/^}/p' tx_service/include/tx_operation.h | head -60
# Check if ReadLocalOperation has is_running_ member
echo "=== is_running_ in ReadLocalOperation ==="
rg "struct ReadLocalOperation" -A 40 tx_service/include/tx_operation.h
# Check LockWriteRangeBucketsOp for comparison
echo "=== LockWriteRangeBucketsOp struct definition ==="
rg "struct LockWriteRangeBucketsOp" -A 40 tx_service/include/tx_operation.h | head -60
# Verify the exact differences in allowlist
echo "=== Confirming allowlist in both operations ==="
rg "range_bucket_ccm_name_sv|catalog_ccm_name_sv|cluster_config_ccm_name_sv" tx_service/src/tx_operation.cpp -B 3 -A 3 | head -80Repository: eloqdata/tx_service
Length of output: 7062
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Check TransactionOperation base class
echo "=== TransactionOperation base class ==="
rg "struct TransactionOperation|class TransactionOperation" tx_service/include/tx_operation.h -A 20 | head -50
# Check if is_running_ is inherited from base
echo "=== is_running_ definition ==="
rg "bool is_running_" tx_service/include/tx_operation.h
# Check TransactionExecution::Process signature
echo "=== TransactionExecution::Process for ReadLocalOperation ==="
rg "void.*Process.*ReadLocalOperation" tx_service/include/tx_execution.h -A 2
# Look at the actual implementation of Process
echo "=== Process implementation pattern ==="
rg "void TransactionExecution::Process\(ReadLocalOperation" tx_service/src/tx_execution.cpp -A 20 | head -50Repository: eloqdata/tx_service
Length of output: 2145
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Full TransactionOperation definition to confirm is_running_ location
echo "=== Full TransactionOperation struct ==="
rg "struct TransactionOperation" tx_service/include/tx_operation.h -A 40
# Check ReadLocalOperation::Forward to see if it checks is_running_
echo "=== ReadLocalOperation::Forward full implementation ==="
sed -n '292,360p' tx_service/src/tx_operation.cpp
# Check if there's any retry budget or backoff mechanism
echo "=== Retry budget/backoff mechanisms ==="
rg "retry|backoff|max_retry|timeout" tx_service/src/tx_operation.cpp -i -n | head -30
# Compare the two operations' Forward patterns side-by-side
echo "=== LockWriteRangeBucketsOp initial is_running_ check ==="
sed -n '/void LockWriteRangeBucketsOp::Forward/,/^}/p' tx_service/src/tx_operation.cpp | head -80Repository: eloqdata/tx_service
Length of output: 7993
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Check how Process() sets is_running_
echo "=== How Process sets is_running_ ==="
rg "is_running_\s*=" tx_service/src/tx_execution.cpp -B 3 -A 1 | head -100
# Look at the exact sequence in TransactionExecution
echo "=== TransactionExecution Process methods pattern ==="
rg "void TransactionExecution::Process\(" tx_service/src/tx_execution.cpp -n | head -20
# Find one Process implementation to see if it sets is_running_
echo "=== One Process implementation example ==="
rg "void TransactionExecution::Process\(LockWriteRangeBucketsOp" tx_service/src/tx_execution.cpp -A 20 | head -40
# Check if there's a pattern of setting is_running_ before Process
echo "=== Setting is_running_ before calling Process ==="
rg "is_running_\s*=\s*true" tx_service/src/tx_execution.cpp -B 2 -A 2Repository: eloqdata/tx_service
Length of output: 4227
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Check retry_num_ usage in the retry logic
echo "=== retry_num_ in conflict handling ==="
rg "ACQUIRE_KEY_LOCK_FAILED_FOR_RW_CONFLICT" tx_service/src/tx_operation.cpp -B 5 -A 30 | grep -E "retry_num_|ACQUIRE_KEY_LOCK_FAILED_FOR_RW_CONFLICT|is_running_"
# Check if ReadLocalOperation should use retry_num_
echo "=== retry_num_ usage patterns in other operations ==="
rg "retry_num_" tx_service/src/tx_operation.cpp -n | head -30
# Verify one more time what happens when Process() is called on an already-finished operation
echo "=== What Forward does when operation is already running ==="
sed -n '292,360p' tx_service/src/tx_operation.cpp | grep -E "is_running_|if.*IsFinished|Forward|Process"Repository: eloqdata/tx_service
Length of output: 2021
Add is_running_ reset and return; statement to match the established pattern in similar retry paths.
In the ACQUIRE_KEY_LOCK_FAILED_FOR_RW_CONFLICT branch, after resetting hd_result_ and calling txm->Process(*this), you should reset is_running_ and add an explicit return;. This matches the pattern established in LockWriteRangeBucketsOp::Forward() and ensures proper state management for the operation's restart.
The allowlist in this method correctly exempts only catalog_ccm_name_sv and cluster_config_ccm_name_sv—unlike LockWriteRangeBucketsOp, ReadLocalOperation does not deal with bucket-level locks, so excluding range_bucket_ccm_name_sv is appropriate.
Proposed fix
if (hd_result_->ErrorCode() ==
CcErrorCode::ACQUIRE_KEY_LOCK_FAILED_FOR_RW_CONFLICT)
{
// ...
const auto &rset = txm->rw_set_.MetaDataReadSet();
for (const auto &[cce_addr, read_entry_pair] : rset)
{
if (read_entry_pair.second != catalog_ccm_name_sv &&
read_entry_pair.second != cluster_config_ccm_name_sv)
{
// Abort tx.
txm->PostProcess(*this);
return;
}
}
hd_result_->Value().Reset();
hd_result_->Reset();
+ is_running_ = false;
execute_immediately_ = false;
txm->Process(*this);
+ return;
}🤖 Prompt for AI Agents
In @tx_service/src/tx_operation.cpp around lines 312 - 335, In the
ACQUIRE_KEY_LOCK_FAILED_FOR_RW_CONFLICT branch inside ReadLocalOperation (where
you reset hd_result_ via hd_result_->Value().Reset(); hd_result_->Reset(); set
execute_immediately_ = false; and call txm->Process(*this)), also set
is_running_ = false and add an explicit return; immediately after
txm->Process(*this) so the operation follows the same restart/state pattern as
LockWriteRangeBucketsOp::Forward() and avoids continuing on with a stale running
flag.
ReadLocalOp shouldn't keep retrying indefinitely as the error could be permanant.
Summary by CodeRabbit
✏️ Tip: You can customize this high-level summary in your review settings.