eventstore: fix checkpoint update race #5019
Skipping CI for Draft Pull Request.
📝 Walkthrough
Dispatcher checkpoint becomes atomic; dispatcher updates use monotonic compare-and-increase. Subscription checkpoint advancement is rewritten as a CAS loop reading dispatcher.Load(), emitting updates only on successful CAS. Iterator EndKey is now exclusive. Uploader ignores stale subscription updates. Tests add a concurrency case and use comparable keys.
Changes: Subscription Checkpoint Race Condition and Iterator Boundary Fix
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Code Review
This pull request optimizes the event store by implementing SST file filtering based on transaction commit timestamp ranges, allowing Pebble to skip irrelevant files during scans. It refactors key encoding and decoding logic, introduces shared Pebble caches with proper lifecycle management, and improves concurrency by using atomic types for checkpoint timestamps. Additionally, it refines the handling of subscription state updates to ensure monotonic advancement. Feedback was provided to improve the clarity of a log message regarding stale state updates.
926bb42 to 4bf750b
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@logservice/eventstore/event_store.go`:
- Around line 715-716: The dispatcher checkpoint write at
dispatcherStat.checkpointTs.Store(checkpointTs) can regress a newer
per-dispatcher value; change the update to only advance the per-dispatcher
checkpoint if checkpointTs is greater than the currently stored value by reading
dispatcherStat.checkpointTs (via its Load or atomic read) and using an atomic
compare-and-swap loop to store the new checkpoint only when it is strictly
larger (i.e., retry CAS until success or current >= checkpointTs) so
dispatcherStat.checkpointTs never moves backward and min recomputation reflects
true progress.
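The compare-and-swap loop described in this comment can be sketched as follows. This is a minimal illustration, not the code from the PR: `advanceMonotonic` and `applyInOrder` are hypothetical names, and the sketch uses the standard library `sync/atomic` type rather than whatever atomic package the repository uses.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// advanceMonotonic is a hypothetical helper (not taken from the PR). It stores
// next into target only when next is strictly greater than the current value,
// and reports whether it advanced the value.
func advanceMonotonic(target *atomic.Uint64, next uint64) bool {
	for {
		cur := target.Load()
		if next <= cur {
			// A newer or equal value is already stored; never move backward.
			return false
		}
		if target.CompareAndSwap(cur, next) {
			return true
		}
		// Another goroutine changed the value between Load and CAS; retry.
	}
}

// applyInOrder feeds the timestamps to the helper one by one and returns the
// stored value observed after each call.
func applyInOrder(timestamps []uint64) []uint64 {
	var checkpoint atomic.Uint64
	out := make([]uint64, 0, len(timestamps))
	for _, ts := range timestamps {
		advanceMonotonic(&checkpoint, ts)
		out = append(out, checkpoint.Load())
	}
	return out
}

func main() {
	// The stale value 300 arrives after 900 and is ignored.
	fmt.Println(applyInOrder([]uint64{100, 900, 300})) // prints [100 900 900]
}
```

A plain `Store` in place of the helper would leave 300 as the final value, which is the regression the comment warns about.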
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: fd2ec042-f448-4c1a-ba09-be650be4b428
📒 Files selected for processing (2)
- logservice/eventstore/event_store.go
- logservice/eventstore/event_store_test.go
…store001 # Conflicts: # logservice/eventstore/event_store.go
♻️ Duplicate comments (1)
logservice/eventstore/event_store.go (1)
727-727: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Keep the per-dispatcher checkpoint monotonic too.
The CAS on subStat.checkpointTs prevents the subscription value from moving backward, but Line 727 can still overwrite a newer dispatcher checkpoint with a stale one. After that, the next min recomputation can stay pinned below real progress, so GC and uploaded subscription state stop advancing until this dispatcher reports again.
Suggested fix
- dispatcherStat.checkpointTs.Store(checkpointTs)
+ util.CompareAndMonotonicIncrease(&dispatcherStat.checkpointTs, checkpointTs)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@logservice/eventstore/event_store.go` at line 727, the dispatcher checkpoint write at dispatcherStat.checkpointTs.Store(checkpointTs) can stomp a newer value; change it to a monotonic CAS update: load the current dispatcher checkpoint (dispatcherStat.checkpointTs), and only attempt an atomic compare-and-swap to set it to checkpointTs if checkpointTs is greater than the loaded value (repeat as needed to handle races). Ensure the same monotonicity semantics as subStat.checkpointTs (i.e., never move backward) so that dispatcherStat.checkpointTs only advances when checkpointTs > current.
/gemini review
@coderabbitai review
/gemini summary
✅ Actions performed: Review triggered.
Code Review
This pull request refactors checkpointTs in dispatcherStat to use atomic.Uint64 to prevent race conditions during concurrent updates, and updates related logic to use atomic operations. It also replaces a panic with a warning when handling stale subscription state updates and fixes a boundary check in eventStoreIter.Next to use an exclusive upper bound. A new concurrent test is added to verify these changes. The reviewer suggested using CompareAndMonotonicIncrease instead of CompareAndIncrease to avoid executing downstream logic unnecessarily when the checkpoint timestamp does not actually advance.
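To make the reviewer's suggestion concrete, here is a sketch of the difference between the two update styles. The helper names `increaseAllowEqual` and `increaseStrict` are hypothetical stand-ins, and their semantics are an assumption inferred from the review comment; the actual `CompareAndIncrease` and `CompareAndMonotonicIncrease` implementations are not shown in this conversation and may differ.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// increaseAllowEqual is a hypothetical stand-in for a "compare and increase"
// helper: it returns true when next >= current, even if nothing changes.
func increaseAllowEqual(target *atomic.Uint64, next uint64) bool {
	for {
		cur := target.Load()
		if next < cur {
			return false
		}
		if next == cur || target.CompareAndSwap(cur, next) {
			return true
		}
	}
}

// increaseStrict is a hypothetical stand-in for a "monotonic increase" helper:
// it returns true only when the stored value actually advanced.
func increaseStrict(target *atomic.Uint64, next uint64) bool {
	for {
		cur := target.Load()
		if next <= cur {
			return false
		}
		if target.CompareAndSwap(cur, next) {
			return true
		}
	}
}

// countDownstreamRuns reports how many times follow-up work would run when
// the same timestamp is reported twice under each helper.
func countDownstreamRuns(ts uint64) (allowEqual, strict int) {
	var a, b atomic.Uint64
	for i := 0; i < 2; i++ {
		if increaseAllowEqual(&a, ts) {
			allowEqual++
		}
		if increaseStrict(&b, ts) {
			strict++
		}
	}
	return allowEqual, strict
}

func main() {
	allowEqual, strict := countDownstreamRuns(100)
	fmt.Println(allowEqual, strict) // prints 2 1
}
```

Under these assumed semantics, the strict variant lets the caller skip min recomputation and other downstream work when a dispatcher re-reports a timestamp it has already reached.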
Summary of Changes
This pull request addresses race conditions in the eventstore's checkpoint update mechanism. By transitioning key checkpoint fields to atomic types and implementing monotonic update logic, the system now safely handles concurrent updates and prevents regression of checkpoint timestamps. Additionally, the change refines iterator boundary conditions and improves error handling for stale state updates to ensure consistent progress tracking.
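As an illustration of tolerating stale state updates, the sketch below logs and drops an update that is not newer than the recorded one instead of panicking. Everything here is hypothetical: the `stateTracker` type, its `apply` method, and the choice to treat an equal timestamp as stale are assumptions for the example, not the uploader code from the PR.

```go
package main

import (
	"fmt"
	"log"
)

// stateTracker is a hypothetical model that remembers the last accepted
// checkpoint per subscription ID.
type stateTracker struct {
	last map[uint64]uint64
}

// apply records an update and reports whether it was accepted. An update that
// is not newer than the recorded value is logged and ignored rather than
// treated as a fatal error.
func (st *stateTracker) apply(subID, checkpointTs uint64) bool {
	if prev, ok := st.last[subID]; ok && checkpointTs <= prev {
		log.Printf("ignoring stale subscription update: sub=%d got=%d have=%d",
			subID, checkpointTs, prev)
		return false
	}
	st.last[subID] = checkpointTs
	return true
}

func main() {
	tracker := &stateTracker{last: make(map[uint64]uint64)}
	fmt.Println(tracker.apply(1, 100)) // prints true
	fmt.Println(tracker.apply(1, 50))  // prints false, after a warning is logged
	fmt.Println(tracker.last[1])       // prints 100
}
```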
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@logservice/eventstore/event_store_test.go`:
- Around line 958-965: After wg.Wait() add a deterministic assertion that
subStat.checkpointTs.Load() equals 100 before advancing resolvedTs to ensure
stale concurrent updates didn't move the subscription checkpoint; specifically,
right after close(startCh) and wg.Wait(), assert subStat.checkpointTs.Load() ==
100, then proceed to call subStat.resolvedTs.Store(900) and
store.UpdateDispatcherCheckpointTs(dispatcherID2, 900).
close(startCh)
wg.Wait()

require.Equal(t, uint64(900), stat1.checkpointTs.Load())

subStat.resolvedTs.Store(900)
store.UpdateDispatcherCheckpointTs(dispatcherID2, 900)
require.Equal(t, uint64(900), subStat.checkpointTs.Load())
Assert stale concurrent updates did not advance the subscription checkpoint.
Right after wg.Wait(), add an assertion that subStat.checkpointTs is still 100 before resolvedTs is advanced. This closes a gap where premature checkpoint movement could regress without being caught.
Suggested patch
close(startCh)
wg.Wait()
require.Equal(t, uint64(900), stat1.checkpointTs.Load())
+ require.Equal(t, uint64(100), subStat.checkpointTs.Load())
subStat.resolvedTs.Store(900)
store.UpdateDispatcherCheckpointTs(dispatcherID2, 900)
require.Equal(t, uint64(900), subStat.checkpointTs.Load())
As per coding guidelines, "Prefer focused deterministic tests; see docs/agents/testing.md before adding or changing tests."
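For readers unfamiliar with the `startCh` / `wg.Wait()` shape used in the test, the pattern can be sketched in isolation. This is a simplified model under assumptions: `runGatedUpdates` is a hypothetical function, and the update rule inside each goroutine (a strictly increasing CAS) stands in for the real store call.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runGatedUpdates starts one goroutine per timestamp, holds them all on a
// shared start channel, then releases them together to maximize contention.
// It returns the value stored once every goroutine has finished.
func runGatedUpdates(timestamps []uint64) uint64 {
	var checkpoint atomic.Uint64
	startCh := make(chan struct{})
	var wg sync.WaitGroup
	for _, ts := range timestamps {
		wg.Add(1)
		go func(ts uint64) {
			defer wg.Done()
			<-startCh // block until all goroutines have been spawned
			for {
				cur := checkpoint.Load()
				// Stop if a newer value is present or our CAS succeeded.
				if ts <= cur || checkpoint.CompareAndSwap(cur, ts) {
					return
				}
			}
		}(ts)
	}
	close(startCh) // release every goroutine at once
	wg.Wait()      // all updates have completed past this point
	return checkpoint.Load()
}

func main() {
	fmt.Println(runGatedUpdates([]uint64{300, 900, 100, 500})) // prints 900
}
```

Because the update is monotonic, the value read after `wg.Wait()` is the maximum input regardless of interleaving, which is what makes an assertion placed right after the wait deterministic.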
if newCheckpointTs < oldCheckpointTs {
	return
}
if !subStat.checkpointTs.CompareAndSwap(oldCheckpointTs, newCheckpointTs) {
Can this cause CDC to get stuck?
As the checkpoint only increases, the loop will either succeed or terminate when the condition newCheckpointTs <= oldCheckpointTs is met.
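The termination argument can be seen in a small model of the loop. This is a sketch under stated assumptions, not the PR's code: the `subscription` type, its fields, and the rule that the target is the minimum dispatcher checkpoint capped by resolvedTs are simplifications for illustration, and the exit test uses the `<=` form mentioned in the reply.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// subscription is a simplified, hypothetical model: its checkpoint follows the
// minimum checkpoint across its dispatchers, capped by resolvedTs.
type subscription struct {
	checkpointTs atomic.Uint64
	resolvedTs   atomic.Uint64
	dispatchers  []*atomic.Uint64
}

// advance recomputes the target inside the loop so that a failed CAS retries
// against fresh values. It returns true only if this call moved the
// subscription checkpoint forward.
func (s *subscription) advance() bool {
	for {
		old := s.checkpointTs.Load()
		next := s.resolvedTs.Load()
		for _, d := range s.dispatchers {
			if v := d.Load(); v < next {
				next = v
			}
		}
		if next <= old {
			return false // nothing to advance; also the exit after losing a race
		}
		if s.checkpointTs.CompareAndSwap(old, next) {
			return true // only the winning caller would emit an update
		}
		// CAS failed: another goroutine advanced the checkpoint; reload and retry.
	}
}

// demo returns the subscription checkpoint before and after the slowest
// dispatcher catches up.
func demo() (before, after uint64) {
	d1, d2 := new(atomic.Uint64), new(atomic.Uint64)
	sub := &subscription{dispatchers: []*atomic.Uint64{d1, d2}}
	sub.checkpointTs.Store(100)
	sub.resolvedTs.Store(900)
	d1.Store(900)
	d2.Store(100)
	sub.advance() // the minimum is still 100, so nothing moves
	before = sub.checkpointTs.Load()
	d2.Store(900)
	sub.advance() // the minimum is now 900
	return before, sub.checkpointTs.Load()
}

func main() {
	before, after := demo()
	fmt.Println(before, after) // prints 100 900
}
```

In this model a CAS fails only when another writer changed the checkpoint, and every writer only increases it, so each retry sees a larger `old` and the loop ends once `next <= old` or the CAS wins.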
It seems impossible for the swap to fail here. I think a log should be added.
[APPROVALNOTIFIER] This PR is APPROVED
This pull-request has been approved by: 3AceShowHand, wk989898
What problem does this PR solve?
Issue Number: close #4992
What is changed and how it works?
This pull request addresses race conditions in the eventstore's checkpoint update mechanism. By transitioning key checkpoint fields to atomic types and implementing monotonic update logic, the system now safely handles concurrent updates and prevents regression of checkpoint timestamps. Additionally, the change refines iterator boundary conditions and improves error handling for stale state updates to ensure consistent progress tracking.
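The iterator boundary refinement amounts to treating the end key as an exclusive bound. A minimal sketch of that check, assuming byte-wise key ordering; `scanRange` is a hypothetical function and the real key encoding used by the event store is not shown in this conversation:

```go
package main

import (
	"bytes"
	"fmt"
)

// scanRange returns the keys from a sorted slice that fall in the half-open
// range [start, end). A key equal to end is excluded.
func scanRange(sorted []string, start, end string) []string {
	var out []string
	lo, hi := []byte(start), []byte(end)
	for _, k := range sorted {
		key := []byte(k)
		if bytes.Compare(key, hi) >= 0 {
			break // sorted input: nothing at or past end can match
		}
		if bytes.Compare(key, lo) >= 0 {
			out = append(out, k)
		}
	}
	return out
}

func main() {
	keys := []string{"a", "b", "c", "d"}
	fmt.Println(scanRange(keys, "a", "c")) // prints [a b]
}
```

With an inclusive comparison (`> 0` in the break condition) the key `c` would also be returned, so two adjacent ranges sharing a boundary key would both yield it.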
Highlights
- Converted dispatcherStat.checkpointTs to atomic.Uint64 to ensure thread-safe concurrent updates and prevent race conditions.
- Changed the iterator upper-bound check to be exclusive (<) instead of inclusive (<=) for more accurate key filtering.
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note