Skip to content

Commit de29d3f

Browse files
authored
add support to repair offsets on restore (#253)
1 parent 59e5682 commit de29d3f

File tree

7 files changed

+192
-13
lines changed

7 files changed

+192
-13
lines changed

kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,7 @@ public Params withTime(final Time time) {
421421
public Params build() {
422422
this.responsiveKafkaClientSupplier = new ResponsiveKafkaClientSupplier(
423423
clientSupplier,
424+
responsiveConfig,
424425
streamsConfig,
425426
storeRegistry,
426427
metrics,

kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,15 @@ public class ResponsiveConfig extends AbstractConfig {
231231
+ "runs in. When set to RUN, runs the Kafka Streams app. When set to MIGRATE, runs app"
232232
+ " migration.";
233233

234+
// ------------------ Misc functional overrides ----------------------
235+
public static final String RESTORE_OFFSET_REPAIR_ENABLED_CONFIG = "responsive.restore.offset.repair.enabled";
236+
public static final boolean RESTORE_OFFSET_REPAIR_ENABLED_DEFAULT = false;
237+
public static final String RESTORE_OFFSET_REPAIR_ENABLED_DOC = "When set to 'true', " + RESTORE_OFFSET_REPAIR_ENABLED_CONFIG
238+
+ " will ignore OffsetOutOfRangeException and instead seek to the earliest available offset. This exception "
239+
+ "should only happen in situations where there is truncation/retention on the changelog topic and restoring from the latest "
240+
+ "committed offset in the remote store is no longer possible. Note that in some situations this may cause data "
241+
+ "loss, use this configuration with caution";
242+
234243
// ------------------ StreamsConfig overrides ----------------------
235244

236245
// These configuration values are required by Responsive, and a ConfigException will
@@ -452,6 +461,12 @@ public class ResponsiveConfig extends AbstractConfig {
452461
),
453462
Importance.LOW,
454463
RESPONSIVE_MODE_DOC
464+
).define(
465+
RESTORE_OFFSET_REPAIR_ENABLED_CONFIG,
466+
Type.BOOLEAN,
467+
RESTORE_OFFSET_REPAIR_ENABLED_DEFAULT,
468+
Importance.LOW,
469+
RESTORE_OFFSET_REPAIR_ENABLED_DOC
455470
);
456471

457472
/**

kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplier.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE;
2020

2121
import dev.responsive.kafka.api.config.CompatibilityMode;
22+
import dev.responsive.kafka.api.config.ResponsiveConfig;
2223
import dev.responsive.kafka.internal.metrics.EndOffsetsPoller;
2324
import dev.responsive.kafka.internal.metrics.MetricPublishingCommitListener;
2425
import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
@@ -67,15 +68,25 @@ public final class ResponsiveKafkaClientSupplier implements KafkaClientSupplier
6768
private final String applicationId;
6869
private final boolean eos;
6970
private final CompatibilityMode compatibilityMode;
71+
private final boolean repairRestoreOffsetOutOfRange;
7072

7173
public ResponsiveKafkaClientSupplier(
7274
final KafkaClientSupplier clientSupplier,
75+
final ResponsiveConfig responsiveConfig,
7376
final StreamsConfig configs,
7477
final ResponsiveStoreRegistry storeRegistry,
7578
final ResponsiveMetrics metrics,
7679
final CompatibilityMode compatibilityMode
7780
) {
78-
this(new Factories() {}, clientSupplier, configs, storeRegistry, metrics, compatibilityMode);
81+
this(
82+
new Factories() {},
83+
clientSupplier,
84+
configs,
85+
storeRegistry,
86+
metrics,
87+
compatibilityMode,
88+
responsiveConfig.getBoolean(ResponsiveConfig.RESTORE_OFFSET_REPAIR_ENABLED_CONFIG)
89+
);
7990
}
8091

8192
ResponsiveKafkaClientSupplier(
@@ -84,13 +95,15 @@ public ResponsiveKafkaClientSupplier(
8495
final StreamsConfig configs,
8596
final ResponsiveStoreRegistry storeRegistry,
8697
final ResponsiveMetrics metrics,
87-
final CompatibilityMode compatibilityMode
98+
final CompatibilityMode compatibilityMode,
99+
final boolean repairRestoreOffsetOutOfRange
88100
) {
89101
this.factories = factories;
90102
this.wrapped = wrapped;
91103
this.storeRegistry = storeRegistry;
92104
this.metrics = metrics;
93105
this.compatibilityMode = compatibilityMode;
106+
this.repairRestoreOffsetOutOfRange = repairRestoreOffsetOutOfRange;
94107

95108
eos = !(AT_LEAST_ONCE.equals(
96109
configs.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)));
@@ -174,7 +187,8 @@ public Consumer<byte[], byte[]> getRestoreConsumer(final Map<String, Object> con
174187
return factories.createRestoreConsumer(
175188
clientId,
176189
wrapped.getRestoreConsumer(config),
177-
storeRegistry::getCommittedOffset
190+
storeRegistry::getCommittedOffset,
191+
repairRestoreOffsetOutOfRange
178192
);
179193

180194
}
@@ -403,12 +417,14 @@ default MetricPublishingCommitListener createMetricsPublishingCommitListener(
403417
default ResponsiveRestoreConsumer<byte[], byte[]> createRestoreConsumer(
404418
final String clientId,
405419
final Consumer<byte[], byte[]> restoreConsumer,
406-
final Function<TopicPartition, OptionalLong> getCommittedOffset
420+
final Function<TopicPartition, OptionalLong> getCommittedOffset,
421+
final boolean repairRestoreOffsetOutOfRange
407422
) {
408423
return new ResponsiveRestoreConsumer<>(
409424
clientId,
410425
restoreConsumer,
411-
getCommittedOffset
426+
getCommittedOffset,
427+
repairRestoreOffsetOutOfRange
412428
);
413429
}
414430
}

kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveRestoreConsumer.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package dev.responsive.kafka.internal.clients;
22

3+
import dev.responsive.kafka.api.config.ResponsiveConfig;
34
import java.time.Duration;
45
import java.util.Collection;
56
import java.util.Collections;
@@ -12,6 +13,7 @@
1213
import org.apache.kafka.clients.consumer.Consumer;
1314
import org.apache.kafka.clients.consumer.ConsumerRecords;
1415
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
16+
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
1517
import org.apache.kafka.common.TopicPartition;
1618
import org.apache.kafka.common.utils.LogContext;
1719
import org.slf4j.Logger;
@@ -48,17 +50,20 @@ public class ResponsiveRestoreConsumer<K, V> extends DelegatingConsumer<K, V> {
4850

4951
private final Function<TopicPartition, OptionalLong> startOffsets;
5052
private final Set<TopicPartition> uninitializedOffsets = new HashSet<>();
53+
private final boolean repairRestoreOffsetOutOfRange;
5154

5255
public ResponsiveRestoreConsumer(
5356
final String clientId,
5457
final Consumer<K, V> delegate,
55-
final Function<TopicPartition, OptionalLong> startOffsets
58+
final Function<TopicPartition, OptionalLong> startOffsets,
59+
final boolean repairRestoreOffsetOutOfRange
5660
) {
5761
super(delegate);
5862
this.startOffsets = Objects.requireNonNull(startOffsets);
5963
this.log = new LogContext(
6064
String.format("responsive-restore-consumer [%s]", Objects.requireNonNull(clientId))
6165
).logger(ResponsiveConsumer.class);
66+
this.repairRestoreOffsetOutOfRange = repairRestoreOffsetOutOfRange;
6267
}
6368

6469
private Set<TopicPartition> initializeOffsets(final Collection<TopicPartition> partitions) {
@@ -101,7 +106,23 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
101106
log.error("Found uninitialized changelog partitions during poll: {}", uninitializedOffsets);
102107
throw new IllegalStateException("Restore consumer invoked poll without initializing offsets");
103108
}
104-
return super.poll(timeout);
109+
110+
try {
111+
return super.poll(timeout);
112+
} catch (final OffsetOutOfRangeException e) {
113+
// e.partitions() should never be empty, but we check it because accidentally
114+
// passing in empty partitions is catastrophic (it will seek all partitions to
115+
// beginning)
116+
if (repairRestoreOffsetOutOfRange && !e.partitions().isEmpty()) {
117+
log.warn("The restore consumer attempted to seek to offsets that are no longer in range. "
118+
+ ResponsiveConfig.RESTORE_OFFSET_REPAIR_ENABLED_CONFIG + " is enabled and will "
119+
+ "automatically seek " + e.offsetOutOfRangePartitions() + " to their earliest offset "
120+
+ "and continue restoration.", e);
121+
super.seekToBeginning(e.partitions());
122+
return super.poll(timeout);
123+
}
124+
throw e;
125+
}
105126
}
106127

107128
@Override

kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKeyValueStoreRestoreIntegrationTest.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@
104104
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
105105
import org.apache.kafka.clients.producer.KafkaProducer;
106106
import org.apache.kafka.clients.producer.ProducerRecord;
107+
import org.apache.kafka.clients.producer.RecordMetadata;
107108
import org.apache.kafka.common.IsolationLevel;
108109
import org.apache.kafka.common.TopicPartition;
109110
import org.apache.kafka.common.config.TopicConfig;
@@ -226,6 +227,76 @@ private static Stream<Arguments> shouldFlushTestParams() {
226227
);
227228
}
228229

230+
@ParameterizedTest
231+
@EnumSource(KVSchema.class)
232+
public void shouldRepairOffsetsIfOutOfRangeAndConfigured(final KVSchema type) throws Exception {
233+
// Given:
234+
final Map<String, Object> properties = getMutableProperties();
235+
properties.put(ResponsiveConfig.RESTORE_OFFSET_REPAIR_ENABLED_CONFIG, true);
236+
final KafkaProducer<Long, Long> producer = new KafkaProducer<>(properties);
237+
final KafkaClientSupplier defaultClientSupplier = new DefaultKafkaClientSupplier();
238+
final CassandraClientFactory defaultFactory = new DefaultCassandraClientFactory();
239+
final TopicPartition input = new TopicPartition(inputTopic(), 0);
240+
final TopicPartition changelog = new TopicPartition(name + "-" + aggName() + "-changelog", 0);
241+
242+
// When:
243+
final long clOffset;
244+
try (final ResponsiveKafkaStreams streams
245+
= buildAggregatorApp(properties, defaultClientSupplier, defaultFactory, type, false)) {
246+
IntegrationTestUtils.startAppAndAwaitRunning(Duration.ofSeconds(30), streams);
247+
// Send some data through
248+
pipeInput(
249+
inputTopic(),
250+
1,
251+
producer,
252+
System::currentTimeMillis,
253+
0,
254+
1,
255+
LongStream.range(0, 100).toArray()
256+
);
257+
// Wait for it to be processed
258+
waitTillFullyConsumed(input, Duration.ofSeconds(120));
259+
260+
final List<ConsumerRecord<Long, Long>> changelogRecords
261+
= slurpPartition(changelog, properties);
262+
clOffset = changelogRecords.get(changelogRecords.size() - 1).offset();
263+
}
264+
265+
// produce one more record to the changelog so that everything written before it can be truncated
266+
final RecordMetadata recordMetadata =
267+
producer.send(new ProducerRecord<>(changelog.topic(), changelog.partition(), -1L, -1L))
268+
.get();
269+
270+
// truncate the changelog up to the new record so the offset committed in the remote store is out of range
271+
admin.deleteRecords(
272+
Map.of(changelog, RecordsToDelete.beforeOffset(recordMetadata.offset()))
273+
).all().get();
274+
275+
// run another application
276+
try (final ResponsiveKafkaStreams streams
277+
= buildAggregatorApp(properties, defaultClientSupplier, defaultFactory, type, false)) {
278+
IntegrationTestUtils.startAppAndAwaitRunning(Duration.ofSeconds(30), streams);
279+
// Send some data through
280+
pipeInput(
281+
inputTopic(),
282+
1,
283+
producer,
284+
System::currentTimeMillis,
285+
0,
286+
1,
287+
LongStream.range(0, 100).toArray()
288+
);
289+
// Wait for it to be processed
290+
waitTillFullyConsumed(input, Duration.ofSeconds(120));
291+
292+
// Verify it made progress
293+
final List<ConsumerRecord<Long, Long>> changelogRecords
294+
= slurpPartition(changelog, properties);
295+
final long last = changelogRecords.get(changelogRecords.size() - 1).offset();
296+
assertThat(last, greaterThan(clOffset));
297+
}
298+
}
299+
229300
@ParameterizedTest
230301
@EnumSource(KVSchema.class)
231302
public void shouldRestoreUnflushedChangelog(final KVSchema type) throws Exception {

kafka-client/src/test/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplierTest.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,12 @@ public void shouldCreateGlobalAndRestoreConsumerInFullCompatibilityMode() {
247247

248248
// then:
249249
verify(factories, Mockito.atLeastOnce()).createGlobalConsumer(any(), any(), any());
250-
verify(factories, Mockito.atLeastOnce()).createRestoreConsumer(any(), any(), any());
250+
verify(factories, Mockito.atLeastOnce()).createRestoreConsumer(
251+
any(),
252+
any(),
253+
any(),
254+
anyBoolean()
255+
);
251256
}
252257

253258
@Test
@@ -261,7 +266,7 @@ public void shouldNotCreateGlobalOrRestoreConsumerInMetricsCompatibilityMode() {
261266

262267
// then:
263268
verify(factories, Mockito.never()).createGlobalConsumer(any(), any(), any());
264-
verify(factories, Mockito.never()).createRestoreConsumer(any(), any(), any());
269+
verify(factories, Mockito.never()).createRestoreConsumer(any(), any(), any(), anyBoolean());
265270
}
266271

267272
@NotNull
@@ -275,7 +280,8 @@ private ResponsiveKafkaClientSupplier supplier(
275280
new StreamsConfig(CONFIGS),
276281
storeRegistry,
277282
metrics,
278-
compat
283+
compat,
284+
false
279285
);
280286
}
281287

kafka-client/src/test/java/dev/responsive/kafka/internal/clients/ResponsiveRestoreConsumerTest.java

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,31 @@
11
package dev.responsive.kafka.internal.clients;
22

3+
import static org.mockito.ArgumentMatchers.any;
34
import static org.mockito.Mockito.clearInvocations;
5+
import static org.mockito.Mockito.doNothing;
46
import static org.mockito.Mockito.times;
57
import static org.mockito.Mockito.verify;
68
import static org.mockito.Mockito.verifyNoMoreInteractions;
79
import static org.mockito.Mockito.when;
810

9-
import dev.responsive.kafka.internal.clients.ResponsiveRestoreConsumer;
1011
import java.time.Duration;
12+
import java.util.Collection;
1113
import java.util.Collections;
1214
import java.util.List;
15+
import java.util.Map;
1316
import java.util.OptionalLong;
1417
import java.util.Set;
1518
import org.apache.kafka.clients.consumer.Consumer;
1619
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
20+
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
1721
import org.apache.kafka.common.TopicPartition;
1822
import org.junit.jupiter.api.Assertions;
1923
import org.junit.jupiter.api.BeforeEach;
2024
import org.junit.jupiter.api.Test;
2125
import org.junit.jupiter.api.extension.ExtendWith;
26+
import org.mockito.ArgumentCaptor;
2227
import org.mockito.Mock;
28+
import org.mockito.Mockito;
2329
import org.mockito.junit.jupiter.MockitoExtension;
2430

2531
@ExtendWith(MockitoExtension.class)
@@ -35,7 +41,11 @@ public class ResponsiveRestoreConsumerTest {
3541

3642
@BeforeEach
3743
public void setup() {
38-
restoreConsumer = new ResponsiveRestoreConsumer<>(
44+
restoreConsumer = getRestoreConsumer(false);
45+
}
46+
47+
private ResponsiveRestoreConsumer<?, ?> getRestoreConsumer(boolean repairOffsets) {
48+
return new ResponsiveRestoreConsumer<>(
3949
"restore-consumer",
4050
wrapped,
4151
tp -> {
@@ -46,7 +56,9 @@ public void setup() {
4656
return OptionalLong.of(456L);
4757
}
4858
return OptionalLong.empty();
49-
});
59+
},
60+
repairOffsets
61+
);
5062
}
5163

5264
@Test
@@ -193,6 +205,43 @@ public void shouldClearUninitializedPartitionsWhenUnsubscribed() {
193205
restoreConsumer.poll(Duration.ofMillis(100));
194206
}
195207

208+
@SuppressWarnings("unchecked")
209+
@Test
210+
public void shouldSeekToBeginningOnOffsetOutOfRangeIfRepairOffsetConfigured() {
211+
// Given:
212+
restoreConsumer = getRestoreConsumer(true);
213+
final ArgumentCaptor<Collection<TopicPartition>> seekedTps
214+
= ArgumentCaptor.forClass(Collection.class);
215+
216+
when(wrapped.poll(any()))
217+
.thenThrow(new OffsetOutOfRangeException(Map.of(TOPIC_PARTITION1, 1L)))
218+
.thenReturn(null);
219+
doNothing().when(wrapped).seekToBeginning(seekedTps.capture());
220+
221+
// When:
222+
restoreConsumer.assign(List.of(TOPIC_PARTITION1));
223+
restoreConsumer.seekToBeginning(List.of(TOPIC_PARTITION1));
224+
restoreConsumer.poll(Duration.ofMillis(100));
225+
226+
// Then:
227+
verify(wrapped, Mockito.times(2)).poll(any());
228+
verify(wrapped).seekToBeginning(Set.of(TOPIC_PARTITION1));
229+
}
230+
231+
@Test
232+
public void shouldThrowOffsetOutOfRangeExceptionIfNoRepairConfigured() {
233+
// Given:
234+
when(wrapped.poll(any()))
235+
.thenThrow(new OffsetOutOfRangeException(Map.of(TOPIC_PARTITION1, 1L)))
236+
.thenReturn(null);
237+
238+
// Then:
239+
Assertions.assertThrows(
240+
OffsetOutOfRangeException.class,
241+
() -> restoreConsumer.poll(Duration.ofMillis(100))
242+
);
243+
}
244+
196245
@SuppressWarnings("unchecked")
197246
private void clearConsumerInvocations() {
198247
clearInvocations(wrapped);

0 commit comments

Comments
 (0)