fix(server): prevent consumer offset skip during concurrent produce+consume#2958
Merged
Codecov Report ❌ Patch coverage is
Additional details and impacted files:

@@            Coverage Diff             @@
##             master    #2958      +/-   ##
============================================
+ Coverage     72.02%   72.07%    +0.04%
  Complexity      930      930
============================================
  Files          1124     1124
  Lines         93669    93832      +163
  Branches      71017    71192      +175
============================================
+ Hits          67469    67627      +158
+ Misses        23631    23612       -19
- Partials       2569     2593       +24
============================================

Flags with carried forward coverage won't be shown.
12acd1e to 1d05ba5
Contributor (Author):
This PR waits for @lukaszzborek's comment. Once he says that his problem is no longer visible, we can merge.
5a0958b to 3394c61
Contributor:
After those changes, I no longer see the offset-skipping problem, so I think it is fixed now.
numinnex reviewed Mar 21, 2026
…mer message skip

After server restart, MemoryMessageJournal was created via Default with next_offset=0. The three-tier message routing used this value as in_memory_floor, which prevented all disk reads when the journal had data. Consumers using PollingStrategy::Next with auto_commit permanently skipped the last few disk messages when new messages arrived in the journal concurrently.

Root cause: journal.init() existed but was never called in production code, only in tests. After restart with N messages on disk, the first journal append set current_offset = 0 + batch_count - 1 instead of N + batch_count - 1. slice_by_offset then silently returned messages from the wrong range (clamping to index 0 when start_offset < first_offset).

The fix has four layers:

1. Initialize journal next_offset at both bootstrap paths (shard/mod.rs and the lazy init_partition_inner path in shard/system/partitions.rs, which also lacked the current_offset, should_increment, and consumer clamping fixes from the previous commit).
2. Self-heal next_offset in journal.append() on first append when messages_count == 0, with debug_assert validation.
3. Change slice_by_offset to return None when start_offset < first_offset instead of silently returning data from a higher offset range.
4. Remove Default from MemoryMessageJournal so the bug class is structurally impossible. Add explicit empty()/at_offset() constructors. Replace inner() exposure with typed query methods (first_offset, last_offset, first_timestamp, last_timestamp) on the Journal trait. Rename base_offset to next_offset. Delete dead Clone impl. Document the single-threaded safety invariant on the snapshot-then-read pattern.

Fixes #2715
Fixes #2924
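A minimal sketch of what "explicit constructors plus typed query methods instead of exposing the inner storage" could look like. All type, field, and method names here are assumptions taken from the commit message's description, not the actual iggy source:

```rust
// Hypothetical sketch; field and type names are assumptions, not real iggy code.
#[derive(Debug)]
struct Batch {
    base_offset: u64,
    last_offset: u64,
    first_timestamp: u64,
    last_timestamp: u64,
}

// Typed queries replace exposing the inner batch storage via inner().
trait Journal {
    fn first_offset(&self) -> Option<u64>;
    fn last_offset(&self) -> Option<u64>;
    fn first_timestamp(&self) -> Option<u64>;
    fn last_timestamp(&self) -> Option<u64>;
}

struct MemoryMessageJournal {
    next_offset: u64,
    batches: Vec<Batch>,
}

impl MemoryMessageJournal {
    // No Default impl: every caller must state the starting offset explicitly.
    fn empty() -> Self {
        Self::at_offset(0)
    }
    fn at_offset(next_offset: u64) -> Self {
        Self { next_offset, batches: Vec::new() }
    }
}

impl Journal for MemoryMessageJournal {
    fn first_offset(&self) -> Option<u64> {
        self.batches.first().map(|b| b.base_offset)
    }
    fn last_offset(&self) -> Option<u64> {
        self.batches.last().map(|b| b.last_offset)
    }
    fn first_timestamp(&self) -> Option<u64> {
        self.batches.first().map(|b| b.first_timestamp)
    }
    fn last_timestamp(&self) -> Option<u64> {
        self.batches.last().map(|b| b.last_timestamp)
    }
}
```

Removing Default forces the bootstrap code to decide the starting offset, so an accidental zero-initialized journal can no longer type-check.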
numinnex approved these changes Mar 23, 2026
mmodzelewski approved these changes Mar 23, 2026
After server restart, MemoryMessageJournal was created via
Default with base_offset=0. The three-tier message routing
used this value as in_memory_floor, which prevented all disk
reads when the journal had data. Consumers using
PollingStrategy::Next with auto_commit permanently skipped
the last few disk messages when new messages arrived in the
journal concurrently.
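The routing failure above can be illustrated with a hypothetical two-way tier decision (names like `route` and `in_memory_floor` are illustrative, not the actual iggy code): when the journal wrongly reports its floor as 0, every poll is served from memory and the disk tier is never consulted.

```rust
// Illustrative sketch of the in_memory_floor routing decision; names are assumptions.
#[derive(Debug, PartialEq)]
enum Tier {
    Memory,
    Disk,
}

/// `in_memory_floor` is the first offset held by the in-memory journal.
/// Polls for offsets below it must be served from disk.
fn route(start_offset: u64, in_memory_floor: u64) -> Tier {
    if start_offset >= in_memory_floor {
        Tier::Memory
    } else {
        Tier::Disk
    }
}
```

With the buggy post-restart floor of 0, a poll for offset 5 is routed to memory even if offsets 0..=9 live only on disk; with a correctly initialized floor of 10, the same poll goes to disk.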
Root cause: journal.init() existed but was never called in
production code - only in tests. After restart with N
messages on disk, the first journal append set
current_offset = 0 + batch_count - 1 instead of
N + batch_count - 1. slice_by_offset then silently returned
messages from the wrong range (clamping to index 0 when
start_offset < first_offset).
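The silent-clamping behavior and its fix can be sketched as follows (a simplified model, not the real slice_by_offset signature): requests below the journal's first offset now return None so the caller can fall back to disk, instead of receiving data from a higher offset range.

```rust
// Hypothetical simplified model of the slice_by_offset fix.
struct Journal {
    first_offset: u64,
    messages: Vec<u64>, // offsets first_offset .. first_offset + len
}

impl Journal {
    /// Fixed behavior: a start_offset below first_offset returns None
    /// instead of silently clamping to index 0 and returning messages
    /// from the wrong range.
    fn slice_by_offset(&self, start_offset: u64) -> Option<&[u64]> {
        if start_offset < self.first_offset {
            return None; // caller must read the missing range from disk
        }
        let idx = (start_offset - self.first_offset) as usize;
        self.messages.get(idx..)
    }
}
```

Returning None makes the "range not present in memory" case explicit, which is what lets the three-tier routing fall through to disk rather than skipping messages.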
The fix has six layers:

1. Initialize journal base_offset at both bootstrap paths (shard/mod.rs and the lazy init_partition_inner path in shard/system/partitions.rs). Fix current_offset computation to use the max end_offset across segments with data, not just the active segment. Fix should_increment_offset to check whether any segment has data, not just current_offset > 0.
2. Self-heal base_offset in journal.append() on first append when messages_count == 0, with debug_assert validation.
3. Change slice_by_offset to return None when start_offset < first_offset instead of silently returning data from a higher offset range.
4. Remove Default from MemoryMessageJournal so the bug class is structurally impossible. Add explicit empty()/at_offset() constructors. Add typed query methods (first_offset, last_offset, first_timestamp, last_timestamp) on the Journal trait. Delete dead Clone impl.
5. Clamp consumer offsets that are ahead of the partition offset after a crash (OOM, SIGKILL) at both bootstrap paths. This prevents permanent empty polls when auto_commit persisted an offset beyond what was flushed to disk.
6. Add a consumer offset barrier to time-based expiry (delete_expired_segments_for_partition), matching the existing size-based barrier. Log a warning when the barrier blocks segment deletion, identifying the blocking consumer kind, ID, and offset. Fix is_expired to treat end_timestamp=0 as non-expired (prevents instant deletion of segments with empty indexes during bootstrap).
Reference issues: #2715 and #2924.
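A condensed sketch of how layers 1, 2, and 5 above might fit together. Every name here (Segment, bootstrap_base_offset, clamp_consumer_offset, the field layout) is an assumption for illustration and is not verified against the iggy source:

```rust
// Hypothetical sketch; segment and journal types are simplified assumptions.
struct Segment {
    end_offset: u64,
    has_data: bool,
}

struct MemoryMessageJournal {
    base_offset: u64,
    messages_count: u64,
}

/// Layer 1: derive the journal's starting offset at bootstrap from the
/// max end_offset across *all* segments that hold data, not just the
/// active segment.
fn bootstrap_base_offset(segments: &[Segment]) -> u64 {
    segments
        .iter()
        .filter(|s| s.has_data)
        .map(|s| s.end_offset + 1)
        .max()
        .unwrap_or(0)
}

impl MemoryMessageJournal {
    /// Layer 2: self-heal base_offset on the first append if the journal
    /// is still empty, so a missed init cannot corrupt offset arithmetic.
    /// Returns the resulting current_offset.
    fn append(&mut self, partition_next_offset: u64, batch_count: u64) -> u64 {
        if self.messages_count == 0 {
            debug_assert!(self.base_offset <= partition_next_offset);
            self.base_offset = partition_next_offset;
        }
        self.messages_count += batch_count;
        self.base_offset + self.messages_count - 1
    }
}

/// Layer 5: clamp a persisted consumer offset that ran ahead of the
/// partition offset (e.g. auto_commit persisted before the data was
/// flushed, then the server was killed), so the consumer does not
/// poll an empty range forever.
fn clamp_consumer_offset(consumer_offset: u64, partition_offset: u64) -> u64 {
    consumer_offset.min(partition_offset)
}
```

In this model, a restart with offsets 0..=9 on disk yields a bootstrap base offset of 10, so the first append of 3 messages lands at current_offset 12 (N + batch_count - 1) rather than 2.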