Skip to content

Commit 298cd32

Browse files
authored
fix(server): prevent consumer offset skip during concurrent produce+consume (#2958)
1 parent 6180d88 commit 298cd32

21 files changed

+1856
-202
lines changed

core/common/src/types/message/messages_batch_mut.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ impl IggyMessagesBatchMut {
324324
let first_offset = self.first_offset()?;
325325

326326
if start_offset < first_offset {
327-
return self.slice_by_index(0, count);
327+
return None;
328328
}
329329

330330
let last_offset = self.last_offset()?;

core/common/src/types/message/messages_batch_set.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -237,22 +237,30 @@ impl IggyMessagesBatchSet {
237237

238238
let mut result = Self::with_capacity(self.containers_count());
239239
let mut remaining_count = count;
240+
let mut current_offset = start_offset;
240241

241242
for container in self.iter() {
242243
if remaining_count == 0 {
243244
break;
244245
}
245246

246-
let first_offset = container.first_offset();
247-
if first_offset.is_none()
248-
|| first_offset.unwrap() + container.count() as u64 <= start_offset
249-
{
247+
let Some(batch_first) = container.first_offset() else {
248+
continue;
249+
};
250+
if batch_first + container.count() as u64 <= current_offset {
250251
continue;
251252
}
252253

253-
if let Some(sliced) = container.slice_by_offset(start_offset, remaining_count)
254+
// When current_offset is below this batch's range (cross-batch
255+
// reads), start from the batch's first offset instead.
256+
let effective_start = current_offset.max(batch_first);
257+
258+
if let Some(sliced) = container.slice_by_offset(effective_start, remaining_count)
254259
&& sliced.count() > 0
255260
{
261+
if let Some(last) = sliced.last_offset() {
262+
current_offset = last + 1;
263+
}
256264
remaining_count -= sliced.count();
257265
result.add_batch(sliced);
258266
}

core/common/src/types/segment.rs

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ impl Segment {
6666
}
6767

6868
pub fn is_expired(&self, now: IggyTimestamp, expiry: IggyExpiry) -> bool {
69-
if !self.sealed {
69+
if !self.sealed || self.end_timestamp == 0 {
7070
return false;
7171
}
7272

@@ -79,3 +79,30 @@ impl Segment {
7979
}
8080
}
8181
}
82+
83+
#[cfg(test)]
84+
mod tests {
85+
use super::*;
86+
use crate::{IggyDuration, IggyTimestamp};
87+
use std::time::Duration;
88+
89+
#[test]
90+
fn zero_timestamp_segment_should_not_appear_expired() {
91+
// Reproduce Bug 3 from #2924: during bootstrap, segments with empty
92+
// indexes get end_timestamp = 0. is_expired() then evaluates
93+
// 0 + expiry <= now, which is always true, causing immediate deletion.
94+
let mut seg = Segment::new(5000, IggyByteSize::from(128 * 1024 * 1024u64));
95+
seg.sealed = true;
96+
seg.end_timestamp = 0; // simulates bootstrap with empty indexes
97+
98+
let now = IggyTimestamp::now();
99+
let expiry = IggyExpiry::ExpireDuration(IggyDuration::from(Duration::from_secs(600)));
100+
101+
// A segment with unknown timestamp (0) must NOT be considered expired.
102+
// Before this fix, this assertion failed: is_expired returned true because 0 + 600s <= now.
103+
assert!(
104+
!seg.is_expired(now, expiry),
105+
"BUG: segment with end_timestamp=0 appears instantly expired"
106+
);
107+
}
108+
}

core/integration/tests/server/message_cleanup.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,12 @@ fn fair_size_cleanup_multipartition() -> CleanupScenarioFn {
7373
}
7474
}
7575

76+
fn expiry_respects_consumer_offset() -> CleanupScenarioFn {
77+
|client, path| {
78+
Box::pin(message_cleanup_scenario::run_expiry_respects_consumer_offset(client, path))
79+
}
80+
}
81+
7682
async fn run_cleanup_scenario(scenario: CleanupScenarioFn) {
7783
let mut harness = TestHarness::builder()
7884
.server(
@@ -116,6 +122,7 @@ async fn run_cleanup_scenario(scenario: CleanupScenarioFn) {
116122
combined_retention(),
117123
expiry_multipartition(),
118124
fair_size_cleanup_multipartition(),
125+
expiry_respects_consumer_offset(),
119126
])]
120127
#[tokio::test]
121128
#[parallel]
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
/* Licensed to the Apache Software Foundation (ASF) under one
2+
* or more contributor license agreements. See the NOTICE file
3+
* distributed with this work for additional information
4+
* regarding copyright ownership. The ASF licenses this file
5+
* to you under the Apache License, Version 2.0 (the
6+
* "License"); you may not use this file except in compliance
7+
* with the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing,
12+
* software distributed under the License is distributed on an
13+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
* KIND, either express or implied. See the License for the
15+
* specific language governing permissions and limitations
16+
* under the License.
17+
*/
18+
19+
//! Regression test for issue #2715: consumer offset skip during concurrent produce+consume.
20+
//!
21+
//! Root cause ("State C"):
22+
//! 1. Journal commit() moves data to in-flight buffer (async persist begins)
23+
//! 2. New send arrives before persist completes - journal is non-empty again
24+
//! 3. Consumer polls with Next + auto_commit:
25+
//! Old code only checked in-flight when journal was empty (Case 0).
26+
//! With journal non-empty (Case 1-3), in-flight was invisible.
27+
//! auto_commit stored the journal offset, permanently skipping the in-flight range.
28+
//!
29+
//! Trigger: `messages_required_to_save = "1"` forces an inline journal commit on every
30+
//! batch. With concurrent sends, the next batch arrives while the previous is persisting,
31+
//! reliably producing State C.
32+
33+
use bytes::Bytes;
34+
use iggy::prelude::*;
35+
use integration::iggy_harness;
36+
use std::sync::Arc;
37+
use std::sync::atomic::{AtomicBool, Ordering};
38+
use std::time::{Duration, Instant};
39+
40+
const STREAM_NAME: &str = "issue-2715-stream";
41+
const TOPIC_NAME: &str = "issue-2715-topic";
42+
const TOTAL_MESSAGES: u64 = 500;
43+
const PRODUCER_BATCH_SIZE: u32 = 5;
44+
const CONSUMER_BATCH_SIZE: u32 = 10;
45+
const MAX_TEST_DURATION: Duration = Duration::from_secs(60);
46+
47+
/// Regression test for issue #2715: consumer must not skip offsets during
48+
/// concurrent produce+consume when the journal commits while in-flight data exists.
49+
///
50+
/// `messages_required_to_save = "1"` forces every batch to trigger an inline
51+
/// journal commit. The next send arrives before async persist completes, creating
52+
/// State C (journal non-empty + in-flight non-empty simultaneously).
53+
#[iggy_harness(server(
54+
partition.messages_required_to_save = "1",
55+
partition.enforce_fsync = false,
56+
message_saver.enabled = true,
57+
message_saver.interval = "100ms"
58+
))]
59+
async fn concurrent_produce_consume_no_offset_skip(harness: &TestHarness) {
60+
let stream_id = Identifier::named(STREAM_NAME).unwrap();
61+
let topic_id = Identifier::named(TOPIC_NAME).unwrap();
62+
63+
let setup = harness.tcp_root_client().await.unwrap();
64+
setup.create_stream(STREAM_NAME).await.unwrap();
65+
setup
66+
.create_topic(
67+
&stream_id,
68+
TOPIC_NAME,
69+
1,
70+
CompressionAlgorithm::None,
71+
None,
72+
IggyExpiry::NeverExpire,
73+
MaxTopicSize::ServerDefault,
74+
)
75+
.await
76+
.unwrap();
77+
drop(setup);
78+
79+
let producer_client = harness.tcp_root_client().await.unwrap();
80+
let consumer_client = harness.tcp_root_client().await.unwrap();
81+
82+
let producer_done = Arc::new(AtomicBool::new(false));
83+
let producer_done_clone = producer_done.clone();
84+
85+
// Consumer: polls with Next + auto_commit concurrently with the producer.
86+
// Checks that received offsets are strictly contiguous (no gaps).
87+
let consumer_task = tokio::spawn(async move {
88+
let stream = Identifier::named(STREAM_NAME).unwrap();
89+
let topic = Identifier::named(TOPIC_NAME).unwrap();
90+
let consumer = Consumer::default();
91+
let mut last_offset: Option<u64> = None;
92+
let mut total_received = 0u64;
93+
let mut consecutive_empty = 0u32;
94+
let deadline = Instant::now() + MAX_TEST_DURATION;
95+
96+
loop {
97+
if Instant::now() >= deadline {
98+
panic!(
99+
"Consumer timed out after {MAX_TEST_DURATION:?}. \
100+
Received {total_received}/{TOTAL_MESSAGES}, \
101+
last_offset: {last_offset:?}"
102+
);
103+
}
104+
105+
let result = consumer_client
106+
.poll_messages(
107+
&stream,
108+
&topic,
109+
Some(0),
110+
&consumer,
111+
&PollingStrategy::next(),
112+
CONSUMER_BATCH_SIZE,
113+
true, // auto_commit - this is what makes the skip permanent
114+
)
115+
.await;
116+
117+
match result {
118+
Ok(polled) if polled.messages.is_empty() => {
119+
if producer_done_clone.load(Ordering::Relaxed) {
120+
consecutive_empty += 1;
121+
if consecutive_empty >= 20 {
122+
break;
123+
}
124+
tokio::time::sleep(Duration::from_millis(50)).await;
125+
} else {
126+
tokio::time::sleep(Duration::from_millis(5)).await;
127+
}
128+
}
129+
Ok(polled) => {
130+
consecutive_empty = 0;
131+
for msg in &polled.messages {
132+
let offset = msg.header.offset;
133+
if let Some(last) = last_offset {
134+
assert_eq!(
135+
offset,
136+
last + 1,
137+
"Offset gap! Expected {}, got {} (skipped {} messages). \
138+
Issue #2715: in-flight buffer invisible when journal non-empty.",
139+
last + 1,
140+
offset,
141+
offset.saturating_sub(last + 1),
142+
);
143+
}
144+
last_offset = Some(offset);
145+
total_received += 1;
146+
}
147+
}
148+
Err(e) => {
149+
eprintln!("Consumer poll error (transient under load): {e:?}");
150+
tokio::time::sleep(Duration::from_millis(10)).await;
151+
}
152+
}
153+
}
154+
155+
total_received
156+
});
157+
158+
// Producer: sends TOTAL_MESSAGES in a tight loop to maximize State C window.
159+
let mut sent = 0u64;
160+
while sent < TOTAL_MESSAGES {
161+
let batch_count = PRODUCER_BATCH_SIZE.min((TOTAL_MESSAGES - sent) as u32);
162+
let mut messages: Vec<IggyMessage> = (0..batch_count)
163+
.map(|i| {
164+
IggyMessage::builder()
165+
.payload(Bytes::from(format!("msg-{}", sent + i as u64)))
166+
.build()
167+
.unwrap()
168+
})
169+
.collect();
170+
171+
producer_client
172+
.send_messages(
173+
&stream_id,
174+
&topic_id,
175+
&Partitioning::partition_id(0),
176+
&mut messages,
177+
)
178+
.await
179+
.unwrap_or_else(|e| panic!("Producer send failed at sent={sent}: {e}"));
180+
181+
sent += batch_count as u64;
182+
}
183+
184+
producer_done.store(true, Ordering::Relaxed);
185+
186+
let total_received = consumer_task.await.unwrap();
187+
assert_eq!(
188+
total_received,
189+
TOTAL_MESSAGES,
190+
"Consumer received {total_received}/{TOTAL_MESSAGES} - \
191+
missing {} messages (issue #2715 offset skip regression).",
192+
TOTAL_MESSAGES - total_received,
193+
);
194+
195+
producer_client.delete_stream(&stream_id).await.unwrap();
196+
}

0 commit comments

Comments
 (0)