Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,11 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.kafka.streams.translation.KStreamsPayload;
import org.apache.beam.runners.kafka.streams.translation.KafkaStreamsPipelineTranslator;
import org.apache.beam.runners.kafka.streams.translation.KafkaStreamsTranslationContext;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.CrashingRunner;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation;
import org.apache.beam.sdk.util.construction.PipelineTranslation;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.processor.api.Processor;
Expand All @@ -51,8 +40,8 @@

/**
* Pipeline-level integration tests that build a Beam {@link Pipeline} via the high-level Java SDK
* ({@code Pipeline.create().apply(Impulse.create())}), translate it via the runner, and execute the
* resulting Kafka Streams topology under {@link TopologyTestDriver}.
* ({@code Pipeline.create().apply(Impulse.create())}) and run it through {@link
* KafkaStreamsTestRunner}.
*
* <p>This is the test layer Jan requested on PR #38689: rather than building hand-rolled {@link
* RunnerApi.Pipeline} protos, drive translation from the same surface a user would write. The tests
Expand All @@ -61,33 +50,18 @@
*/
public class KafkaStreamsRunnerTest {

private static final String JOB_ID = "kafka-streams-runner-test";
private static final String APPLICATION_ID = "ks-runner-test";

@Test
public void impulseOnlyPipelineEmitsDataAndTerminalWatermark() {
Pipeline pipeline = Pipeline.create(pipelineOptions());
Pipeline pipeline = Pipeline.create(KafkaStreamsTestRunner.testOptions());
pipeline.apply("impulse", Impulse.create());

RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline);

KafkaStreamsPipelineOptions options =
pipeline.getOptions().as(KafkaStreamsPipelineOptions.class);
KafkaStreamsPipelineTranslator translator = new KafkaStreamsPipelineTranslator();
JobInfo jobInfo =
JobInfo.create(
JOB_ID, options.getJobName(), "", PipelineOptionsTranslation.toProto(options));
KafkaStreamsTranslationContext context = translator.createTranslationContext(jobInfo, options);
translator.translate(context, translator.prepareForTranslation(pipelineProto));

CapturingProcessor capture = new CapturingProcessor();
Topology topology = context.getTopology();
// Wire a downstream test sink to every translated transform node so we can capture emissions.
// Impulse is the only transform here, so we attach to "impulse" (the processor name registered
// by ImpulseTranslator).
topology.addProcessor("capture", capture, expectedImpulseProcessorName(pipelineProto));
Topology topology = KafkaStreamsTestRunner.translate(pipeline).getTopology();
// Impulse is the only transform, so it is the topology leaf; capture what it forwards.
topology.addProcessor("capture", capture, KafkaStreamsTestRunner.leafProcessorName(topology));

try (TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig())) {
try (TopologyTestDriver driver =
new TopologyTestDriver(topology, KafkaStreamsTestRunner.streamsConfig(pipeline))) {
driver.advanceWallClockTime(Duration.ofSeconds(1));
driver.advanceWallClockTime(Duration.ofSeconds(1));
}
Expand All @@ -101,42 +75,6 @@ public void impulseOnlyPipelineEmitsDataAndTerminalWatermark() {
is(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
}

/**
 * Looks up the transform id the SDK assigned to {@link Impulse} by scanning the pipeline proto's
 * transforms for the impulse URN. The test attaches its capturing processor to the Kafka Streams
 * processor node of that name (the translator names the processor after the transform id).
 *
 * @param pipelineProto the translated pipeline proto to search
 * @return the id of the first transform whose spec URN is {@code beam:transform:impulse:v1}
 * @throws IllegalStateException if no transform carries the impulse URN
 */
private static String expectedImpulseProcessorName(RunnerApi.Pipeline pipelineProto) {
  return pipelineProto.getComponents().getTransformsMap().entrySet().stream()
      .filter(
          transform -> "beam:transform:impulse:v1".equals(transform.getValue().getSpec().getUrn()))
      .map(transform -> transform.getKey())
      .findFirst()
      .orElseThrow(
          () -> new IllegalStateException("Impulse transform not found in pipeline proto"));
}

/**
 * Creates the pipeline options used by this test: the application id is supplied both as a
 * command-line style argument and through the typed {@link KafkaStreamsPipelineOptions} setter,
 * and the runner is set to {@link CrashingRunner}.
 */
private static PipelineOptions pipelineOptions() {
  String applicationIdArg = "--applicationId=" + APPLICATION_ID;
  PipelineOptions testOptions = PipelineOptionsFactory.fromArgs(applicationIdArg).create();
  // Pipeline.create() needs some runner to be configured. CrashingRunner is the conventional
  // choice in portable-runner tests for a pipeline that is never run() directly.
  testOptions.setRunner(CrashingRunner.class);
  KafkaStreamsPipelineOptions kafkaStreamsOptions =
      testOptions.as(KafkaStreamsPipelineOptions.class);
  kafkaStreamsOptions.setApplicationId(APPLICATION_ID);
  return testOptions;
}

/**
 * Builds the Kafka Streams configuration handed to the {@link TopologyTestDriver}: the test's
 * application id, a bootstrap-servers value, and byte-array serdes as the default key and value
 * serdes.
 */
private static Properties streamsConfig() {
  String byteArraySerdeClass = Serdes.ByteArray().getClass().getName();
  Properties config = new Properties();
  config.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
  config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, byteArraySerdeClass);
  config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, byteArraySerdeClass);
  return config;
}

private static class CapturingProcessor
implements ProcessorSupplier<
byte[], KStreamsPayload<byte[]>, byte[], KStreamsPayload<byte[]>> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.kafka.streams;

import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.kafka.streams.translation.KafkaStreamsPipelineTranslator;
import org.apache.beam.runners.kafka.streams.translation.KafkaStreamsTranslationContext;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.testing.CrashingRunner;
import org.apache.beam.sdk.util.construction.Environments;
import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation;
import org.apache.beam.sdk.util.construction.PipelineTranslation;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.TestRecord;

/**
* Test harness that runs a Beam {@link Pipeline} through the Kafka Streams runner's translation and
* a {@link TopologyTestDriver}, so tests do not repeat the translate + drive boilerplate.
*
* <p>Usage: build a pipeline with {@link #testOptions()}, then call {@link #run(Pipeline)}. Side
* effects (e.g. a {@code SharedTestCollector} written by a recording DoFn) have completed when it
* returns.
*
* <p>{@link TopologyTestDriver} does not loop a low-level sink topic back into its source, so an
* internal repartition topic (one that is both a sink and a source in the topology — e.g. the one
* GroupByKey introduces) would otherwise dead-end. {@link #run(Pipeline)} discovers those topics
* from the {@link TopologyDescription} and round-trips them until no more records flow, standing in
* for the broker.
*/
public final class KafkaStreamsTestRunner {

private static final int MAX_ROUND_TRIPS = 100;

private KafkaStreamsTestRunner() {}

/**
 * Creates pipeline options for a Kafka Streams runner test.
 *
 * <p>Each call generates a fresh application id of the form {@code ks-test-<random UUID>}, sets
 * the runner to {@link CrashingRunner}, and selects the EMBEDDED default environment type.
 *
 * @return options carrying the generated application id, the runner and the environment type
 */
public static PipelineOptions testOptions() {
  final String uniqueAppId = "ks-test-" + UUID.randomUUID();
  final String appIdArg = "--applicationId=" + uniqueAppId;

  PipelineOptions testOptions = PipelineOptionsFactory.fromArgs(appIdArg).create();
  testOptions.setRunner(CrashingRunner.class);

  KafkaStreamsPipelineOptions kafkaStreamsView = testOptions.as(KafkaStreamsPipelineOptions.class);
  kafkaStreamsView.setApplicationId(uniqueAppId);

  PortablePipelineOptions portableView = testOptions.as(PortablePipelineOptions.class);
  portableView.setDefaultEnvironmentType(Environments.ENVIRONMENT_EMBEDDED);

  return testOptions;
}

/**
 * Runs the Kafka Streams translation for {@code pipeline} and returns the resulting {@link
 * KafkaStreamsTranslationContext}.
 *
 * <p>Use this when a test needs the {@link Topology} itself, for example to attach a capture
 * processor and then drive its own {@link TopologyTestDriver}. Tests that only need the pipeline
 * executed can call {@link #run(Pipeline)} instead.
 *
 * @param pipeline the pipeline to translate; its options are viewed as {@link
 *     KafkaStreamsPipelineOptions}
 * @return the translation context populated by the translator
 */
public static KafkaStreamsTranslationContext translate(Pipeline pipeline) {
  KafkaStreamsPipelineOptions kafkaStreamsOptions =
      pipeline.getOptions().as(KafkaStreamsPipelineOptions.class);
  RunnerApi.Pipeline proto = PipelineTranslation.toProto(pipeline);
  KafkaStreamsPipelineTranslator pipelineTranslator = new KafkaStreamsPipelineTranslator();

  // The application id doubles as the job id; the third JobInfo argument is left empty.
  String jobId = kafkaStreamsOptions.getApplicationId();
  JobInfo info =
      JobInfo.create(
          jobId,
          kafkaStreamsOptions.getJobName(),
          "",
          PipelineOptionsTranslation.toProto(kafkaStreamsOptions));

  KafkaStreamsTranslationContext translationContext =
      pipelineTranslator.createTranslationContext(info, kafkaStreamsOptions);
  pipelineTranslator.translate(
      translationContext, pipelineTranslator.prepareForTranslation(proto));
  return translationContext;
}

/**
 * Translates the pipeline and executes the resulting topology under a {@link
 * TopologyTestDriver} until the internal topics stop producing records.
 *
 * @param pipeline the pipeline to translate and drive
 */
public static void run(Pipeline pipeline) {
  Topology translated = translate(pipeline).getTopology();
  Properties driverConfig = streamsConfig(pipeline);
  try (TopologyTestDriver testDriver = new TopologyTestDriver(translated, driverConfig)) {
    // Two wall-clock advances: fire the Impulse wall-clock punctuator and let the initial
    // records flow.
    for (int tick = 0; tick < 2; tick++) {
      testDriver.advanceWallClockTime(Duration.ofSeconds(1));
    }
    // Stand in for the broker: loop records written to internal topics back into their sources.
    Set<String> loopbackTopics = internalTopics(translated);
    roundTripInternalTopics(testDriver, loopbackTopics);
  }
}

/**
 * Returns the name of the single processor node with no successors (the topology leaf). Tests
 * attach a capture processor here to observe what the last stage forwards.
 *
 * <p>All sub-topologies are scanned and every leaf processor is collected before deciding, so the
 * result does not depend on the iteration order of the node sets. A topology with more than one
 * leaf is rejected rather than resolved arbitrarily.
 *
 * @param topology the translated topology to inspect
 * @return the name of the only processor node that has no successors
 * @throws IllegalStateException if the topology has no leaf processor, or more than one
 */
public static String leafProcessorName(Topology topology) {
  List<String> leaves = new java.util.ArrayList<>();
  for (TopologyDescription.Subtopology subtopology : topology.describe().subtopologies()) {
    for (TopologyDescription.Node node : subtopology.nodes()) {
      if (node instanceof TopologyDescription.Processor && node.successors().isEmpty()) {
        leaves.add(node.name());
      }
    }
  }
  if (leaves.isEmpty()) {
    throw new IllegalStateException("no leaf processor found in topology");
  }
  if (leaves.size() > 1) {
    // Picking one silently would make the capture point depend on set iteration order.
    throw new IllegalStateException("multiple leaf processors found in topology: " + leaves);
  }
  return leaves.get(0);
}
Comment on lines +119 to +128

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The iteration order of subtopology.nodes() (which returns a Set) is not guaranteed to be deterministic. If a topology contains multiple leaf processors (processors with no successors), leafProcessorName will return whichever leaf it encounters first, which could lead to non-deterministic test failures. Collecting all leaf processors and asserting that exactly one exists makes the test harness more robust and deterministic.

  public static String leafProcessorName(Topology topology) {
    List<String> leaves = new java.util.ArrayList<>();
    for (TopologyDescription.Subtopology subtopology : topology.describe().subtopologies()) {
      for (TopologyDescription.Node node : subtopology.nodes()) {
        if (node instanceof TopologyDescription.Processor && node.successors().isEmpty()) {
          leaves.add(node.name());
        }
      }
    }
    if (leaves.isEmpty()) {
      throw new IllegalStateException("no leaf processor found in topology");
    }
    if (leaves.size() > 1) {
      throw new IllegalStateException("multiple leaf processors found in topology: " + leaves);
    }
    return leaves.get(0);
  }


/** Repartition/internal topics are the ones that appear as both a sink and a source. */
private static Set<String> internalTopics(Topology topology) {
Set<String> sinkTopics = new HashSet<>();
Set<String> sourceTopics = new HashSet<>();
for (TopologyDescription.Subtopology subtopology : topology.describe().subtopologies()) {
for (TopologyDescription.Node node : subtopology.nodes()) {
if (node instanceof TopologyDescription.Sink) {
String topic = ((TopologyDescription.Sink) node).topic();
if (topic != null) {
sinkTopics.add(topic);
}
} else if (node instanceof TopologyDescription.Source) {
sourceTopics.addAll(((TopologyDescription.Source) node).topicSet());
}
Comment on lines +141 to +143

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

If a source node in the topology is configured using a Pattern instead of a set of topic names, topicSet() will return null. Calling sourceTopics.addAll(null) will throw a NullPointerException. Adding a null check before calling addAll prevents potential crashes when pattern-based sources are used.

        } else if (node instanceof TopologyDescription.Source) {
          Set<String> topics = ((TopologyDescription.Source) node).topicSet();
          if (topics != null) {
            sourceTopics.addAll(topics);
          }
        }

}
}
sinkTopics.retainAll(sourceTopics);
return sinkTopics;
}

/**
 * Drains each internal topic's output and feeds it back into the source until nothing flows.
 *
 * <p>One round visits every topic once: all records currently readable from the topic are read
 * and piped back in as input. Rounds repeat until a full round moves no records.
 *
 * @param driver the test driver executing the topology
 * @param topics the topics that are both written and read by the topology
 * @throws IllegalStateException if records are still flowing after {@code MAX_ROUND_TRIPS} rounds
 */
private static void roundTripInternalTopics(TopologyTestDriver driver, Set<String> topics) {
  // Topic handles do not change between rounds, so create them once up front instead of once
  // per topic per round.
  java.util.Map<String, TestOutputTopic<byte[], byte[]>> outputs = new java.util.HashMap<>();
  java.util.Map<String, TestInputTopic<byte[], byte[]>> inputs = new java.util.HashMap<>();
  for (String topic : topics) {
    outputs.put(
        topic,
        driver.createOutputTopic(
            topic, new ByteArrayDeserializer(), new ByteArrayDeserializer()));
    inputs.put(
        topic,
        driver.createInputTopic(topic, new ByteArraySerializer(), new ByteArraySerializer()));
  }

  for (int round = 0; round < MAX_ROUND_TRIPS; round++) {
    boolean progressed = false;
    for (String topic : topics) {
      List<TestRecord<byte[], byte[]>> records = outputs.get(topic).readRecordsToList();
      if (records.isEmpty()) {
        continue;
      }
      progressed = true;
      TestInputTopic<byte[], byte[]> in = inputs.get(topic);
      for (TestRecord<byte[], byte[]> record : records) {
        in.pipeInput(record);
      }
    }
    if (!progressed) {
      // A full pass moved nothing: the topology is quiescent.
      return;
    }
  }
  throw new IllegalStateException(
      "Internal topics did not reach quiescence after " + MAX_ROUND_TRIPS + " round trips");
}
Comment on lines +151 to +175

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Creating TestInputTopic and TestOutputTopic instances inside the round-trip loop on every iteration is inefficient as it repeatedly allocates new objects and performs lookups in TopologyTestDriver's internal maps. Instantiating them once outside the loop improves performance and reduces overhead.

  private static void roundTripInternalTopics(TopologyTestDriver driver, Set<String> topics) {
    java.util.Map<String, TestOutputTopic<byte[], byte[]>> outputs = new java.util.HashMap<>();
    java.util.Map<String, TestInputTopic<byte[], byte[]>> inputs = new java.util.HashMap<>();
    for (String topic : topics) {
      outputs.put(
          topic,
          driver.createOutputTopic(
              topic, new ByteArrayDeserializer(), new ByteArrayDeserializer()));
      inputs.put(
          topic,
          driver.createInputTopic(topic, new ByteArraySerializer(), new ByteArraySerializer()));
    }

    for (int round = 0; round < MAX_ROUND_TRIPS; round++) {
      boolean progressed = false;
      for (String topic : topics) {
        TestOutputTopic<byte[], byte[]> out = outputs.get(topic);
        List<TestRecord<byte[], byte[]>> records = out.readRecordsToList();
        if (records.isEmpty()) {
          continue;
        }
        progressed = true;
        TestInputTopic<byte[], byte[]> in = inputs.get(topic);
        for (TestRecord<byte[], byte[]> record : records) {
          in.pipeInput(record);
        }
      }
      if (!progressed) {
        return;
      }
    }
    throw new IllegalStateException(
        "Internal topics did not reach quiescence after " + MAX_ROUND_TRIPS + " round trips");
  }


/**
 * Builds the Kafka Streams configuration for a {@link TopologyTestDriver}, taking the
 * application id from the pipeline's {@link KafkaStreamsPipelineOptions}.
 *
 * @param pipeline the pipeline whose options supply the application id
 * @return properties with the application id, a bootstrap-servers value, and byte-array serdes
 *     as the default key and value serdes
 */
public static Properties streamsConfig(Pipeline pipeline) {
  KafkaStreamsPipelineOptions kafkaStreamsOptions =
      pipeline.getOptions().as(KafkaStreamsPipelineOptions.class);
  String appId = kafkaStreamsOptions.getApplicationId();
  String byteArraySerdeClass = Serdes.ByteArray().getClass().getName();

  Properties config = new Properties();
  config.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
  config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, byteArraySerdeClass);
  config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, byteArraySerdeClass);
  return config;
}
}
Loading
Loading