[GSoC 2026] Kafka Streams runner — KafkaStreamsTestRunner test harness#39211
Conversation
First part of the testing-infra sub-issue: extract the translate + drive boilerplate every runner test repeated into a reusable KafkaStreamsTestRunner. - run(Pipeline): translate + drive through a TopologyTestDriver, and — since the driver does not loop a low-level sink topic back into its source — auto-discover internal repartition topics (those that appear as both a sink and a source, via TopologyDescription) and round-trip them to quiescence. This generalises the manual repartition loop GroupByKeyTest did by hand. - translate(Pipeline) / streamsConfig(Pipeline) / leafProcessorName(Topology) for tests that need the Topology to attach a capture processor before driving. - testOptions(): the EMBEDDED harness + a unique application id. Migrate GroupByKeyTest and ExecutableStageTranslatorTest to run(), and WatermarkPropagationTest and KafkaStreamsRunnerTest to translate()/streamsConfig(). ChainedExecutableStageTest keeps its explicit translate because it asserts on the prepared pipeline proto's ExecutableStage count. Test-only; no runner behaviour change. Refs apache#39192
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces a reusable test harness, Highlights
New Features🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request introduces KafkaStreamsTestRunner, a reusable test harness that simplifies Beam pipeline translation and execution using TopologyTestDriver in Kafka Streams runner tests. It refactors several test classes (KafkaStreamsRunnerTest, ExecutableStageTranslatorTest, GroupByKeyTest, and WatermarkPropagationTest) to eliminate redundant boilerplate. Feedback on the new test runner includes: ensuring leafProcessorName is deterministic by verifying only a single leaf processor exists, adding a null check for topicSet() on source nodes to prevent potential NullPointerExceptions, and optimizing roundTripInternalTopics by instantiating TestInputTopic and TestOutputTopic outside of the loop.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| public static String leafProcessorName(Topology topology) { | ||
| for (TopologyDescription.Subtopology subtopology : topology.describe().subtopologies()) { | ||
| for (TopologyDescription.Node node : subtopology.nodes()) { | ||
| if (node instanceof TopologyDescription.Processor && node.successors().isEmpty()) { | ||
| return node.name(); | ||
| } | ||
| } | ||
| } | ||
| throw new IllegalStateException("no leaf processor found in topology"); | ||
| } |
There was a problem hiding this comment.
The iteration order of subtopology.nodes() (which returns a Set) is not guaranteed to be deterministic. If a topology contains multiple leaf processors (processors with no successors), leafProcessorName will return whichever leaf it encounters first, which could lead to non-deterministic test failures. Collecting all leaf processors and asserting that exactly one exists makes the test harness more robust and deterministic.
public static String leafProcessorName(Topology topology) {
List<String> leaves = new java.util.ArrayList<>();
for (TopologyDescription.Subtopology subtopology : topology.describe().subtopologies()) {
for (TopologyDescription.Node node : subtopology.nodes()) {
if (node instanceof TopologyDescription.Processor && node.successors().isEmpty()) {
leaves.add(node.name());
}
}
}
if (leaves.isEmpty()) {
throw new IllegalStateException("no leaf processor found in topology");
}
if (leaves.size() > 1) {
throw new IllegalStateException("multiple leaf processors found in topology: " + leaves);
}
return leaves.get(0);
}| } else if (node instanceof TopologyDescription.Source) { | ||
| sourceTopics.addAll(((TopologyDescription.Source) node).topicSet()); | ||
| } |
There was a problem hiding this comment.
If a source node in the topology is configured using a Pattern instead of a set of topic names, topicSet() will return null. Calling sourceTopics.addAll(null) will throw a NullPointerException. Adding a null check before calling addAll prevents potential crashes when pattern-based sources are used.
} else if (node instanceof TopologyDescription.Source) {
Set<String> topics = ((TopologyDescription.Source) node).topicSet();
if (topics != null) {
sourceTopics.addAll(topics);
}
}| private static void roundTripInternalTopics(TopologyTestDriver driver, Set<String> topics) { | ||
| for (int round = 0; round < MAX_ROUND_TRIPS; round++) { | ||
| boolean progressed = false; | ||
| for (String topic : topics) { | ||
| TestOutputTopic<byte[], byte[]> out = | ||
| driver.createOutputTopic( | ||
| topic, new ByteArrayDeserializer(), new ByteArrayDeserializer()); | ||
| List<TestRecord<byte[], byte[]>> records = out.readRecordsToList(); | ||
| if (records.isEmpty()) { | ||
| continue; | ||
| } | ||
| progressed = true; | ||
| TestInputTopic<byte[], byte[]> in = | ||
| driver.createInputTopic(topic, new ByteArraySerializer(), new ByteArraySerializer()); | ||
| for (TestRecord<byte[], byte[]> record : records) { | ||
| in.pipeInput(record); | ||
| } | ||
| } | ||
| if (!progressed) { | ||
| return; | ||
| } | ||
| } | ||
| throw new IllegalStateException( | ||
| "Internal topics did not reach quiescence after " + MAX_ROUND_TRIPS + " round trips"); | ||
| } |
There was a problem hiding this comment.
Creating TestInputTopic and TestOutputTopic instances inside the round-trip loop on every iteration is inefficient as it repeatedly allocates new objects and performs lookups in TopologyTestDriver's internal maps. Instantiating them once outside the loop improves performance and reduces overhead.
private static void roundTripInternalTopics(TopologyTestDriver driver, Set<String> topics) {
java.util.Map<String, TestOutputTopic<byte[], byte[]>> outputs = new java.util.HashMap<>();
java.util.Map<String, TestInputTopic<byte[], byte[]>> inputs = new java.util.HashMap<>();
for (String topic : topics) {
outputs.put(
topic,
driver.createOutputTopic(
topic, new ByteArrayDeserializer(), new ByteArrayDeserializer()));
inputs.put(
topic,
driver.createInputTopic(topic, new ByteArraySerializer(), new ByteArraySerializer()));
}
for (int round = 0; round < MAX_ROUND_TRIPS; round++) {
boolean progressed = false;
for (String topic : topics) {
TestOutputTopic<byte[], byte[]> out = outputs.get(topic);
List<TestRecord<byte[], byte[]>> records = out.readRecordsToList();
if (records.isEmpty()) {
continue;
}
progressed = true;
TestInputTopic<byte[], byte[]> in = inputs.get(topic);
for (TestRecord<byte[], byte[]> record : records) {
in.pipeInput(record);
}
}
if (!progressed) {
return;
}
}
throw new IllegalStateException(
"Internal topics did not reach quiescence after " + MAX_ROUND_TRIPS + " round trips");
}|
Assigning reviewers: R: @kennknowles added as fallback since no labels match configuration Note: If you would like to opt out of this review, comment Available commands:
The PR bot will only process comments in the main thread (not review comments). |
Summary
First part of the testing-infra sub-issue (#39192): a reusable
KafkaStreamsTestRunnerthat removes the translate + drive boilerplate every test repeated, and is the foundation for Create support and the first@ValidatesRunnertests (the next two parts).run(Pipeline)translates + drives through aTopologyTestDriver, andauto-discovers internal repartition topics (both a sink and a source, via
TopologyDescription) and round-trips them to quiescence — generalising themanual repartition loop
GroupByKeyTestused to do by hand.translate()/streamsConfig()/leafProcessorName()for tests that needthe
Topology(to attach a capture processor before driving).testOptions()for the EMBEDDED harness + a unique app id.Migrates 4 tests onto it;
ChainedExecutableStageTestkeeps its explicittranslate because it asserts on the prepared proto's ExecutableStage count.
Testing
Test-only change, no runner behaviour change.
:checkgreen, 36 tests.Refs #39192
cc @je-ik