Skip to content
Open
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@

## Breaking Changes

* Portable Java SDK now encodes SchemaCoders in a portable way ([#34672](https://github.com/apache/beam/issues/34672)).
- Original custom Java coder encoding can still be obtained using [StreamingOptions.setUpdateCompatibilityVersion("2.72")](https://github.com/apache/beam/blob/2cf0930e7ae1aa389c26ce6639b584877a3e31d9/sdks/java/core/src/main/java/org/apache/beam/sdk/options/StreamingOptions.java#L47).
- Fixes [#36496](https://github.com/apache/beam/issues/36496), [#30276](https://github.com/apache/beam/issues/30276), [#29245](https://github.com/apache/beam/issues/29245).
* The Python SDK container's `boot.go` now passes pipeline options through a file instead of the `PIPELINE_OPTIONS` environment variable. If a user pairs a new Python SDK container with an older SDK version (which does not support the file-based approach), the pipeline options will not be recognized and the pipeline will fail. Users must ensure their SDK and container versions are synchronized ([#37370](https://github.com/apache/beam/issues/37370)).

## Deprecations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public class DataflowPipelineTranslator {
private static byte[] serializeWindowingStrategy(
WindowingStrategy<?, ?> windowingStrategy, PipelineOptions options) {
try {
SdkComponents sdkComponents = SdkComponents.create();
SdkComponents sdkComponents = SdkComponents.create(options);

String workerHarnessContainerImageURL =
DataflowRunner.getContainerImageForJob(options.as(DataflowPipelineOptions.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1319,13 +1319,13 @@ public DataflowPipelineJob run(Pipeline pipeline) {
// with the SDK harness image (which implements Fn API).
//
// The same Environment is used in different and contradictory ways, depending on whether
// it is a v1 or v2 job submission.
// it is a portable or non-portable job submission.
RunnerApi.Environment defaultEnvironmentForDataflow =
Environments.createDockerEnvironment(workerHarnessContainerImageURL);

// The SdkComponents for portable an non-portable job submission must be kept distinct. Both
// The SdkComponents for portable and non-portable job submission must be kept distinct. Both
// need the default environment.
SdkComponents portableComponents = SdkComponents.create();
SdkComponents portableComponents = SdkComponents.create(options);
portableComponents.registerEnvironment(
defaultEnvironmentForDataflow
.toBuilder()
Expand Down Expand Up @@ -1357,28 +1357,29 @@ public DataflowPipelineJob run(Pipeline pipeline) {
dataflowOptions.setPipelineUrl(stagedPipeline.getLocation());

if (useUnifiedWorker(options)) {
LOG.info("Skipping v1 transform replacements since job will run on v2.");
LOG.info(
"Skipping non-portable transform replacements since job will run on portable worker.");
} else {
// Now rewrite things to be as needed for v1 (mutates the pipeline)
// This way the job submitted is valid for v1 and v2, simultaneously
// Now rewrite things to be as needed for non-portable (mutates the pipeline).
// This way the job submitted is valid for portable and non-portable, simultaneously.
replaceV1Transforms(pipeline);
}
// Capture the SdkComponents for look up during step translations
SdkComponents dataflowV1Components = SdkComponents.create();
dataflowV1Components.registerEnvironment(
// Capture the SdkComponents for look up during step translations.
SdkComponents dataflowNonPortableComponents = SdkComponents.create(options);
dataflowNonPortableComponents.registerEnvironment(
defaultEnvironmentForDataflow
.toBuilder()
.addAllDependencies(getDefaultArtifacts())
.addAllCapabilities(Environments.getJavaCapabilities())
.build());
// No need to perform transform upgrading for the Runner v1 proto.
RunnerApi.Pipeline dataflowV1PipelineProto =
PipelineTranslation.toProto(pipeline, dataflowV1Components, true, false);
// No need to perform transform upgrading for the non-portable runner proto.
RunnerApi.Pipeline dataflowNonPortablePipelineProto =
PipelineTranslation.toProto(pipeline, dataflowNonPortableComponents, true, false);

if (LOG.isDebugEnabled()) {
LOG.debug(
"Dataflow v1 pipeline proto:\n{}",
TextFormat.printer().printToString(dataflowV1PipelineProto));
"Dataflow non-portable worker pipeline proto:\n{}",
TextFormat.printer().printToString(dataflowNonPortablePipelineProto));
}

// Set a unique client_request_id in the CreateJob request.
Expand All @@ -1398,7 +1399,11 @@ public DataflowPipelineJob run(Pipeline pipeline) {

JobSpecification jobSpecification =
translator.translate(
pipeline, dataflowV1PipelineProto, dataflowV1Components, this, packages);
pipeline,
dataflowNonPortablePipelineProto,
dataflowNonPortableComponents,
this,
packages);

if (!isNullOrEmpty(dataflowOptions.getDataflowWorkerJar()) && !useUnifiedWorker(options)) {
List<String> experiments =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.Step;
import com.google.api.services.dataflow.model.WorkerPool;
import com.google.auto.value.AutoValue;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
Expand Down Expand Up @@ -89,7 +90,10 @@
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
Expand Down Expand Up @@ -153,7 +157,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
@Rule public transient ExpectedException thrown = ExpectedException.none();

private SdkComponents createSdkComponents(PipelineOptions options) {
SdkComponents sdkComponents = SdkComponents.create();
SdkComponents sdkComponents = SdkComponents.create(options);

String containerImageURL =
DataflowRunner.getContainerImageForJob(options.as(DataflowPipelineOptions.class));
Expand Down Expand Up @@ -1125,7 +1129,7 @@ public String apply(byte[] input) {
file1.deleteOnExit();
File file2 = File.createTempFile("file2-", ".txt");
file2.deleteOnExit();
SdkComponents sdkComponents = SdkComponents.create();
SdkComponents sdkComponents = SdkComponents.create(options);
sdkComponents.registerEnvironment(
Environments.createDockerEnvironment(DataflowRunner.getContainerImageForJob(options))
.toBuilder()
Expand Down Expand Up @@ -1699,4 +1703,53 @@ public OffsetRange getInitialRange(@Element String element) {
return null;
}
}

/**
 * A minimal {@link AutoValue} type carrying an inferred {@link AutoValueSchema} (via
 * {@link DefaultSchema}), used to exercise translation of schema-aware element types.
 */
@AutoValue
@DefaultSchema(AutoValueSchema.class)
public abstract static class SimpleAutoValue {
  public abstract String getString();

  public abstract Integer getInt32();

  public abstract Long getInt64();

  /** Creates an instance through the AutoValue-generated concrete subclass. */
  public static SimpleAutoValue of(String string, Integer int32, Long int64) {
    return new AutoValue_DataflowPipelineTranslatorTest_SimpleAutoValue(string, int32, int64);
  }
}

@Test
public void testSchemaCoderTranslation() throws Exception {
  DataflowPipelineOptions options = buildPipelineOptions();
  Pipeline pipeline = Pipeline.create(options);
  pipeline
      .apply(Impulse.create())
      .apply(
          MapElements.via(
              new SimpleFunction<byte[], SimpleAutoValue>() {
                @Override
                public SimpleAutoValue apply(byte[] input) {
                  return SimpleAutoValue.of("foo", 5, 10L);
                }
              }))
      .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))));

  // By default the schema-aware element type is encoded with the portable schema coder URN.
  SdkComponents defaultComponents = createSdkComponents(options);
  RunnerApi.Pipeline defaultProto =
      PipelineTranslation.toProto(pipeline, defaultComponents, true);
  Map<String, RunnerApi.Coder> defaultCoders = defaultProto.getComponents().getCodersMap();
  assertTrue(defaultCoders.containsKey("SchemaCoder"));
  assertEquals("beam:coder:schema:v1", defaultCoders.get("SchemaCoder").getSpec().getUrn());

  // Prior to version 2.73, SchemaCoders are translated as custom java coders.
  options.as(StreamingOptions.class).setUpdateCompatibilityVersion("2.72");
  SdkComponents compatComponents = createSdkComponents(options);
  RunnerApi.Pipeline compatProto = PipelineTranslation.toProto(pipeline, compatComponents, true);
  Map<String, RunnerApi.Coder> compatCoders = compatProto.getComponents().getCodersMap();
  assertTrue(compatCoders.containsKey("SchemaCoder"));
  assertEquals("beam:coders:javasdk:0.1", compatCoders.get("SchemaCoder").getSpec().getUrn());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ public void testLengthPrefixingOfKeyCoderInStatefulExecutableStage() throws Exce
// Add another stateful stage with a non-standard key coder
Pipeline p = Pipeline.create();
Coder<Void> keycoder = VoidCoder.of();
assertThat(ModelCoderRegistrar.isKnownCoder(keycoder), is(false));
ModelCoderRegistrar coderRegistrar = new ModelCoderRegistrar();
assertThat(coderRegistrar.isKnownCoder(keycoder, p.getOptions()), is(false));
p.apply("impulse", Impulse.create())
.apply(
"create",
Expand Down Expand Up @@ -165,7 +166,8 @@ public void onTimer() {}
public void testLengthPrefixingOfInputCoderExecutableStage() throws Exception {
Pipeline p = Pipeline.create();
Coder<Void> voidCoder = VoidCoder.of();
assertThat(ModelCoderRegistrar.isKnownCoder(voidCoder), is(false));
ModelCoderRegistrar coderRegistrar = new ModelCoderRegistrar();
assertThat(coderRegistrar.isKnownCoder(voidCoder, p.getOptions()), is(false));
p.apply("impulse", Impulse.create())
.apply(
ParDo.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BiMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableBiMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.dataflow.qual.Deterministic;
Expand Down Expand Up @@ -62,6 +64,8 @@ private static class DefaultTranslationContext implements TranslationContext {}

private static @MonotonicNonNull BiMap<Class<? extends Coder>, String> knownCoderUrns;

private static @MonotonicNonNull List<CoderTranslatorRegistrar> coderTranslatorRegistrars;

private static @MonotonicNonNull Map<Class<? extends Coder>, CoderTranslator<? extends Coder>>
knownTranslators;

Expand All @@ -80,6 +84,53 @@ static BiMap<Class<? extends Coder>, String> getKnownCoderUrns() {
return knownCoderUrns;
}

/**
 * Discovers every {@link CoderTranslatorRegistrar} visible through {@link ServiceLoader} and
 * caches an immutable snapshot in {@link #coderTranslatorRegistrars}.
 */
private static void initializeCoderTranslatorRegistrars() {
  coderTranslatorRegistrars =
      ImmutableList.copyOf(ServiceLoader.load(CoderTranslatorRegistrar.class));
}

/**
 * Returns whether any registered {@link CoderTranslatorRegistrar} recognizes the given coder
 * under the supplied pipeline options. Registrars are loaded lazily on first use.
 */
static boolean isKnownCoder(Coder<?> coder, PipelineOptions options) {
  if (coderTranslatorRegistrars == null) {
    initializeCoderTranslatorRegistrars();
  }
  return coderTranslatorRegistrars.stream()
      .anyMatch(registrar -> registrar.isKnownCoder(coder, options));
}

/**
 * Returns the {@link CoderTranslator} supplied by the first registrar that knows {@code
 * coderClass}, or {@code null} if no registrar does. Registrars are loaded lazily on first use.
 */
static CoderTranslator<? extends Coder> getCoderTranslator(Class<? extends Coder> coderClass) {
  if (coderTranslatorRegistrars == null) {
    initializeCoderTranslatorRegistrars();
  }
  for (CoderTranslatorRegistrar registrar : coderTranslatorRegistrars) {
    // Use the wildcard-typed declaration rather than a raw CoderTranslator to avoid
    // unchecked raw-type usage.
    CoderTranslator<? extends Coder> translator = registrar.getCoderTranslator(coderClass);
    if (translator != null) {
      return translator;
    }
  }
  return null;
}

/**
 * Returns the {@link Coder} class associated with {@code coderUrn} by the first registrar that
 * recognizes the URN, or {@code null} when no registrar does. Registrars are loaded lazily on
 * first use.
 */
static Class<? extends Coder> getCoderForUrn(String coderUrn) {
  if (coderTranslatorRegistrars == null) {
    initializeCoderTranslatorRegistrars();
  }
  return coderTranslatorRegistrars.stream()
      .map(registrar -> registrar.getCoderForUrn(coderUrn))
      .filter(coderClass -> coderClass != null)
      .findFirst()
      .orElse(null);
}

@VisibleForTesting
@Deterministic
static Map<Class<? extends Coder>, CoderTranslator<? extends Coder>> getKnownTranslators() {
Expand Down Expand Up @@ -107,7 +158,7 @@ public static RunnerApi.MessageWithComponents toProto(Coder<?> coder) throws IOE

public static RunnerApi.Coder toProto(Coder<?> coder, SdkComponents components)
throws IOException {
if (getKnownCoderUrns().containsKey(coder.getClass())) {
if (isKnownCoder(coder, components.getPipelineOptions())) {
return toKnownCoder(coder, components);
}

Expand All @@ -129,7 +180,10 @@ private static RunnerApi.Coder toUnknownCoderWrapper(UnknownCoderWrapper coder)

private static RunnerApi.Coder toKnownCoder(Coder<?> coder, SdkComponents components)
throws IOException {
CoderTranslator translator = getKnownTranslators().get(coder.getClass());
CoderTranslator translator = getCoderTranslator(coder.getClass());
if (translator == null) {
throw new IOException("Unable to find CoderTranslator for known Coder");
}
List<String> componentIds = registerComponents(coder, translator, components);
return RunnerApi.Coder.newBuilder()
.addAllComponentCoderIds(componentIds)
Expand Down Expand Up @@ -186,8 +240,8 @@ private static Coder<?> fromKnownCoder(
components.getComponents().getCodersOrThrow(componentId), components, context);
coderComponents.add(innerCoder);
}
Class<? extends Coder> coderType = getKnownCoderUrns().inverse().get(coderUrn);
CoderTranslator<?> translator = getKnownTranslators().get(coderType);
Class<? extends Coder> coderType = getCoderForUrn(coderUrn);
CoderTranslator<?> translator = getCoderTranslator(coderType);
if (translator != null) {
return translator.fromComponents(
coderComponents, coder.getSpec().getPayload().toByteArray(), context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
* additional payload, which is not currently supported. This exists as a temporary measure.
*/
public interface CoderTranslator<T extends Coder<?>> {

/** Extract all component {@link Coder coders} within a coder. */
List<? extends Coder<?>> getComponents(T from);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import java.util.Map;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.checkerframework.checker.nullness.qual.Nullable;

/** A registrar of {@link Coder} URNs to the associated {@link CoderTranslator}. */
@SuppressWarnings({
Expand All @@ -34,4 +36,18 @@ public interface CoderTranslatorRegistrar {

/** Returns a mapping of URN to {@link CoderTranslator}. */
Map<Class<? extends Coder>, CoderTranslator<? extends Coder>> getCoderTranslators();

/**
 * Returns whether the given {@link Coder} is known to this {@link CoderTranslatorRegistrar}.
 *
 * <p>If this returns {@code true}, then {@link #getCoderTranslator} must return a non-null
 * {@link CoderTranslator} for the coder's class. The supplied {@link PipelineOptions} may
 * influence the result (e.g. update-compatibility settings — TODO confirm which options are
 * consulted by implementations).
 */
boolean isKnownCoder(Coder<?> coder, PipelineOptions options);

/**
 * Returns the {@link CoderTranslator} to use for the given {@link Coder} class, or {@code null}
 * if the coder class is not known to this registrar.
 */
@Nullable
CoderTranslator<? extends Coder> getCoderTranslator(Class<? extends Coder> coderClass);

/**
 * Returns the {@link Coder} class registered for the given URN, or {@code null} if the URN does
 * not correspond to a coder known to this registrar.
 */
@Nullable
Class<? extends Coder> getCoderForUrn(String coderUrn);
}
Loading
Loading