diff --git a/CHANGES.md b/CHANGES.md index 5499cb066476..5c4f092b3c7c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -72,6 +72,9 @@ ## Breaking Changes +* Portable Java SDK now encodes SchemaCoders in a portable way ([#34672](https://github.com/apache/beam/issues/34672)). + - Original custom Java coder encoding can still be obtained using [StreamingOptions.setUpdateCompatibilityVersion("2.72")](https://github.com/apache/beam/blob/2cf0930e7ae1aa389c26ce6639b584877a3e31d9/sdks/java/core/src/main/java/org/apache/beam/sdk/options/StreamingOptions.java#L47). + - Fixes [#36496](https://github.com/apache/beam/issues/36496), [#30276](https://github.com/apache/beam/issues/30276), [#29245](https://github.com/apache/beam/issues/29245). * The Python SDK container's `boot.go` now passes pipeline options through a file instead of the `PIPELINE_OPTIONS` environment variable. If a user pairs a new Python SDK container with an older SDK version (which does not support the file-based approach), the pipeline options will not be recognized and the pipeline will fail. Users must ensure their SDK and container versions are synchronized ([#37370](https://github.com/apache/beam/issues/37370)). 
## Deprecations diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index 08d84705c5c7..6a35204007e7 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -137,7 +137,7 @@ public class DataflowPipelineTranslator { private static byte[] serializeWindowingStrategy( WindowingStrategy windowingStrategy, PipelineOptions options) { try { - SdkComponents sdkComponents = SdkComponents.create(); + SdkComponents sdkComponents = SdkComponents.create(options); String workerHarnessContainerImageURL = DataflowRunner.getContainerImageForJob(options.as(DataflowPipelineOptions.class)); diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index d826c3b2a38c..b3bbd2e7f2e6 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1319,13 +1319,13 @@ public DataflowPipelineJob run(Pipeline pipeline) { // with the SDK harness image (which implements Fn API). // // The same Environment is used in different and contradictory ways, depending on whether - // it is a v1 or v2 job submission. + // it is a portable or non-portable job submission. RunnerApi.Environment defaultEnvironmentForDataflow = Environments.createDockerEnvironment(workerHarnessContainerImageURL); - // The SdkComponents for portable an non-portable job submission must be kept distinct. 
Both + // The SdkComponents for portable and non-portable job submission must be kept distinct. Both // need the default environment. - SdkComponents portableComponents = SdkComponents.create(); + SdkComponents portableComponents = SdkComponents.create(options); portableComponents.registerEnvironment( defaultEnvironmentForDataflow .toBuilder() @@ -1357,28 +1357,29 @@ public DataflowPipelineJob run(Pipeline pipeline) { dataflowOptions.setPipelineUrl(stagedPipeline.getLocation()); if (useUnifiedWorker(options)) { - LOG.info("Skipping v1 transform replacements since job will run on v2."); + LOG.info( + "Skipping non-portable transform replacements since job will run on portable worker."); } else { - // Now rewrite things to be as needed for v1 (mutates the pipeline) - // This way the job submitted is valid for v1 and v2, simultaneously + // Now rewrite things to be as needed for non-portable (mutates the pipeline). + // This way the job submitted is valid for portable and non-portable, simultaneously. replaceV1Transforms(pipeline); } - // Capture the SdkComponents for look up during step translations - SdkComponents dataflowV1Components = SdkComponents.create(); - dataflowV1Components.registerEnvironment( + // Capture the SdkComponents for look up during step translations. + SdkComponents dataflowNonPortableComponents = SdkComponents.create(options); + dataflowNonPortableComponents.registerEnvironment( defaultEnvironmentForDataflow .toBuilder() .addAllDependencies(getDefaultArtifacts()) .addAllCapabilities(Environments.getJavaCapabilities()) .build()); - // No need to perform transform upgrading for the Runner v1 proto. - RunnerApi.Pipeline dataflowV1PipelineProto = - PipelineTranslation.toProto(pipeline, dataflowV1Components, true, false); + // No need to perform transform upgrading for the non-portable runner proto. 
+ RunnerApi.Pipeline dataflowNonPortablePipelineProto = + PipelineTranslation.toProto(pipeline, dataflowNonPortableComponents, true, false); if (LOG.isDebugEnabled()) { LOG.debug( - "Dataflow v1 pipeline proto:\n{}", - TextFormat.printer().printToString(dataflowV1PipelineProto)); + "Dataflow non-portable worker pipeline proto:\n{}", + TextFormat.printer().printToString(dataflowNonPortablePipelineProto)); } // Set a unique client_request_id in the CreateJob request. @@ -1398,7 +1399,11 @@ public DataflowPipelineJob run(Pipeline pipeline) { JobSpecification jobSpecification = translator.translate( - pipeline, dataflowV1PipelineProto, dataflowV1Components, this, packages); + pipeline, + dataflowNonPortablePipelineProto, + dataflowNonPortableComponents, + this, + packages); if (!isNullOrEmpty(dataflowOptions.getDataflowWorkerJar()) && !useUnifiedWorker(options)) { List experiments = diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 8226dc2c7274..e3396687410b 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -46,6 +46,7 @@ import com.google.api.services.dataflow.model.Job; import com.google.api.services.dataflow.model.Step; import com.google.api.services.dataflow.model.WorkerPool; +import com.google.auto.value.AutoValue; import java.io.File; import java.io.IOException; import java.io.Serializable; @@ -89,7 +90,10 @@ import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.StreamingOptions; import 
org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.state.StateSpec; import org.apache.beam.sdk.state.StateSpecs; import org.apache.beam.sdk.state.ValueState; @@ -153,7 +157,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { @Rule public transient ExpectedException thrown = ExpectedException.none(); private SdkComponents createSdkComponents(PipelineOptions options) { - SdkComponents sdkComponents = SdkComponents.create(); + SdkComponents sdkComponents = SdkComponents.create(options); String containerImageURL = DataflowRunner.getContainerImageForJob(options.as(DataflowPipelineOptions.class)); @@ -1125,7 +1129,7 @@ public String apply(byte[] input) { file1.deleteOnExit(); File file2 = File.createTempFile("file2-", ".txt"); file2.deleteOnExit(); - SdkComponents sdkComponents = SdkComponents.create(); + SdkComponents sdkComponents = SdkComponents.create(options); sdkComponents.registerEnvironment( Environments.createDockerEnvironment(DataflowRunner.getContainerImageForJob(options)) .toBuilder() @@ -1699,4 +1703,53 @@ public OffsetRange getInitialRange(@Element String element) { return null; } } + + @AutoValue + @DefaultSchema(AutoValueSchema.class) + public abstract static class SimpleAutoValue { + public abstract String getString(); + + public abstract Integer getInt32(); + + public abstract Long getInt64(); + + public static DataflowPipelineTranslatorTest.SimpleAutoValue of( + String string, Integer int32, Long int64) { + return new AutoValue_DataflowPipelineTranslatorTest_SimpleAutoValue(string, int32, int64); + } + } + + @Test + public void testSchemaCoderTranslation() throws Exception { + DataflowPipelineOptions options = buildPipelineOptions(); + Pipeline pipeline = Pipeline.create(options); + pipeline + .apply(Impulse.create()) + .apply( + MapElements.via( + new SimpleFunction() { + @Override + public 
SimpleAutoValue apply(byte[] input) { + return SimpleAutoValue.of("foo", 5, 10L); + } + })) + .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1)))); + { + SdkComponents sdkComponents = createSdkComponents(options); + RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true); + Map coders = pipelineProto.getComponents().getCodersMap(); + assertTrue(coders.containsKey("SchemaCoder")); + assertEquals("beam:coder:schema:v1", coders.get("SchemaCoder").getSpec().getUrn()); + } + + // Prior to version 2.73, SchemaCoders are translated as custom java coders. + { + options.as(StreamingOptions.class).setUpdateCompatibilityVersion("2.72"); + SdkComponents sdkComponents = createSdkComponents(options); + RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true); + Map coders = pipelineProto.getComponents().getCodersMap(); + assertTrue(coders.containsKey("SchemaCoder")); + assertEquals("beam:coders:javasdk:0.1", coders.get("SchemaCoder").getSpec().getUrn()); + } + } } diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptorsTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptorsTest.java index 21d7550c38b9..9ea7404053d6 100644 --- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptorsTest.java +++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptorsTest.java @@ -78,7 +78,8 @@ public void testLengthPrefixingOfKeyCoderInStatefulExecutableStage() throws Exce // Add another stateful stage with a non-standard key coder Pipeline p = Pipeline.create(); Coder keycoder = VoidCoder.of(); - assertThat(ModelCoderRegistrar.isKnownCoder(keycoder), is(false)); + ModelCoderRegistrar coderRegistrar = new ModelCoderRegistrar(); + 
assertThat(coderRegistrar.isKnownCoder(keycoder, p.getOptions()), is(false)); p.apply("impulse", Impulse.create()) .apply( "create", @@ -165,7 +166,8 @@ public void onTimer() {} public void testLengthPrefixingOfInputCoderExecutableStage() throws Exception { Pipeline p = Pipeline.create(); Coder voidCoder = VoidCoder.of(); - assertThat(ModelCoderRegistrar.isKnownCoder(voidCoder), is(false)); + ModelCoderRegistrar coderRegistrar = new ModelCoderRegistrar(); + assertThat(coderRegistrar.isKnownCoder(voidCoder, p.getOptions()), is(false)); p.apply("impulse", Impulse.create()) .apply( ParDo.of( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslation.java index 22859dc68b93..2cc4bf0c6a03 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslation.java @@ -25,11 +25,13 @@ import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BiMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableBiMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.dataflow.qual.Deterministic; @@ -62,6 +64,8 @@ private static class 
DefaultTranslationContext implements TranslationContext {} private static @MonotonicNonNull BiMap, String> knownCoderUrns; + private static @MonotonicNonNull List coderTranslatorRegistrars; + private static @MonotonicNonNull Map, CoderTranslator> knownTranslators; @@ -80,6 +84,53 @@ static BiMap, String> getKnownCoderUrns() { return knownCoderUrns; } + private static void initializeCoderTranslatorRegistrars() { + ImmutableList.Builder registrars = ImmutableList.builder(); + for (CoderTranslatorRegistrar coderTranslatorRegistrar : + ServiceLoader.load(CoderTranslatorRegistrar.class)) { + registrars.add(coderTranslatorRegistrar); + } + coderTranslatorRegistrars = registrars.build(); + } + + static boolean isKnownCoder(Coder coder, PipelineOptions options) { + if (coderTranslatorRegistrars == null) { + initializeCoderTranslatorRegistrars(); + } + for (CoderTranslatorRegistrar registrar : coderTranslatorRegistrars) { + if (registrar.isKnownCoder(coder, options)) { + return true; + } + } + return false; + } + + static CoderTranslator getCoderTranslator(Class coderClass) { + if (coderTranslatorRegistrars == null) { + initializeCoderTranslatorRegistrars(); + } + for (CoderTranslatorRegistrar registrar : coderTranslatorRegistrars) { + CoderTranslator translator = registrar.getCoderTranslator(coderClass); + if (translator != null) { + return translator; + } + } + return null; + } + + static Class getCoderForUrn(String coderUrn) { + if (coderTranslatorRegistrars == null) { + initializeCoderTranslatorRegistrars(); + } + for (CoderTranslatorRegistrar registrar : coderTranslatorRegistrars) { + Class coder = registrar.getCoderForUrn(coderUrn); + if (coder != null) { + return coder; + } + } + return null; + } + @VisibleForTesting @Deterministic static Map, CoderTranslator> getKnownTranslators() { @@ -107,7 +158,7 @@ public static RunnerApi.MessageWithComponents toProto(Coder coder) throws IOE public static RunnerApi.Coder toProto(Coder coder, SdkComponents components) throws 
IOException { - if (getKnownCoderUrns().containsKey(coder.getClass())) { + if (isKnownCoder(coder, components.getPipelineOptions())) { return toKnownCoder(coder, components); } @@ -129,7 +180,10 @@ private static RunnerApi.Coder toUnknownCoderWrapper(UnknownCoderWrapper coder) private static RunnerApi.Coder toKnownCoder(Coder coder, SdkComponents components) throws IOException { - CoderTranslator translator = getKnownTranslators().get(coder.getClass()); + CoderTranslator translator = getCoderTranslator(coder.getClass()); + if (translator == null) { + throw new IOException("Unable to find CoderTranslator for known Coder"); + } List componentIds = registerComponents(coder, translator, components); return RunnerApi.Coder.newBuilder() .addAllComponentCoderIds(componentIds) @@ -186,8 +240,8 @@ private static Coder fromKnownCoder( components.getComponents().getCodersOrThrow(componentId), components, context); coderComponents.add(innerCoder); } - Class coderType = getKnownCoderUrns().inverse().get(coderUrn); - CoderTranslator translator = getKnownTranslators().get(coderType); + Class coderType = getCoderForUrn(coderUrn); + CoderTranslator translator = getCoderTranslator(coderType); if (translator != null) { return translator.fromComponents( coderComponents, coder.getSpec().getPayload().toByteArray(), context); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslator.java index 3d89c4c7ff4a..78f5b61c0f0e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslator.java @@ -28,6 +28,7 @@ * additional payload, which is not currently supported. This exists as a temporary measure. */ public interface CoderTranslator> { + /** Extract all component {@link Coder coders} within a coder. 
*/ List> getComponents(T from); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslatorRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslatorRegistrar.java index b69d0290de52..44e8c2956aee 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslatorRegistrar.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslatorRegistrar.java @@ -19,6 +19,8 @@ import java.util.Map; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.checkerframework.checker.nullness.qual.Nullable; /** A registrar of {@link Coder} URNs to the associated {@link CoderTranslator}. */ @SuppressWarnings({ @@ -34,4 +36,18 @@ public interface CoderTranslatorRegistrar { /** Returns a mapping of URN to {@link CoderTranslator}. */ Map, CoderTranslator> getCoderTranslators(); + + /** + * Returns whether the given Coder is known to this CoderTranslatorRegistrar. If the Coder is + * known, then getCoderTranslator() will return a non-null CoderTranslator. + */ + boolean isKnownCoder(Coder coder, PipelineOptions options); + + /** Returns the CoderTranslator to use for this Coder, or null if the Coder is not known. */ + @Nullable + CoderTranslator getCoderTranslator(Class coderClass); + + /** Returns the Coder to use for the given Urn, or null if the Urn is for an unknown Coder. 
*/ + @Nullable + Class getCoderForUrn(String coderUrn); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslators.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslators.java index 84a90721a983..a847bf780dff 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslators.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslators.java @@ -19,6 +19,7 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import java.io.IOException; import java.util.Collections; import java.util.List; import org.apache.beam.model.pipeline.v1.SchemaApi; @@ -30,12 +31,19 @@ import org.apache.beam.sdk.coders.RowCoder; import org.apache.beam.sdk.coders.TimestampPrefixingWindowCoder; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaCoder; import org.apache.beam.sdk.schemas.SchemaTranslation; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.InstanceBuilder; +import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.ShardedKey; +import org.apache.beam.sdk.util.construction.CoderTranslation.TranslationContext; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.sdk.values.WindowedValues.FullWindowedValueCoder; +import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.InvalidProtocolBufferException; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; @@ -177,6 +185,77 @@ public RowCoder fromComponents( }; } + static CoderTranslator> schema() { + return new CoderTranslator>() { + private static final String 
TO_ROW_FUNCTION_URN = "beam:torowfn:javasdk:v1"; + private static final String FROM_ROW_FUNCTION_URN = "beam:fromrowfn:javasdk:v1"; + private static final String TYPE_DESCRIPTOR_URN = "beam:typedescriptor:javasdk:v1"; + + @Override + public ImmutableList> getComponents(SchemaCoder from) { + return ImmutableList.of(); + } + + @Override + public byte[] getPayload(SchemaCoder from) { + SchemaApi.SchemaCoderPayload.Builder payload = SchemaApi.SchemaCoderPayload.newBuilder(); + payload.setSchema(SchemaTranslation.schemaToProto(from.getSchema(), true)); + payload + .getToRowFnBuilder() + .setUrn(TO_ROW_FUNCTION_URN) + .setPayload( + ByteString.copyFrom( + SerializableUtils.serializeToByteArray(from.getToRowFunction()))); + payload + .getFromRowFnBuilder() + .setUrn(FROM_ROW_FUNCTION_URN) + .setPayload( + ByteString.copyFrom( + SerializableUtils.serializeToByteArray(from.getFromRowFunction()))); + payload + .addAdditionalCoderInfosBuilder() + .setUrn(TYPE_DESCRIPTOR_URN) + .setPayload( + ByteString.copyFrom( + SerializableUtils.serializeToByteArray(from.getEncodedTypeDescriptor()))); + return payload.build().toByteArray(); + } + + @Override + public SchemaCoder fromComponents( + List> components, byte[] payload, TranslationContext context) { + checkArgument( + components.isEmpty(), "Expected empty component list, but received: %s", components); + try { + SchemaApi.SchemaCoderPayload schemaCoderPayload = + SchemaApi.SchemaCoderPayload.parseFrom(payload); + if (schemaCoderPayload.getAdditionalCoderInfosCount() == 0) { + throw new IllegalArgumentException("Missing serialized typeDescriptor"); + } + TypeDescriptor typeDescriptor = + (TypeDescriptor) + SerializableUtils.deserializeFromByteArray( + schemaCoderPayload.getAdditionalCoderInfos(0).getPayload().toByteArray(), + "typeDescriptor"); + SerializableFunction toRowFunction = + (SerializableFunction) + SerializableUtils.deserializeFromByteArray( + schemaCoderPayload.getToRowFn().getPayload().toByteArray(), "toRowFunction"); 
+ SerializableFunction fromRowFunction = + (SerializableFunction) + SerializableUtils.deserializeFromByteArray( + schemaCoderPayload.getFromRowFn().getPayload().toByteArray(), + "fromRowFunction"); + + Schema schema = SchemaTranslation.schemaFromProto(schemaCoderPayload.getSchema()); + return SchemaCoder.of(schema, typeDescriptor, toRowFunction, fromRowFunction); + } catch (IOException | IllegalArgumentException e) { + throw new RuntimeException(e); + } + } + }; + } + static CoderTranslator> shardedKey() { return new SimpleStructuredCoderTranslator>() { @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoderRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoderRegistrar.java index 5b0d5aedd619..4945fa361e2c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoderRegistrar.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoderRegistrar.java @@ -34,6 +34,9 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.TimestampPrefixingWindowCoder; import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.StreamingOptions; +import org.apache.beam.sdk.schemas.SchemaCoder; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder; import org.apache.beam.sdk.util.ShardedKey; @@ -71,6 +74,7 @@ public class ModelCoderRegistrar implements CoderTranslatorRegistrar { ModelCoders.PARAM_WINDOWED_VALUE_CODER_URN) .put(DoubleCoder.class, ModelCoders.DOUBLE_CODER_URN) .put(RowCoder.class, ModelCoders.ROW_CODER_URN) + .put(SchemaCoder.class, ModelCoders.SCHEMA_CODER_URN) .put(ShardedKey.Coder.class, ModelCoders.SHARDED_KEY_CODER_URN) .put(TimestampPrefixingWindowCoder.class, ModelCoders.CUSTOM_WINDOW_CODER_URN) .put(NullableCoder.class, 
ModelCoders.NULLABLE_CODER_URN) @@ -96,6 +100,7 @@ public class ModelCoderRegistrar implements CoderTranslatorRegistrar { CoderTranslators.paramWindowedValue()) .put(DoubleCoder.class, CoderTranslators.atomic(DoubleCoder.class)) .put(RowCoder.class, CoderTranslators.row()) + .put(SchemaCoder.class, CoderTranslators.schema()) .put(ShardedKey.Coder.class, CoderTranslators.shardedKey()) .put(TimestampPrefixingWindowCoder.class, CoderTranslators.timestampPrefixingWindow()) .put(NullableCoder.class, CoderTranslators.nullable()) @@ -123,10 +128,6 @@ public class ModelCoderRegistrar implements CoderTranslatorRegistrar { Coder.class.getSimpleName()); } - public static boolean isKnownCoder(Coder coder) { - return BEAM_MODEL_CODER_URNS.containsKey(coder.getClass()); - } - @Override public Map, String> getCoderURNs() { return BEAM_MODEL_CODER_URNS; @@ -136,4 +137,23 @@ public Map, String> getCoderURNs() { public Map, CoderTranslator> getCoderTranslators() { return BEAM_MODEL_CODERS; } + + @Override + public boolean isKnownCoder(Coder coder, PipelineOptions options) { + if (coder.getClass() == SchemaCoder.class + && StreamingOptions.updateCompatibilityVersionLessThan(options, "2.73")) { + return false; + } + return BEAM_MODEL_CODER_URNS.containsKey(coder.getClass()); + } + + @Override + public CoderTranslator getCoderTranslator(Class coderClass) { + return BEAM_MODEL_CODERS.getOrDefault(coderClass, null); + } + + @Override + public Class getCoderForUrn(String coderUrn) { + return BEAM_MODEL_CODER_URNS.inverse().getOrDefault(coderUrn, null); + } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoders.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoders.java index 7b7546aceb61..5059cc1c6b83 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoders.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoders.java @@ -61,6 +61,7 @@ private 
ModelCoders() {} getUrn(StandardCoders.Enum.PARAM_WINDOWED_VALUE); public static final String ROW_CODER_URN = getUrn(StandardCoders.Enum.ROW); + public static final String SCHEMA_CODER_URN = getUrn(StandardCoders.Enum.SCHEMA); public static final String STATE_BACKED_ITERABLE_CODER_URN = "beam:coder:state_backed_iterable:v1"; @@ -90,6 +91,7 @@ private ModelCoders() {} WINDOWED_VALUE_CODER_URN, DOUBLE_CODER_URN, ROW_CODER_URN, + SCHEMA_CODER_URN, PARAM_WINDOWED_VALUE_CODER_URN, STATE_BACKED_ITERABLE_CODER_URN, SHARDED_KEY_CODER_URN, diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/RehydratedComponents.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/RehydratedComponents.java index f79696214368..64c7898a37b9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/RehydratedComponents.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/RehydratedComponents.java @@ -189,6 +189,7 @@ public SdkComponents getSdkComponents(Collection requirements) { windowingStrategies.asMap(), coders.asMap(), Collections.emptyMap(), - requirements); + requirements, + pipeline.getOptions()); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SdkComponents.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SdkComponents.java index 446697f24a81..58d10a712c40 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SdkComponents.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SdkComponents.java @@ -63,6 +63,7 @@ public class SdkComponents { private final BiMap environmentIds = HashBiMap.create(); private final BiMap coderProtoToId = HashBiMap.create(); private final Set requirements; + private PipelineOptions pipelineOptions; private final Set reservedIds = new HashSet<>(); @@ -71,17 +72,7 @@ public class SdkComponents { /** Create a new {@link SdkComponents} with no components. 
*/ public static SdkComponents create() { - return new SdkComponents(RunnerApi.Components.getDefaultInstance(), null, ""); - } - - /** - * Create new {@link SdkComponents} importing all items from provided {@link Components} object. - * - *

WARNING: This action might cause some of duplicate items created. - */ - public static SdkComponents create( - RunnerApi.Components components, Collection requirements) { - return new SdkComponents(components, requirements, ""); + return new SdkComponents(RunnerApi.Components.getDefaultInstance(), null, "", null); } /*package*/ static SdkComponents create( @@ -91,31 +82,36 @@ public static SdkComponents create( Map> windowingStrategies, Map> coders, Map environments, - Collection requirements) { - SdkComponents sdkComponents = SdkComponents.create(components, requirements); + Collection requirements, + PipelineOptions pipelineOptions) { + SdkComponents sdkComponents = new SdkComponents(components, requirements, "", pipelineOptions); sdkComponents.transformIds.inverse().putAll(transforms); sdkComponents.pCollectionIds.inverse().putAll(pCollections); sdkComponents.windowingStrategyIds.inverse().putAll(windowingStrategies); sdkComponents.coderIds.inverse().putAll(coders); sdkComponents.environmentIds.inverse().putAll(environments); + sdkComponents.pipelineOptions = pipelineOptions; return sdkComponents; } public static SdkComponents create(PipelineOptions options) { SdkComponents sdkComponents = - new SdkComponents(RunnerApi.Components.getDefaultInstance(), null, ""); + new SdkComponents(RunnerApi.Components.getDefaultInstance(), null, "", options); PortablePipelineOptions portablePipelineOptions = options.as(PortablePipelineOptions.class); sdkComponents.registerEnvironment( Environments.createOrGetDefaultEnvironment(portablePipelineOptions)); + sdkComponents.pipelineOptions = options; return sdkComponents; } private SdkComponents( @Nullable Components components, @Nullable Collection requirements, - String newIdPrefix) { + String newIdPrefix, + @Nullable PipelineOptions pipelineOptions) { this.newIdPrefix = newIdPrefix; this.requirements = new HashSet<>(); + this.pipelineOptions = pipelineOptions; if (components == null) { if (requirements != null) { @@ -153,7 +149,7 
@@ public void mergeFrom( */ public SdkComponents withNewIdPrefix(String newIdPrefix) { SdkComponents sdkComponents = - new SdkComponents(componentsBuilder.build(), requirements, newIdPrefix); + new SdkComponents(componentsBuilder.build(), requirements, newIdPrefix, pipelineOptions); sdkComponents.transformIds.putAll(transformIds); sdkComponents.pCollectionIds.putAll(pCollectionIds); sdkComponents.windowingStrategyIds.putAll(windowingStrategyIds); @@ -174,7 +170,7 @@ public String registerPTransform( throws IOException { String name = getApplicationName(appliedPTransform); // If this transform is present in the components, nothing to do. return the existing name. - // Otherwise the transform must be translated and added to the components. + // Otherwise, the transform must be translated and added to the components. if (componentsBuilder.getTransformsOrDefault(name, null) != null) { return name; } @@ -375,4 +371,8 @@ public RunnerApi.Components toComponents() { public Collection requirements() { return ImmutableSet.copyOf(requirements); } + + public PipelineOptions getPipelineOptions() { + return pipelineOptions; + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/CoderTranslationTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/CoderTranslationTest.java index b8f92ff0053e..1ec0a74f5be1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/CoderTranslationTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/CoderTranslationTest.java @@ -22,6 +22,7 @@ import static org.hamcrest.Matchers.hasItems; import static org.hamcrest.Matchers.not; +import com.google.auto.value.AutoValue; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -45,14 +46,20 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.TimestampPrefixingWindowCoder; import org.apache.beam.sdk.coders.VarLongCoder; +import 
org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.NoSuchSchemaException; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.SchemaCoder; +import org.apache.beam.sdk.schemas.SchemaRegistry; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder; import org.apache.beam.sdk.util.ShardedKey; import org.apache.beam.sdk.util.construction.CoderTranslation.TranslationContext; +import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.sdk.values.WindowedValues.FullWindowedValueCoder; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; @@ -70,6 +77,34 @@ "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) }) public class CoderTranslationTest { + @AutoValue + @DefaultSchema(AutoValueSchema.class) + public abstract static class SimpleAutoValue { + public abstract String getString(); + + public abstract int getInt32(); + + public abstract long getInt64(); + + public static SimpleAutoValue of(String string, Integer int32, Long int64) { + return new AutoValue_CoderTranslationTest_SimpleAutoValue(string, int32, int64); + } + } + + private static final SchemaRegistry REGISTRY = SchemaRegistry.createDefault(); + + private static SchemaCoder schemaCoderFrom(TypeDescriptor typeDescriptor) { + try { + return SchemaCoder.of( + REGISTRY.getSchema(typeDescriptor), + typeDescriptor, + REGISTRY.getToRowFunction(typeDescriptor), + REGISTRY.getFromRowFunction(typeDescriptor)); + } catch (NoSuchSchemaException e) { + throw new RuntimeException(e); + } + } + private static final Set> KNOWN_CODERS = 
ImmutableSet.>builder() .add(ByteArrayCoder.of()) @@ -94,6 +129,7 @@ public class CoderTranslationTest { Field.of("array", FieldType.array(FieldType.STRING)), Field.of("map", FieldType.map(FieldType.STRING, FieldType.INT32)), Field.of("bar", FieldType.logicalType(FixedBytes.of(123)))))) + .add(schemaCoderFrom(TypeDescriptor.of(SimpleAutoValue.class))) .add(ShardedKey.Coder.of(StringUtf8Coder.of())) .add(TimestampPrefixingWindowCoder.of(IntervalWindowCoder.of())) .add(NullableCoder.of(ByteArrayCoder.of())) @@ -127,7 +163,7 @@ public void validateKnownCoders() { } @Test - public void validateCoderTranslators() { + public void validateModelCoderTranslators() { assertThat( "Every Model Coder must have a Translator", new ModelCoderRegistrar().getCoderURNs().keySet(), diff --git a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java index c3c3ccfd3266..9c4c0dfa4113 100644 --- a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java +++ b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java @@ -602,7 +602,7 @@ private Map loadRegisteredTransforms() { pipeline.getOptions().as(ExperimentalOptions.class), "use_sdf_read"); } else { LOG.warn( - "Using use_depreacted_read in portable runners is runner-dependent. The " + "Using use_deprecated_read in portable runners is runner-dependent. 
The " + "ExpansionService will respect that, but if your runner does not have support for " + "native Read transform, your Pipeline will fail during Pipeline submission."); } diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/AvroGenericCoderRegistrar.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/AvroGenericCoderRegistrar.java index 14ab48f66699..8bd18fd8e250 100644 --- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/AvroGenericCoderRegistrar.java +++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/AvroGenericCoderRegistrar.java @@ -21,9 +21,11 @@ import java.util.Map; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.extensions.avro.coders.AvroGenericCoder; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.construction.CoderTranslator; import org.apache.beam.sdk.util.construction.CoderTranslatorRegistrar; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.checkerframework.checker.nullness.qual.Nullable; /** Coder registrar for AvroGenericCoder. */ @AutoService(CoderTranslatorRegistrar.class) @@ -42,4 +44,20 @@ public Map, String> getCoderURNs() { public Map, CoderTranslator> getCoderTranslators() { return ImmutableMap.of(AvroGenericCoder.class, new AvroGenericCoderTranslator()); } + + @Override + public boolean isKnownCoder(Coder coder, PipelineOptions options) { + return coder.getClass() == AvroGenericCoder.class; + } + + @Override + public @Nullable CoderTranslator getCoderTranslator( + Class coderClass) { + return coderClass == AvroGenericCoder.class ? new AvroGenericCoderTranslator() : null; + } + + @Override + public @Nullable Class getCoderForUrn(String coderUrn) { + return AVRO_CODER_URN.equals(coderUrn) ? 
AvroGenericCoder.class : null; + } } diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java index ef8d69bc1ec3..42a6f8d11c2a 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java @@ -38,6 +38,7 @@ import org.apache.beam.sdk.coders.IterableLikeCoder; import org.apache.beam.sdk.fn.stream.PrefetchableIterable; import org.apache.beam.sdk.fn.stream.PrefetchableIterators; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.BufferedElementCountingOutputStream; import org.apache.beam.sdk.util.VarInt; import org.apache.beam.sdk.util.common.ElementByteSizeObservableIterable; @@ -52,6 +53,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams; +import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -300,6 +302,26 @@ public Map, String> getCoderUR getCoderTranslators() { return ImmutableMap.of(StateBackedIterable.Coder.class, new Translator()); } + + @Override + public boolean isKnownCoder( + org.apache.beam.sdk.coders.Coder coder, PipelineOptions options) { + return coder.getClass() == StateBackedIterable.Coder.class; + } + + @Override + public @Nullable CoderTranslator getCoderTranslator( + Class coderClass) { + return coderClass == StateBackedIterable.Coder.class ? new Translator() : null; + } + + @Override + public @Nullable Class getCoderForUrn( + String coderUrn) { + return STATE_BACKED_ITERABLE_CODER_URN.equals(coderUrn) + ? StateBackedIterable.Coder.class + : null; + } } /**