Skip to content

Commit 119b947

Browse files
authored
Fix for TTD when loading properties from file (#439)
1 parent 31d3b66 commit 119b947

File tree

3 files changed

+58
-10
lines changed

3 files changed

+58
-10
lines changed

responsive-test-utils/src/main/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriver.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import static dev.responsive.kafka.api.async.internals.AsyncUtils.getAsyncThreadPool;
1616
import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_THREAD_POOL_SIZE_CONFIG;
17+
import static dev.responsive.kafka.api.config.ResponsiveConfig.responsiveConfig;
1718
import static dev.responsive.kafka.internal.stores.TTDRestoreListener.mockRestoreListener;
1819

1920
import dev.responsive.kafka.api.async.internals.AsyncThreadPoolRegistration;
@@ -191,7 +192,7 @@ private static Properties testDriverProps(
191192
.withMetrics(metrics)
192193
.withTopologyDescription(topologyDescription);
193194

194-
AsyncUtils.configuredAsyncThreadPool(ResponsiveConfig.responsiveConfig(baseProps), 1, metrics)
195+
AsyncUtils.configuredAsyncThreadPool(responsiveConfig(baseProps), 1, metrics)
195196
.ifPresent(threadPoolRegistry -> {
196197
threadPoolRegistry.startNewAsyncThreadPool(Thread.currentThread().getName());
197198
sessionConfig.withAsyncThreadPoolRegistry(threadPoolRegistry);
@@ -231,7 +232,7 @@ private static MockTime mockTime(final Instant initialWallClockTime) {
231232
private static Optional<AsyncThreadPoolRegistration> getAsyncThreadPoolRegistration(
232233
final Properties props
233234
) {
234-
final int asyncThreadPoolSize = (int) props.getOrDefault(ASYNC_THREAD_POOL_SIZE_CONFIG, 0);
235+
final int asyncThreadPoolSize = responsiveConfig(props).getInt(ASYNC_THREAD_POOL_SIZE_CONFIG);
235236

236237
if (asyncThreadPoolSize > 0) {
237238
final Map<String, Object> configMap = new HashMap<>();

responsive-test-utils/src/test/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriverKeyValueStoreTest.java

Lines changed: 51 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
import static dev.responsive.kafka.api.async.AsyncFixedKeyProcessorSupplier.createAsyncProcessorSupplier;
1616
import static dev.responsive.kafka.testutils.processors.Deduplicator.deduplicatorApp;
17+
import static org.hamcrest.MatcherAssert.assertThat;
18+
import static org.hamcrest.Matchers.equalTo;
1719
import static org.junit.jupiter.api.Assertions.assertNotNull;
1820
import static org.junit.jupiter.api.Assertions.assertNull;
1921

@@ -24,6 +26,7 @@
2426
import dev.responsive.kafka.api.stores.TtlProvider.TtlDuration;
2527
import dev.responsive.kafka.internal.stores.SchemaTypes;
2628
import dev.responsive.kafka.internal.stores.SchemaTypes.KVSchema;
29+
import java.io.InputStream;
2730
import java.time.Duration;
2831
import java.time.Instant;
2932
import java.util.List;
@@ -45,8 +48,8 @@
4548
import org.apache.kafka.streams.processor.api.FixedKeyRecord;
4649
import org.apache.kafka.streams.state.KeyValueStore;
4750
import org.apache.kafka.streams.state.ValueAndTimestamp;
48-
import org.hamcrest.MatcherAssert;
4951
import org.hamcrest.Matchers;
52+
import org.junit.jupiter.api.Test;
5053
import org.junit.jupiter.params.ParameterizedTest;
5154
import org.junit.jupiter.params.provider.EnumSource;
5255

@@ -87,7 +90,7 @@ public void shouldRunWithoutResponsiveConnectionAndNoTtl(final KVSchema type) {
8790

8891
// Then:
8992
final List<String> outputs = output.readValuesToList();
90-
MatcherAssert.assertThat(outputs, Matchers.contains(
93+
assertThat(outputs, Matchers.contains(
9194
"a,100,1,1,alice,CA",
9295
"c,102,1,1,alice,CA",
9396
"d,103,3,3,carol,CA"
@@ -145,7 +148,7 @@ public void shouldEnforceKeyBasedTtlByAdvancingStreamTime(final KVSchema type) {
145148

146149
// Then:
147150
final List<String> outputs = output.readValuesToList();
148-
MatcherAssert.assertThat(outputs, Matchers.contains(
151+
assertThat(outputs, Matchers.contains(
149152
"a,100,1,1,alice,CA",
150153
"d,103,3,3,carol,CA",
151154
"e,104,1,1,alex,CA",
@@ -157,8 +160,7 @@ public void shouldEnforceKeyBasedTtlByAdvancingStreamTime(final KVSchema type) {
157160

158161
@ParameterizedTest
159162
@EnumSource(SchemaTypes.KVSchema.class)
160-
public void shouldEnforceKeyBasedTtlByAdvancingWallclockTime(final KVSchema type)
161-
throws InterruptedException {
163+
public void shouldEnforceKeyBasedTtlByAdvancingWallclockTime(final KVSchema type) {
162164
// Given:
163165
final Duration defaultTtl = Duration.ofMillis(15);
164166

@@ -206,7 +208,7 @@ public void shouldEnforceKeyBasedTtlByAdvancingWallclockTime(final KVSchema type
206208

207209
// Then:
208210
final List<String> outputs = output.readValuesToList();
209-
MatcherAssert.assertThat(outputs, Matchers.containsInAnyOrder(
211+
assertThat(outputs, Matchers.containsInAnyOrder(
210212
"a,100,1,1,alice,CA",
211213
"d,103,3,3,carol,CA",
212214
"e,104,1,1,alex,CA",
@@ -263,10 +265,51 @@ public void shouldDeduplicateWithTtlProviderToExpireOldRecords(final KVSchema ty
263265
}
264266
}
265267

268+
@Test
269+
public void shouldLoadPropertiesFromFile() throws Exception {
270+
final String propsPath = "ttd-app.properties";
271+
final InputStream inputStream = getClass().getClassLoader().getResourceAsStream(propsPath);
272+
273+
if (inputStream == null) {
274+
throw new RuntimeException("ttd-app.properties not found under test/resources/");
275+
}
276+
277+
final Properties properties = new Properties();
278+
properties.load(inputStream);
279+
280+
final var params = ResponsiveKeyValueParams.keyValue("store");
281+
final Topology topology = deduplicatorApp("input", "output", params);
282+
try (final var driver = new ResponsiveTopologyTestDriver(topology, properties, STARTING_TIME)) {
283+
final TestInputTopic<String, String> inputTopic = driver.createInputTopic(
284+
"input",
285+
new StringSerializer(),
286+
new StringSerializer(),
287+
STARTING_TIME,
288+
Duration.ZERO
289+
);
290+
291+
final TestOutputTopic<String, String> output = driver.createOutputTopic(
292+
"output", new StringDeserializer(), new StringDeserializer());
293+
294+
inputTopic.pipeInput("A", "A1");
295+
inputTopic.pipeInput("A", "A2");
296+
inputTopic.pipeInput("B", "B1");
297+
inputTopic.pipeInput("A", "A3");
298+
299+
final List<String> outputs = output.readValuesToList();
300+
assertThat(outputs.size(), equalTo(2));
301+
assertThat(outputs, Matchers.containsInAnyOrder(
302+
"A1",
303+
"B1"
304+
));
305+
}
306+
307+
}
308+
266309
private ResponsiveTopologyTestDriver setupDriver(final Topology topology) {
267310
final Properties props = new Properties();
268-
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
269-
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
311+
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
312+
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
270313
props.put(ResponsiveConfig.ASYNC_THREAD_POOL_SIZE_CONFIG, 2);
271314

272315
return new ResponsiveTopologyTestDriver(topology, props, STARTING_TIME);
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
2+
default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
3+
4+
responsive.async.thread.pool.size=1

0 commit comments

Comments
 (0)