From a50710cc6b6ab8c4ed79066dbeba5d9470262c6a Mon Sep 17 00:00:00 2001 From: junaiddshaukat Date: Fri, 3 Jul 2026 15:02:21 +0500 Subject: [PATCH] Add KafkaStreamsTestRunner test harness MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit First part of the testing-infra sub-issue: extract the translate + drive boilerplate every runner test repeated into a reusable KafkaStreamsTestRunner. - run(Pipeline): translate + drive through a TopologyTestDriver, and — since the driver does not loop a low-level sink topic back into its source — auto-discover internal repartition topics (those that appear as both a sink and a source, via TopologyDescription) and round-trip them to quiescence. This generalises the manual repartition loop GroupByKeyTest did by hand. - translate(Pipeline) / streamsConfig(Pipeline) / leafProcessorName(Topology) for tests that need the Topology to attach a capture processor before driving. - testOptions(): the EMBEDDED harness + a unique application id. Migrate GroupByKeyTest and ExecutableStageTranslatorTest to run(), and WatermarkPropagationTest and KafkaStreamsRunnerTest to translate()/streamsConfig(). ChainedExecutableStageTest keeps its explicit translate because it asserts on the prepared pipeline proto's ExecutableStage count. Test-only; no runner behaviour change. 
Refs #39192 --- .../kafka/streams/KafkaStreamsRunnerTest.java | 78 +------ .../kafka/streams/KafkaStreamsTestRunner.java | 190 ++++++++++++++++++ .../ExecutableStageTranslatorTest.java | 72 +------ .../streams/translation/GroupByKeyTest.java | 105 +--------- .../translation/WatermarkPropagationTest.java | 74 +------ 5 files changed, 220 insertions(+), 299 deletions(-) create mode 100644 runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/KafkaStreamsTestRunner.java diff --git a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/KafkaStreamsRunnerTest.java b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/KafkaStreamsRunnerTest.java index 0c576d27122f..4574dd37d2ec 100644 --- a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/KafkaStreamsRunnerTest.java +++ b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/KafkaStreamsRunnerTest.java @@ -24,22 +24,11 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Properties; import org.apache.beam.model.pipeline.v1.RunnerApi; -import org.apache.beam.runners.fnexecution.provisioning.JobInfo; import org.apache.beam.runners.kafka.streams.translation.KStreamsPayload; -import org.apache.beam.runners.kafka.streams.translation.KafkaStreamsPipelineTranslator; -import org.apache.beam.runners.kafka.streams.translation.KafkaStreamsTranslationContext; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.CrashingRunner; import org.apache.beam.sdk.transforms.Impulse; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation; -import org.apache.beam.sdk.util.construction.PipelineTranslation; -import org.apache.kafka.common.serialization.Serdes; -import 
org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.processor.api.Processor; @@ -51,8 +40,8 @@ /** * Pipeline-level integration tests that build a Beam {@link Pipeline} via the high-level Java SDK - * ({@code Pipeline.create().apply(Impulse.create())}), translate it via the runner, and execute the - * resulting Kafka Streams topology under {@link TopologyTestDriver}. + * ({@code Pipeline.create().apply(Impulse.create())}) and run it through {@link + * KafkaStreamsTestRunner}. * *

This is the test layer Jan requested on PR #38689: rather than building hand-rolled {@link * RunnerApi.Pipeline} protos, drive translation from the same surface a user would write. The tests @@ -61,33 +50,18 @@ */ public class KafkaStreamsRunnerTest { - private static final String JOB_ID = "kafka-streams-runner-test"; - private static final String APPLICATION_ID = "ks-runner-test"; - @Test public void impulseOnlyPipelineEmitsDataAndTerminalWatermark() { - Pipeline pipeline = Pipeline.create(pipelineOptions()); + Pipeline pipeline = Pipeline.create(KafkaStreamsTestRunner.testOptions()); pipeline.apply("impulse", Impulse.create()); - RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline); - - KafkaStreamsPipelineOptions options = - pipeline.getOptions().as(KafkaStreamsPipelineOptions.class); - KafkaStreamsPipelineTranslator translator = new KafkaStreamsPipelineTranslator(); - JobInfo jobInfo = - JobInfo.create( - JOB_ID, options.getJobName(), "", PipelineOptionsTranslation.toProto(options)); - KafkaStreamsTranslationContext context = translator.createTranslationContext(jobInfo, options); - translator.translate(context, translator.prepareForTranslation(pipelineProto)); - CapturingProcessor capture = new CapturingProcessor(); - Topology topology = context.getTopology(); - // Wire a downstream test sink to every translated transform node so we can capture emissions. - // Impulse is the only transform here, so we attach to "impulse" (the processor name registered - // by ImpulseTranslator). - topology.addProcessor("capture", capture, expectedImpulseProcessorName(pipelineProto)); + Topology topology = KafkaStreamsTestRunner.translate(pipeline).getTopology(); + // Impulse is the only transform, so it is the topology leaf; capture what it forwards. 
+ topology.addProcessor("capture", capture, KafkaStreamsTestRunner.leafProcessorName(topology)); - try (TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig())) { + try (TopologyTestDriver driver = + new TopologyTestDriver(topology, KafkaStreamsTestRunner.streamsConfig(pipeline))) { driver.advanceWallClockTime(Duration.ofSeconds(1)); driver.advanceWallClockTime(Duration.ofSeconds(1)); } @@ -101,42 +75,6 @@ public void impulseOnlyPipelineEmitsDataAndTerminalWatermark() { is(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())); } - /** - * Finds the transform id that {@link Impulse} got assigned by the SDK so the test can attach a - * capturing processor to the matching Kafka Streams processor node (the translator names the - * processor after the transform id). - */ - private static String expectedImpulseProcessorName(RunnerApi.Pipeline pipelineProto) { - for (java.util.Map.Entry entry : - pipelineProto.getComponents().getTransformsMap().entrySet()) { - if ("beam:transform:impulse:v1".equals(entry.getValue().getSpec().getUrn())) { - return entry.getKey(); - } - } - throw new IllegalStateException("Impulse transform not found in pipeline proto"); - } - - private static PipelineOptions pipelineOptions() { - PipelineOptions options = - PipelineOptionsFactory.fromArgs("--applicationId=" + APPLICATION_ID).create(); - // Pipeline.create() requires a runner; CrashingRunner is the conventional "this pipeline is - // not going to be run() directly" choice used by other portable-runner tests. 
- options.setRunner(CrashingRunner.class); - options.as(KafkaStreamsPipelineOptions.class).setApplicationId(APPLICATION_ID); - return options; - } - - private static Properties streamsConfig() { - Properties props = new Properties(); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put( - StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName()); - props.put( - StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName()); - return props; - } - private static class CapturingProcessor implements ProcessorSupplier< byte[], KStreamsPayload, byte[], KStreamsPayload> { diff --git a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/KafkaStreamsTestRunner.java b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/KafkaStreamsTestRunner.java new file mode 100644 index 000000000000..a84bc583d0ec --- /dev/null +++ b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/KafkaStreamsTestRunner.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.kafka.streams; + +import java.time.Duration; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.fnexecution.provisioning.JobInfo; +import org.apache.beam.runners.kafka.streams.translation.KafkaStreamsPipelineTranslator; +import org.apache.beam.runners.kafka.streams.translation.KafkaStreamsTranslationContext; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.PortablePipelineOptions; +import org.apache.beam.sdk.testing.CrashingRunner; +import org.apache.beam.sdk.util.construction.Environments; +import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation; +import org.apache.beam.sdk.util.construction.PipelineTranslation; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.TestOutputTopic; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyDescription; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.test.TestRecord; + +/** + * Test harness that runs a Beam {@link Pipeline} through the Kafka Streams runner's translation and + * a {@link TopologyTestDriver}, so tests do not repeat the translate + drive boilerplate. + * + *

<p>Usage: build a pipeline with {@link #testOptions()}, then call {@link #run(Pipeline)}. Side + * effects (e.g. a {@code SharedTestCollector} written by a recording DoFn) have completed when it + * returns. + * + *

<p>{@link TopologyTestDriver} does not loop a low-level sink topic back into its source, so an + * internal repartition topic (one that is both a sink and a source in the topology — e.g. the one + * GroupByKey introduces) would otherwise dead-end. {@link #run(Pipeline)} discovers those topics + * from the {@link TopologyDescription} and round-trips them until no more records flow, standing in + * for the broker. + */ +public final class KafkaStreamsTestRunner { + + private static final int MAX_ROUND_TRIPS = 100; + + private KafkaStreamsTestRunner() {} + + /** Pipeline options for a Kafka Streams runner test: the EMBEDDED harness and a unique app id. */ + public static PipelineOptions testOptions() { + String applicationId = "ks-test-" + UUID.randomUUID(); + PipelineOptions options = + PipelineOptionsFactory.fromArgs("--applicationId=" + applicationId).create(); + options.setRunner(CrashingRunner.class); + options.as(KafkaStreamsPipelineOptions.class).setApplicationId(applicationId); + options + .as(PortablePipelineOptions.class) + .setDefaultEnvironmentType(Environments.ENVIRONMENT_EMBEDDED); + return options; + } + + /** + * Translates the pipeline into a Kafka Streams {@link KafkaStreamsTranslationContext}. Tests that + * need the {@link Topology} (e.g. to attach a capture processor before driving) use this and + * build their own {@link TopologyTestDriver}; simpler tests use {@link #run(Pipeline)}. 
+ */ + public static KafkaStreamsTranslationContext translate(Pipeline pipeline) { + KafkaStreamsPipelineOptions options = + pipeline.getOptions().as(KafkaStreamsPipelineOptions.class); + RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline); + KafkaStreamsPipelineTranslator translator = new KafkaStreamsPipelineTranslator(); + JobInfo jobInfo = + JobInfo.create( + options.getApplicationId(), + options.getJobName(), + "", + PipelineOptionsTranslation.toProto(options)); + KafkaStreamsTranslationContext context = translator.createTranslationContext(jobInfo, options); + translator.translate(context, translator.prepareForTranslation(pipelineProto)); + return context; + } + + /** Translates and drives the pipeline to quiescence through a {@link TopologyTestDriver}. */ + public static void run(Pipeline pipeline) { + KafkaStreamsTranslationContext context = translate(pipeline); + Topology topology = context.getTopology(); + try (TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig(pipeline))) { + // Fire the Impulse wall-clock punctuator and let the initial records flow. + driver.advanceWallClockTime(Duration.ofSeconds(1)); + driver.advanceWallClockTime(Duration.ofSeconds(1)); + roundTripInternalTopics(driver, internalTopics(topology)); + } + } + + /** + * The name of the single processor node with no successors (the topology leaf). Tests attach a + * capture processor here to observe what the last stage forwards. + */ + public static String leafProcessorName(Topology topology) { + for (TopologyDescription.Subtopology subtopology : topology.describe().subtopologies()) { + for (TopologyDescription.Node node : subtopology.nodes()) { + if (node instanceof TopologyDescription.Processor && node.successors().isEmpty()) { + return node.name(); + } + } + } + throw new IllegalStateException("no leaf processor found in topology"); + } + + /** Repartition/internal topics are the ones that appear as both a sink and a source. 
*/ + private static Set internalTopics(Topology topology) { + Set sinkTopics = new HashSet<>(); + Set sourceTopics = new HashSet<>(); + for (TopologyDescription.Subtopology subtopology : topology.describe().subtopologies()) { + for (TopologyDescription.Node node : subtopology.nodes()) { + if (node instanceof TopologyDescription.Sink) { + String topic = ((TopologyDescription.Sink) node).topic(); + if (topic != null) { + sinkTopics.add(topic); + } + } else if (node instanceof TopologyDescription.Source) { + sourceTopics.addAll(((TopologyDescription.Source) node).topicSet()); + } + } + } + sinkTopics.retainAll(sourceTopics); + return sinkTopics; + } + + /** Drains each internal topic's output and feeds it back into the source until nothing flows. */ + private static void roundTripInternalTopics(TopologyTestDriver driver, Set topics) { + for (int round = 0; round < MAX_ROUND_TRIPS; round++) { + boolean progressed = false; + for (String topic : topics) { + TestOutputTopic out = + driver.createOutputTopic( + topic, new ByteArrayDeserializer(), new ByteArrayDeserializer()); + List> records = out.readRecordsToList(); + if (records.isEmpty()) { + continue; + } + progressed = true; + TestInputTopic in = + driver.createInputTopic(topic, new ByteArraySerializer(), new ByteArraySerializer()); + for (TestRecord record : records) { + in.pipeInput(record); + } + } + if (!progressed) { + return; + } + } + throw new IllegalStateException( + "Internal topics did not reach quiescence after " + MAX_ROUND_TRIPS + " round trips"); + } + + /** Kafka Streams config for a {@link TopologyTestDriver} built from the pipeline's app id. 
*/ + public static Properties streamsConfig(Pipeline pipeline) { + String applicationId = + pipeline.getOptions().as(KafkaStreamsPipelineOptions.class).getApplicationId(); + Properties props = new Properties(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put( + StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName()); + props.put( + StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName()); + return props; + } +} diff --git a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageTranslatorTest.java b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageTranslatorTest.java index 98ec45ae2885..4750718c00f6 100644 --- a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageTranslatorTest.java +++ b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageTranslatorTest.java @@ -20,35 +20,19 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; -import java.time.Duration; import java.util.List; -import java.util.Properties; -import org.apache.beam.model.pipeline.v1.RunnerApi; -import org.apache.beam.runners.fnexecution.provisioning.JobInfo; -import org.apache.beam.runners.kafka.streams.KafkaStreamsPipelineOptions; +import org.apache.beam.runners.kafka.streams.KafkaStreamsTestRunner; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.options.PortablePipelineOptions; -import org.apache.beam.sdk.testing.CrashingRunner; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Impulse; import org.apache.beam.sdk.transforms.ParDo; -import 
org.apache.beam.sdk.util.construction.Environments; -import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation; -import org.apache.beam.sdk.util.construction.PipelineTranslation; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.TopologyTestDriver; import org.junit.Test; /** * End-to-end test for {@link ExecutableStageTranslator}: builds an {@code Impulse -> ParDo} - * pipeline with the high-level Beam Java SDK, fuses + translates it, and runs the resulting Kafka - * Streams topology under {@link TopologyTestDriver}. The fused ParDo executes in an in-process - * (EMBEDDED) Java SDK harness, so the {@link DoFn}'s {@code @ProcessElement} body runs for real — - * no Docker, no broker. + * pipeline with the high-level Beam Java SDK and runs it through {@link KafkaStreamsTestRunner}. + * The fused ParDo executes in an in-process (EMBEDDED) Java SDK harness, so the {@link DoFn}'s + * {@code @ProcessElement} body runs for real — no Docker, no broker. * *

Because the ParDo's output PCollection has no downstream consumer, it is not a stage output * and is never forwarded out of the harness — that is the documented behaviour. The test verifies @@ -57,9 +41,6 @@ */ public class ExecutableStageTranslatorTest { - private static final String JOB_ID = "kafka-streams-executable-stage-test"; - private static final String APPLICATION_ID = "ks-executable-stage-test"; - /** * Records the length of every input element seen by the harness so the test can verify the DoFn * ran. {@link SharedTestCollector} carries its identity via a UUID stored on the instance itself, @@ -82,31 +63,14 @@ public void processElement(@Element byte[] input, OutputReceiver out) { } @Test - public void impulseThenParDoExecutesDoFnInHarnessOncePerImpulseElement() throws Exception { + public void impulseThenParDoExecutesDoFnInHarnessOncePerImpulseElement() { try (SharedTestCollector collector = SharedTestCollector.create()) { - Pipeline pipeline = Pipeline.create(pipelineOptions()); + Pipeline pipeline = Pipeline.create(KafkaStreamsTestRunner.testOptions()); pipeline .apply("impulse", Impulse.create()) .apply("pardo", ParDo.of(new RecordingFn(collector))); - RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline); - - KafkaStreamsPipelineOptions options = - pipeline.getOptions().as(KafkaStreamsPipelineOptions.class); - KafkaStreamsPipelineTranslator translator = new KafkaStreamsPipelineTranslator(); - JobInfo jobInfo = - JobInfo.create( - JOB_ID, options.getJobName(), "", PipelineOptionsTranslation.toProto(options)); - KafkaStreamsTranslationContext context = - translator.createTranslationContext(jobInfo, options); - - translator.translate(context, translator.prepareForTranslation(pipelineProto)); - - Topology topology = context.getTopology(); - try (TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig())) { - driver.advanceWallClockTime(Duration.ofSeconds(1)); - driver.advanceWallClockTime(Duration.ofSeconds(1)); - } 
+ KafkaStreamsTestRunner.run(pipeline); List recorded = collector.recorded(); // Impulse emits exactly one empty byte[] in the GlobalWindow, so the DoFn must run exactly @@ -115,26 +79,4 @@ public void impulseThenParDoExecutesDoFnInHarnessOncePerImpulseElement() throws assertThat(recorded.get(0), is(0)); } } - - private static PipelineOptions pipelineOptions() { - PipelineOptions options = - PipelineOptionsFactory.fromArgs("--applicationId=" + APPLICATION_ID).create(); - options.setRunner(CrashingRunner.class); - options.as(KafkaStreamsPipelineOptions.class).setApplicationId(APPLICATION_ID); - options - .as(PortablePipelineOptions.class) - .setDefaultEnvironmentType(Environments.ENVIRONMENT_EMBEDDED); - return options; - } - - private static Properties streamsConfig() { - Properties props = new Properties(); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put( - StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName()); - props.put( - StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName()); - return props; - } } diff --git a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/GroupByKeyTest.java b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/GroupByKeyTest.java index 0aed80fa8585..38f19ded356c 100644 --- a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/GroupByKeyTest.java +++ b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/GroupByKeyTest.java @@ -21,56 +21,30 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; -import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Properties; -import org.apache.beam.model.pipeline.v1.RunnerApi; -import 
org.apache.beam.runners.fnexecution.provisioning.JobInfo; -import org.apache.beam.runners.kafka.streams.KafkaStreamsPipelineOptions; +import org.apache.beam.runners.kafka.streams.KafkaStreamsTestRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.options.PortablePipelineOptions; -import org.apache.beam.sdk.testing.CrashingRunner; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.Impulse; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.util.construction.Environments; -import org.apache.beam.sdk.util.construction.PTransformTranslation; -import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation; -import org.apache.beam.sdk.util.construction.PipelineTranslation; import org.apache.beam.sdk.values.KV; -import org.apache.kafka.common.serialization.ByteArrayDeserializer; -import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.TestInputTopic; -import org.apache.kafka.streams.TestOutputTopic; -import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.TopologyTestDriver; -import org.apache.kafka.streams.test.TestRecord; import org.junit.Test; /** * End-to-end test of GroupByKey: {@code Impulse -> emit KVs -> GroupByKey -> record groups}. * - *

<p>GroupByKey shuffles through an internal repartition topic. {@link TopologyTestDriver} does not - * loop a low-level sink topic back into its source, so the test drives the upstream, drains the - * repartition topic, and pipes those records back into it — standing in for the broker round-trip. - * The downstream {@code RecordGroupFn} records each emitted group into a {@link - * SharedTestCollector}. + *

Driven through {@link KafkaStreamsTestRunner}, which round-trips the internal repartition + * topic that GroupByKey introduces. The downstream {@code RecordGroupFn} records each emitted group + * into a {@link SharedTestCollector}. */ public class GroupByKeyTest { - private static final String JOB_ID = "ks-gbk-test"; - private static final String APPLICATION_ID = "ks-gbk-test"; - /** Emits a few KVs from the single impulse element so there is something to group. */ private static class EmitKvsFn extends DoFn> { @ProcessElement @@ -99,9 +73,9 @@ public void processElement(@Element KV> group) { } @Test - public void groupsValuesByKeyAndFiresAtWatermark() throws Exception { + public void groupsValuesByKeyAndFiresAtWatermark() { try (SharedTestCollector collector = SharedTestCollector.create()) { - Pipeline pipeline = Pipeline.create(pipelineOptions()); + Pipeline pipeline = Pipeline.create(KafkaStreamsTestRunner.testOptions()); pipeline .apply("impulse", Impulse.create()) .apply("emit", ParDo.of(new EmitKvsFn())) @@ -109,76 +83,11 @@ public void groupsValuesByKeyAndFiresAtWatermark() throws Exception { .apply("gbk", GroupByKey.create()) .apply("record", ParDo.of(new RecordGroupFn(collector))); - RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline); - KafkaStreamsPipelineOptions options = - pipeline.getOptions().as(KafkaStreamsPipelineOptions.class); - KafkaStreamsPipelineTranslator translator = new KafkaStreamsPipelineTranslator(); - JobInfo jobInfo = - JobInfo.create( - JOB_ID, options.getJobName(), "", PipelineOptionsTranslation.toProto(options)); - KafkaStreamsTranslationContext context = - translator.createTranslationContext(jobInfo, options); - - RunnerApi.Pipeline prepared = translator.prepareForTranslation(pipelineProto); - translator.translate(context, prepared); - String repartitionTopic = GroupByKeyTranslator.repartitionTopic(findGroupByKeyId(prepared)); - - Topology topology = context.getTopology(); - try (TopologyTestDriver driver = new 
TopologyTestDriver(topology, streamsConfig())) { - // Fire the impulse; the upstream stage emits the KVs and the terminal watermark, which the - // re-key processor sends to the repartition sink. - driver.advanceWallClockTime(Duration.ofSeconds(1)); - driver.advanceWallClockTime(Duration.ofSeconds(1)); - - // Round-trip the repartition topic: drain what the sink wrote and feed it back to the - // source so the GroupByKey processor buffers the values and fires at the watermark. - TestOutputTopic repartitionOut = - driver.createOutputTopic( - repartitionTopic, new ByteArrayDeserializer(), new ByteArrayDeserializer()); - TestInputTopic repartitionIn = - driver.createInputTopic( - repartitionTopic, new ByteArraySerializer(), new ByteArraySerializer()); - for (TestRecord record : repartitionOut.readRecordsToList()) { - repartitionIn.pipeInput(record); - } - } + KafkaStreamsTestRunner.run(pipeline); List groups = collector.recorded(); assertThat(groups.size(), is(2)); assertThat(groups, hasItems("a=[1, 2]", "b=[3]")); } } - - private static String findGroupByKeyId(RunnerApi.Pipeline pipeline) { - return pipeline.getComponents().getTransformsMap().entrySet().stream() - .filter( - e -> - PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN.equals( - e.getValue().getSpec().getUrn())) - .map(java.util.Map.Entry::getKey) - .findFirst() - .orElseThrow(() -> new AssertionError("no GroupByKey transform in the pipeline")); - } - - private static PipelineOptions pipelineOptions() { - PipelineOptions options = - PipelineOptionsFactory.fromArgs("--applicationId=" + APPLICATION_ID).create(); - options.setRunner(CrashingRunner.class); - options.as(KafkaStreamsPipelineOptions.class).setApplicationId(APPLICATION_ID); - options - .as(PortablePipelineOptions.class) - .setDefaultEnvironmentType(Environments.ENVIRONMENT_EMBEDDED); - return options; - } - - private static Properties streamsConfig() { - Properties props = new Properties(); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, 
APPLICATION_ID); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put( - StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName()); - props.put( - StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName()); - return props; - } } diff --git a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/WatermarkPropagationTest.java b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/WatermarkPropagationTest.java index e4bdbe4d9733..cfd49476d38c 100644 --- a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/WatermarkPropagationTest.java +++ b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/WatermarkPropagationTest.java @@ -23,26 +23,13 @@ import java.time.Duration; import java.util.ArrayList; import java.util.List; -import java.util.Properties; -import org.apache.beam.model.pipeline.v1.RunnerApi; -import org.apache.beam.runners.fnexecution.provisioning.JobInfo; -import org.apache.beam.runners.kafka.streams.KafkaStreamsPipelineOptions; +import org.apache.beam.runners.kafka.streams.KafkaStreamsTestRunner; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.options.PortablePipelineOptions; -import org.apache.beam.sdk.testing.CrashingRunner; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Impulse; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.construction.Environments; -import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation; -import org.apache.beam.sdk.util.construction.PipelineTranslation; -import org.apache.kafka.common.serialization.Serdes; -import 
org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.TopologyDescription; import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.Record; @@ -57,8 +44,6 @@ */ public class WatermarkPropagationTest { - private static final String APPLICATION_ID = "ks-watermark-propagation-test"; - /** Identity DoFn so the pipeline contains a fused ExecutableStage. */ private static class IdentityFn extends DoFn { @ProcessElement @@ -86,26 +71,19 @@ public void process(Record> record) { @Test public void terminalWatermarkPropagatesToDownstreamStampedAsSingleSource() throws Exception { - Pipeline pipeline = Pipeline.create(pipelineOptions()); + Pipeline pipeline = Pipeline.create(KafkaStreamsTestRunner.testOptions()); pipeline.apply("impulse", Impulse.create()).apply("identity", ParDo.of(new IdentityFn())); - RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline); - KafkaStreamsPipelineOptions options = - pipeline.getOptions().as(KafkaStreamsPipelineOptions.class); - KafkaStreamsPipelineTranslator translator = new KafkaStreamsPipelineTranslator(); - JobInfo jobInfo = - JobInfo.create( - APPLICATION_ID, options.getJobName(), "", PipelineOptionsTranslation.toProto(options)); - KafkaStreamsTranslationContext context = translator.createTranslationContext(jobInfo, options); - translator.translate(context, translator.prepareForTranslation(pipelineProto)); - // Attach a sink to the leaf ExecutableStage processor to capture the watermark it forwards. 
- Topology topology = context.getTopology(); + Topology topology = KafkaStreamsTestRunner.translate(pipeline).getTopology(); List> captured = new ArrayList<>(); topology.addProcessor( - "watermark-capture", () -> new WatermarkCapture(captured), findLeafProcessor(topology)); + "watermark-capture", + () -> new WatermarkCapture(captured), + KafkaStreamsTestRunner.leafProcessorName(topology)); - try (TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig())) { + try (TopologyTestDriver driver = + new TopologyTestDriver(topology, KafkaStreamsTestRunner.streamsConfig(pipeline))) { driver.advanceWallClockTime(Duration.ofSeconds(1)); driver.advanceWallClockTime(Duration.ofSeconds(1)); } @@ -116,40 +94,4 @@ public void terminalWatermarkPropagatesToDownstreamStampedAsSingleSource() throw assertThat(terminal.getSourcePartition(), is(0)); assertThat(terminal.getTotalSourcePartitions(), is(1)); } - - /** - * Returns the name of the single processor node with no successors (the leaf of the topology). 
- */ - private static String findLeafProcessor(Topology topology) { - for (TopologyDescription.Subtopology subtopology : topology.describe().subtopologies()) { - for (TopologyDescription.Node node : subtopology.nodes()) { - if (node instanceof TopologyDescription.Processor && node.successors().isEmpty()) { - return node.name(); - } - } - } - throw new IllegalStateException("no leaf processor found in topology"); - } - - private static PipelineOptions pipelineOptions() { - PipelineOptions options = - PipelineOptionsFactory.fromArgs("--applicationId=" + APPLICATION_ID).create(); - options.setRunner(CrashingRunner.class); - options.as(KafkaStreamsPipelineOptions.class).setApplicationId(APPLICATION_ID); - options - .as(PortablePipelineOptions.class) - .setDefaultEnvironmentType(Environments.ENVIRONMENT_EMBEDDED); - return options; - } - - private static Properties streamsConfig() { - Properties props = new Properties(); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put( - StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName()); - props.put( - StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName()); - return props; - } }