diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java index 94288e57a41e..49f81f221a6a 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java @@ -29,8 +29,6 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.ReadPayload; import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.io.Read.Unbounded; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.runners.AppliedPTransform; @@ -43,21 +41,24 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; /** - * Methods for translating {@link Read.Bounded} and {@link Read.Unbounded} {@link PTransform - * PTransformTranslation} into {@link ReadPayload} protos. + * Methods for translating {@link SplittableParDo.PrimitiveBoundedRead} and {@link + * SplittableParDo.PrimitiveUnboundedRead} {@link PTransform PTransformTranslation} into {@link + * ReadPayload} protos. */ public class ReadTranslation { private static final String JAVA_SERIALIZED_BOUNDED_SOURCE = "beam:java:boundedsource:v1"; private static final String JAVA_SERIALIZED_UNBOUNDED_SOURCE = "beam:java:unboundedsource:v1"; - public static ReadPayload toProto(Read.Bounded read, SdkComponents components) { + public static ReadPayload toProto( + SplittableParDo.PrimitiveBoundedRead read, SdkComponents components) { return ReadPayload.newBuilder() .setIsBounded(IsBounded.Enum.BOUNDED) .setSource(toProto(read.getSource(), components)) .build(); } - public static ReadPayload toProto(Unbounded read, SdkComponents components) { + public static ReadPayload toProto( + SplittableParDo.PrimitiveUnboundedRead read, SdkComponents components) { return ReadPayload.newBuilder() .setIsBounded(IsBounded.Enum.UNBOUNDED) .setSource(toProto(read.getSource(), components)) @@ -141,9 +142,10 @@ public static PCollection.IsBounded sourceIsBounded(AppliedPTransform t } } - /** A {@link TransformPayloadTranslator} for {@link Read.Unbounded}. */ + /** A {@link TransformPayloadTranslator} for {@link SplittableParDo.PrimitiveUnboundedRead}. */ public static class UnboundedReadPayloadTranslator - implements PTransformTranslation.TransformPayloadTranslator> { + implements PTransformTranslation.TransformPayloadTranslator< + SplittableParDo.PrimitiveUnboundedRead> { public static TransformPayloadTranslator create() { return new UnboundedReadPayloadTranslator(); } @@ -151,13 +153,14 @@ public static TransformPayloadTranslator create() { private UnboundedReadPayloadTranslator() {} @Override - public String getUrn(Read.Unbounded transform) { + public String getUrn(SplittableParDo.PrimitiveUnboundedRead transform) { return PTransformTranslation.READ_TRANSFORM_URN; } @Override public FunctionSpec translate( - AppliedPTransform> transform, SdkComponents components) { + AppliedPTransform> transform, + SdkComponents components) { ReadPayload payload = toProto(transform.getTransform(), components); return RunnerApi.FunctionSpec.newBuilder() .setUrn(getUrn(transform.getTransform())) @@ -166,9 +169,10 @@ public FunctionSpec translate( } } - /** A {@link TransformPayloadTranslator} for {@link Read.Bounded}. */ + /** A {@link TransformPayloadTranslator} for {@link SplittableParDo.PrimitiveBoundedRead}. */ public static class BoundedReadPayloadTranslator - implements PTransformTranslation.TransformPayloadTranslator> { + implements PTransformTranslation.TransformPayloadTranslator< + SplittableParDo.PrimitiveBoundedRead> { public static TransformPayloadTranslator create() { return new BoundedReadPayloadTranslator(); } @@ -176,13 +180,14 @@ public static TransformPayloadTranslator create() { private BoundedReadPayloadTranslator() {} @Override - public String getUrn(Read.Bounded transform) { + public String getUrn(SplittableParDo.PrimitiveBoundedRead transform) { return PTransformTranslation.READ_TRANSFORM_URN; } @Override public FunctionSpec translate( - AppliedPTransform> transform, SdkComponents components) { + AppliedPTransform> transform, + SdkComponents components) { ReadPayload payload = toProto(transform.getTransform(), components); return RunnerApi.FunctionSpec.newBuilder() .setUrn(getUrn(transform.getTransform())) @@ -198,8 +203,8 @@ public static class Registrar implements TransformPayloadTranslatorRegistrar { public Map, ? extends TransformPayloadTranslator> getTransformPayloadTranslators() { return ImmutableMap., TransformPayloadTranslator>builder() - .put(Read.Unbounded.class, new UnboundedReadPayloadTranslator()) - .put(Read.Bounded.class, new BoundedReadPayloadTranslator()) + .put(SplittableParDo.PrimitiveUnboundedRead.class, new UnboundedReadPayloadTranslator()) + .put(SplittableParDo.PrimitiveBoundedRead.class, new BoundedReadPayloadTranslator()) .build(); } } diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java index acece8859ef9..a3e8b1218287 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java @@ -21,7 +21,6 @@ import com.google.auto.service.AutoService; import java.io.IOException; -import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -37,15 +36,18 @@ import org.apache.beam.runners.core.construction.ReadTranslation.BoundedReadPayloadTranslator; import org.apache.beam.runners.core.construction.ReadTranslation.UnboundedReadPayloadTranslator; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.Pipeline.PipelineVisitor; +import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.runners.PTransformOverride; import org.apache.beam.sdk.runners.PTransformOverrideFactory; -import org.apache.beam.sdk.runners.TransformHierarchy.Node; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnSchemaInformation; import org.apache.beam.sdk.transforms.PTransform; @@ -53,6 +55,7 @@ import org.apache.beam.sdk.transforms.ParDo.MultiOutput; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.ArgumentProvider; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.BaseArgumentProvider; @@ -62,7 +65,9 @@ import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.NameUtils; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; @@ -71,6 +76,7 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; import org.checkerframework.checker.nullness.qual.Nullable; @@ -638,43 +644,161 @@ public void tearDown() { } /** - * Throws an {@link IllegalArgumentException} if the pipeline contains any primitive read - * transforms that have not been expanded to be executed as {@link DoFn splittable DoFns} as long - * as the experiment {@code use_deprecated_read} is not specified. + * Converts {@link Read} based Splittable DoFn expansions to primitive reads implemented by {@link + * PrimitiveBoundedRead} and {@link PrimitiveUnboundedRead} if either the experiment {@code + * use_deprecated_read} or {@code beam_fn_api_use_deprecated_read} are specified. + * + *

TODO(BEAM-10670): Remove the primitive Read and make the splittable DoFn the only option. + */ + public static void convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(Pipeline pipeline) { + if (ExperimentalOptions.hasExperiment(pipeline.getOptions(), "beam_fn_api_use_deprecated_read") + || ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_deprecated_read")) { + convertReadBasedSplittableDoFnsToPrimitiveReads(pipeline); + } + } + + /** + * Converts {@link Read} based Splittable DoFn expansions to primitive reads implemented by {@link + * PrimitiveBoundedRead} and {@link PrimitiveUnboundedRead}. + * + *

TODO(BEAM-10670): Remove the primitive Read and make the splittable DoFn the only option. + */ + public static void convertReadBasedSplittableDoFnsToPrimitiveReads(Pipeline pipeline) { + pipeline.replaceAll( + ImmutableList.of(PRIMITIVE_BOUNDED_READ_OVERRIDE, PRIMITIVE_UNBOUNDED_READ_OVERRIDE)); + } + + /** + * A transform override for {@link Read.Bounded} that converts it to a {@link + * PrimitiveBoundedRead}. + */ + public static final PTransformOverride PRIMITIVE_BOUNDED_READ_OVERRIDE = + PTransformOverride.of( + PTransformMatchers.classEqualTo(Read.Bounded.class), new BoundedReadOverrideFactory<>()); + /** + * A transform override for {@link Read.Unbounded} that converts it to a {@link + * PrimitiveUnboundedRead}. */ - public static void validateNoPrimitiveReads(Pipeline pipeline) { - // TODO(BEAM-10670): Remove the deprecated Read and make the splittable DoFn the only option. - if (!(ExperimentalOptions.hasExperiment( - pipeline.getOptions(), "beam_fn_api_use_deprecated_read") - || ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_deprecated_read"))) { + public static final PTransformOverride PRIMITIVE_UNBOUNDED_READ_OVERRIDE = + PTransformOverride.of( + PTransformMatchers.classEqualTo(Read.Unbounded.class), + new UnboundedReadOverrideFactory<>()); - pipeline.traverseTopologically(new ValidateNoPrimitiveReads()); + private static class BoundedReadOverrideFactory + implements PTransformOverrideFactory, Read.Bounded> { + @Override + public PTransformReplacement> getReplacementTransform( + AppliedPTransform, Read.Bounded> transform) { + return PTransformReplacement.of( + transform.getPipeline().begin(), new PrimitiveBoundedRead<>(transform.getTransform())); + } + + @Override + public Map mapOutputs( + Map, PValue> outputs, PCollection newOutput) { + return ReplacementOutputs.singleton(outputs, newOutput); + } + } + + private static class UnboundedReadOverrideFactory + implements PTransformOverrideFactory, Read.Unbounded> { + @Override + public PTransformReplacement> getReplacementTransform( + AppliedPTransform, Read.Unbounded> transform) { + return PTransformReplacement.of( + transform.getPipeline().begin(), new PrimitiveUnboundedRead<>(transform.getTransform())); + } + + @Override + public Map mapOutputs( + Map, PValue> outputs, PCollection newOutput) { + return ReplacementOutputs.singleton(outputs, newOutput); } } /** - * A {@link org.apache.beam.sdk.Pipeline.PipelineVisitor} that ensures that the pipeline does not - * contain any primitive reads. + * Base class that ensures the overridden transform has the same contract as if interacting with + * the original {@link Read.Bounded Read.Bounded}/{@link Read.Unbounded Read.Unbounded} + * implementations. */ - private static class ValidateNoPrimitiveReads extends PipelineVisitor.Defaults { - public final List> foundPrimitiveReads = new ArrayList<>(); + private abstract static class PrimitiveRead extends PTransform> { + private final PTransform> originalTransform; + protected final Object source; + + public PrimitiveRead(PTransform> originalTransform, Object source) { + this.originalTransform = originalTransform; + this.source = source; + } @Override - public void visitPrimitiveTransform(Node node) { - if (node.getTransform() instanceof Read.Bounded - || node.getTransform() instanceof Read.Unbounded) { - foundPrimitiveReads.add(node.getTransform()); - } + public void validate(@Nullable PipelineOptions options) { + originalTransform.validate(options); } @Override - public void leavePipeline(Pipeline pipeline) { - if (!foundPrimitiveReads.isEmpty()) { - throw new IllegalArgumentException( - String.format( - "Found primitive read transforms %s within the pipeline when only Splittable DoFns were expected. If you would like to use the deprecated behavior, please specify the experiment 'use_deprecated_read'. For example '--experiements=use_deprecated_read' on the command line.", - foundPrimitiveReads)); - } + public Map, PValue> getAdditionalInputs() { + return originalTransform.getAdditionalInputs(); + } + + @Override + public Coder getDefaultOutputCoder(PBegin input, PCollection output) + throws CannotProvideCoderException { + return originalTransform.getDefaultOutputCoder(input, output); + } + + @Override + public String getName() { + return originalTransform.getName(); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + originalTransform.populateDisplayData(builder); + } + + @Override + protected String getKindString() { + return String.format("Read(%s)", NameUtils.approximateSimpleName(source)); + } + } + + /** The original primitive based {@link Read.Bounded Read.Bounded} expansion. */ + public static class PrimitiveBoundedRead extends PrimitiveRead { + public PrimitiveBoundedRead(Read.Bounded originalTransform) { + super(originalTransform, originalTransform.getSource()); + } + + @Override + public PCollection expand(PBegin input) { + return PCollection.createPrimitiveOutputInternal( + input.getPipeline(), + WindowingStrategy.globalDefault(), + PCollection.IsBounded.BOUNDED, + getSource().getOutputCoder()); + } + + public BoundedSource getSource() { + return (BoundedSource) source; + } + } + + /** The original primitive based {@link Read.Unbounded Read.Unbounded} expansion. */ + public static class PrimitiveUnboundedRead extends PrimitiveRead { + public PrimitiveUnboundedRead(Read.Unbounded originalTransform) { + super(originalTransform, originalTransform.getSource()); + } + + @Override + public PCollection expand(PBegin input) { + return PCollection.createPrimitiveOutputInternal( + input.getPipeline(), + WindowingStrategy.globalDefault(), + PCollection.IsBounded.UNBOUNDED, + getSource().getOutputCoder()); + } + + public UnboundedSource getSource() { + return (UnboundedSource) source; } } } diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java index 63264a6897f3..52b9868c50f5 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java @@ -34,6 +34,8 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.coders.BigEndianLongCoder; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StructuredCoder; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.runners.AppliedPTransform; @@ -45,6 +47,10 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; +import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.AfterPane; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.transforms.windowing.FixedWindows; @@ -111,18 +117,21 @@ public void process(ProcessContext c) { @Test public void testProtoDirectly() { final RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, false); - pipeline.traverseTopologically(new PipelineProtoVerificationVisitor(pipelineProto, false)); + pipeline.traverseTopologically( + new PipelineProtoVerificationVisitor(pipelineProto, pipeline.getCoderRegistry(), false)); } @Test public void testProtoDirectlyWithViewTransform() { final RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, true); - pipeline.traverseTopologically(new PipelineProtoVerificationVisitor(pipelineProto, true)); + pipeline.traverseTopologically( + new PipelineProtoVerificationVisitor(pipelineProto, pipeline.getCoderRegistry(), true)); } private static class PipelineProtoVerificationVisitor extends PipelineVisitor.Defaults { private final RunnerApi.Pipeline pipelineProto; + private final CoderRegistry coderRegistry; private boolean useDeprecatedViewTransforms; Set transforms; Set> pcollections; @@ -131,8 +140,11 @@ private static class PipelineProtoVerificationVisitor extends PipelineVisitor.De int missingViewTransforms = 0; public PipelineProtoVerificationVisitor( - RunnerApi.Pipeline pipelineProto, boolean useDeprecatedViewTransforms) { + RunnerApi.Pipeline pipelineProto, + CoderRegistry coderRegistry, + boolean useDeprecatedViewTransforms) { this.pipelineProto = pipelineProto; + this.coderRegistry = coderRegistry; this.useDeprecatedViewTransforms = useDeprecatedViewTransforms; transforms = new HashSet<>(); pcollections = new HashSet<>(); @@ -182,6 +194,28 @@ public void visitPrimitiveTransform(Node node) { PTransformTranslation.urnForTransformOrNull(node.getTransform()))) { missingViewTransforms += 1; } + if (PTransformTranslation.PAR_DO_TRANSFORM_URN.equals( + PTransformTranslation.urnForTransformOrNull(node.getTransform()))) { + final DoFn doFn; + if (node.getTransform() instanceof ParDo.SingleOutput) { + doFn = ((ParDo.SingleOutput) node.getTransform()).getFn(); + } else if (node.getTransform() instanceof ParDo.MultiOutput) { + doFn = ((ParDo.MultiOutput) node.getTransform()).getFn(); + } else { + throw new IllegalStateException( + "Unexpected type of ParDo " + node.getTransform().getClass()); + } + final DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass()); + final String restrictionCoderId; + if (signature.processElement().isSplittable()) { + DoFnInvoker doFnInvoker = DoFnInvokers.invokerFor(doFn); + final Coder restrictionAndWatermarkStateCoder = + KvCoder.of( + doFnInvoker.invokeGetRestrictionCoder(coderRegistry), + doFnInvoker.invokeGetWatermarkEstimatorStateCoder(coderRegistry)); + addCoders(restrictionAndWatermarkStateCoder); + } + } } @Override diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java index afb8def688a5..8309f766dfe2 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java @@ -69,7 +69,8 @@ public void testToFromProtoBounded() throws Exception { // TODO: Split into two tests. assumeThat(source, instanceOf(BoundedSource.class)); BoundedSource boundedSource = (BoundedSource) this.source; - Read.Bounded boundedRead = Read.from(boundedSource); + SplittableParDo.PrimitiveBoundedRead boundedRead = + new SplittableParDo.PrimitiveBoundedRead<>(Read.from(boundedSource)); SdkComponents components = SdkComponents.create(); components.registerEnvironment(Environments.createDockerEnvironment("java")); ReadPayload payload = ReadTranslation.toProto(boundedRead, components); @@ -82,7 +83,8 @@ public void testToFromProtoBounded() throws Exception { public void testToFromProtoUnbounded() throws Exception { assumeThat(source, instanceOf(UnboundedSource.class)); UnboundedSource unboundedSource = (UnboundedSource) this.source; - Read.Unbounded unboundedRead = Read.from(unboundedSource); + SplittableParDo.PrimitiveUnboundedRead unboundedRead = + new SplittableParDo.PrimitiveUnboundedRead<>(Read.from(unboundedSource)); SdkComponents components = SdkComponents.create(); // No environment set for unbounded sources ReadPayload payload = ReadTranslation.toProto(unboundedRead, components); diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java index 65715ac8cc51..3e75f1a68382 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java @@ -18,15 +18,20 @@ package org.apache.beam.runners.core.construction; import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; import java.io.IOException; import java.io.Serializable; import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.Pipeline.PipelineVisitor.Defaults; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.BoundedSource; @@ -35,6 +40,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.runners.TransformHierarchy.Node; import org.apache.beam.sdk.testing.CrashingRunner; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; @@ -205,47 +211,55 @@ public Coder getOutputCoder() { } @Test - public void testValidateNoPrimitiveReadsIsSkippedWhenUsingDeprecatedRead() { + public void testConvertIsSkippedWhenUsingDeprecatedRead() { + Pipeline sdfRead = Pipeline.create(); + sdfRead.apply(Read.from(new FakeBoundedSource())); + sdfRead.apply(Read.from(new BoundedToUnboundedSourceAdapter<>(new FakeBoundedSource()))); + SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(sdfRead); + pipeline.traverseTopologically( + new Defaults() { + @Override + public void visitPrimitiveTransform(Node node) { + assertThat( + node.getTransform(), not(instanceOf(SplittableParDo.PrimitiveBoundedRead.class))); + assertThat( + node.getTransform(), not(instanceOf(SplittableParDo.PrimitiveUnboundedRead.class))); + } + }); + } + + @Test + public void testConvertToPrimitiveReadsHappen() { PipelineOptions deprecatedReadOptions = PipelineOptionsFactory.create(); deprecatedReadOptions.setRunner(CrashingRunner.class); ExperimentalOptions.addExperiment( deprecatedReadOptions.as(ExperimentalOptions.class), "use_deprecated_read"); - Pipeline deprecatedReadAllowed = Pipeline.create(deprecatedReadOptions); - deprecatedReadAllowed.apply(Read.from(new FakeBoundedSource())); - deprecatedReadAllowed.apply( - Read.from(new BoundedToUnboundedSourceAdapter<>(new FakeBoundedSource()))); - // We expect that the experiment will skip validation. - SplittableParDo.validateNoPrimitiveReads(deprecatedReadAllowed); - } - - @Test - public void testValidateNoPrimitiveReadsWhenThereAreNone() { - PipelineOptions sdfOptions = PipelineOptionsFactory.create(); - sdfOptions.setRunner(CrashingRunner.class); - ExperimentalOptions.addExperiment(sdfOptions.as(ExperimentalOptions.class), "beam_fn_api"); - Pipeline sdf = Pipeline.create(sdfOptions); - sdf.apply(Read.from(new FakeBoundedSource())); - sdf.apply(Read.from(new BoundedToUnboundedSourceAdapter<>(new FakeBoundedSource()))); - // We expect that the experiment will have caused the transform to use SDF wrappers during - // execution. - SplittableParDo.validateNoPrimitiveReads(sdf); - } - - @Test - public void testValidateNoPrimitiveReadsFindsPrimitiveReads() { - PipelineOptions noSdfOptions = PipelineOptionsFactory.create(); - noSdfOptions.setRunner(CrashingRunner.class); - Pipeline boundedRead = Pipeline.create(noSdfOptions); - boundedRead.apply(Read.from(new FakeBoundedSource())); - assertThrows( - IllegalArgumentException.class, - () -> SplittableParDo.validateNoPrimitiveReads(boundedRead)); - - Pipeline unboundedRead = Pipeline.create(noSdfOptions); - unboundedRead.apply(Read.from(new BoundedToUnboundedSourceAdapter<>(new FakeBoundedSource()))); - assertThrows( - IllegalArgumentException.class, - () -> SplittableParDo.validateNoPrimitiveReads(unboundedRead)); + Pipeline pipeline = Pipeline.create(deprecatedReadOptions); + pipeline.apply(Read.from(new FakeBoundedSource())); + pipeline.apply(Read.from(new BoundedToUnboundedSourceAdapter<>(new FakeBoundedSource()))); + SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline); + AtomicBoolean sawPrimitiveBoundedRead = new AtomicBoolean(); + AtomicBoolean sawPrimitiveUnboundedRead = new AtomicBoolean(); + pipeline.traverseTopologically( + new Defaults() { + @Override + public CompositeBehavior enterCompositeTransform(Node node) { + assertThat(node.getTransform(), not(instanceOf(Read.Bounded.class))); + assertThat(node.getTransform(), not(instanceOf(Read.Unbounded.class))); + return super.enterCompositeTransform(node); + } + + @Override + public void visitPrimitiveTransform(Node node) { + if (node.getTransform() instanceof SplittableParDo.PrimitiveBoundedRead) { + sawPrimitiveBoundedRead.set(true); + } else if (node.getTransform() instanceof SplittableParDo.PrimitiveUnboundedRead) { + sawPrimitiveUnboundedRead.set(true); + } + } + }); + assertTrue(sawPrimitiveBoundedRead.get()); + assertTrue(sawPrimitiveUnboundedRead.get()); } } diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/QueryablePipelineTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/QueryablePipelineTest.java index ff797ef45d9d..6de249b39fc0 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/QueryablePipelineTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/QueryablePipelineTest.java @@ -51,6 +51,7 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.Impulse; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; @@ -175,9 +176,9 @@ public void rootTransforms() { rootTransform.getTransform().getInputsCount(), equalTo(0)); assertThat( - "Only added source reads to the pipeline", + "Only added impulse transforms to the pipeline", rootTransform.getTransform().getSpec().getUrn(), - equalTo(PTransformTranslation.READ_TRANSFORM_URN)); + equalTo(PTransformTranslation.IMPULSE_TRANSFORM_URN)); } } @@ -189,10 +190,10 @@ public void rootTransforms() { @Test public void transformWithSideAndMainInputs() { Pipeline p = Pipeline.create(); - PCollection longs = p.apply("BoundedRead", Read.from(CountingSource.upTo(100L))); + PCollection impulse = p.apply("Impulse", Impulse.create()); PCollectionView view = p.apply("Create", Create.of("foo")).apply("View", View.asSingleton()); - longs.apply( + impulse.apply( "par_do", ParDo.of(new TestFn()) .withSideInputs(view) @@ -203,7 +204,7 @@ public void transformWithSideAndMainInputs() { String mainInputName = getOnlyElement( - PipelineNode.pTransform("BoundedRead", components.getTransformsOrThrow("BoundedRead")) + PipelineNode.pTransform("Impulse", components.getTransformsOrThrow("Impulse")) .getTransform() .getOutputsMap() .values()); @@ -313,17 +314,19 @@ public void getProducer() { Components components = PipelineTranslation.toProto(p).getComponents(); QueryablePipeline qp = QueryablePipeline.forPrimitivesIn(components); - String longsOutputName = + String impulseOutputName = getOnlyElement( - PipelineNode.pTransform("BoundedRead", components.getTransformsOrThrow("BoundedRead")) + PipelineNode.pTransform( + "BoundedRead/Impulse", components.getTransformsOrThrow("BoundedRead/Impulse")) .getTransform() .getOutputsMap() .values()); - PTransformNode longsProducer = - PipelineNode.pTransform("BoundedRead", components.getTransformsOrThrow("BoundedRead")); - PCollectionNode longsOutput = + PTransformNode impulseProducer = + PipelineNode.pTransform( + "BoundedRead/Impulse", components.getTransformsOrThrow("BoundedRead/Impulse")); + PCollectionNode impulseOutput = PipelineNode.pCollection( - longsOutputName, components.getPcollectionsOrThrow(longsOutputName)); + impulseOutputName, components.getPcollectionsOrThrow(impulseOutputName)); String flattenOutputName = getOnlyElement( PipelineNode.pTransform("flatten", components.getTransformsOrThrow("flatten")) @@ -336,35 +339,37 @@ public void getProducer() { PipelineNode.pCollection( flattenOutputName, components.getPcollectionsOrThrow(flattenOutputName)); - assertThat(qp.getProducer(longsOutput), equalTo(longsProducer)); + assertThat(qp.getProducer(impulseOutput), equalTo(impulseProducer)); assertThat(qp.getProducer(flattenOutput), equalTo(flattenProducer)); } @Test public void getEnvironmentWithEnvironment() { Pipeline p = Pipeline.create(); - PCollection longs = p.apply("BoundedRead", Read.from(CountingSource.upTo(100L))); + PCollection longs = + p.apply("Impulse", Impulse.create()).apply("ParDo", ParDo.of(new TestFn())); longs.apply(WithKeys.of("a")).apply("groupByKey", GroupByKey.create()); Components components = PipelineTranslation.toProto(p).getComponents(); QueryablePipeline qp = QueryablePipeline.forPrimitivesIn(components); - PTransformNode environmentalRead = - PipelineNode.pTransform("BoundedRead", components.getTransformsOrThrow("BoundedRead")); + PTransformNode environmentalTransform = + PipelineNode.pTransform( + "ParDo/ParMultiDo(Test)", components.getTransformsOrThrow("ParDo/ParMultiDo(Test)")); PTransformNode nonEnvironmentalTransform = PipelineNode.pTransform("groupByKey", components.getTransformsOrThrow("groupByKey")); - assertThat(qp.getEnvironment(environmentalRead).isPresent(), is(true)); + assertThat(qp.getEnvironment(environmentalTransform).isPresent(), is(true)); assertThat( - qp.getEnvironment(environmentalRead).get().getUrn(), + qp.getEnvironment(environmentalTransform).get().getUrn(), equalTo(Environments.JAVA_SDK_HARNESS_ENVIRONMENT.getUrn())); assertThat( - qp.getEnvironment(environmentalRead).get().getPayload(), + qp.getEnvironment(environmentalTransform).get().getPayload(), equalTo(Environments.JAVA_SDK_HARNESS_ENVIRONMENT.getPayload())); assertThat(qp.getEnvironment(nonEnvironmentalTransform).isPresent(), is(false)); } - private static class TestFn extends DoFn { + private static class TestFn extends DoFn { @ProcessElement public void process(ProcessContext ctxt) {} } @@ -372,7 +377,7 @@ public void process(ProcessContext ctxt) {} @Test public void retainOnlyPrimitivesWithOnlyPrimitivesUnchanged() { Pipeline p = Pipeline.create(); - p.apply("Read", Read.from(CountingSource.unbounded())) + p.apply("Impulse", Impulse.create()) .apply( "multi-do", ParDo.of(new TestFn()).withOutputTags(new TupleTag<>(), TupleTagList.empty())); @@ -392,9 +397,9 @@ public void retainOnlyPrimitivesComposites() { @Override public PCollection expand(PBegin input) { return input - .apply(GenerateSequence.from(2L)) + .apply(Impulse.create()) .apply(Window.into(FixedWindows.of(Duration.standardMinutes(5L)))) - .apply(MapElements.into(TypeDescriptors.longs()).via(l -> l + 1)); + .apply(MapElements.into(TypeDescriptors.longs()).via(l -> 1L)); } }); diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/renderer/PipelineDotRendererTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/renderer/PipelineDotRendererTest.java index 50a574a92d01..eca57b9e8b19 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/renderer/PipelineDotRendererTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/renderer/PipelineDotRendererTest.java @@ -64,29 +64,42 @@ public void testCompositePipeline() { + " label = \"Create.TimestampedValues\"" + " subgraph cluster_2 {" + " label = \"Create.TimestampedValues/Create.Values\"" - + " 3 [label=\"Read(CreateSource)\"]" + + " subgraph cluster_3 {" + + " label = \"Create.TimestampedValues/Create.Values/Read(CreateSource)\"" + + " 4 [label=\"Impulse\"]" + + " subgraph cluster_5 {" + + " label = \"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(OutputSingleSource)\"" + + " 6 [label=\"ParMultiDo(OutputSingleSource)\"]" + + " 4 -> 6 [style=solid label=\"\"]" + + " }" + + " subgraph cluster_7 {" + + " label = \"Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)\"" + + " 8 [label=\"ParMultiDo(BoundedSourceAsSDFWrapper)\"]" + + " 6 -> 8 [style=solid label=\"\"]" + + " }" + + " }" + " }" - + " subgraph cluster_4 {" + + " subgraph cluster_9 {" + " label = \"Create.TimestampedValues/ParDo(ConvertTimestamps)\"" - + " 5 [label=\"ParMultiDo(ConvertTimestamps)\"]" - + " 3 -> 5 [style=solid label=\"\"]" + + " 10 [label=\"ParMultiDo(ConvertTimestamps)\"]" + + " 8 -> 10 [style=solid label=\"\"]" + " }" + " }" - + " subgraph cluster_6 {" + + " subgraph cluster_11 {" + " label = \"Window.Into()\"" - + " 7 [label=\"Window.Assign\"]" - + " 5 -> 7 [style=solid label=\"\"]" + + " 12 [label=\"Window.Assign\"]" + + " 10 -> 12 [style=solid label=\"\"]" + " }" - + " subgraph cluster_8 {" + + " subgraph cluster_13 {" + " label = \"Combine.perKey(SumInteger)\"" - + " 9 [label=\"GroupByKey\"]" - + " 7 -> 9 [style=solid label=\"\"]" - + " subgraph cluster_10 {" + + " 14 [label=\"GroupByKey\"]" + + " 12 -> 14 [style=solid label=\"\"]" + + " subgraph cluster_15 {" + " label = \"Combine.perKey(SumInteger)/Combine.GroupedValues\"" - + " subgraph cluster_11 {" + + " subgraph cluster_16 {" + " label = \"Combine.perKey(SumInteger)/Combine.GroupedValues/ParDo(Anonymous)\"" - + " 12 [label=\"ParMultiDo(Anonymous)\"]" - + " 9 -> 12 [style=solid label=\"\"]" + + " 17 [label=\"ParMultiDo(Anonymous)\"]" + + " 14 -> 17 [style=solid label=\"\"]" + " }" + " }" + " }" diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java index d7589a33939d..10f5d7e4d2ae 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java @@ -27,10 +27,10 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.apache.beam.runners.core.construction.ReadTranslation; +import org.apache.beam.runners.core.construction.SplittableParDo.PrimitiveBoundedRead; import org.apache.beam.runners.direct.StepTransformResult.Builder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.BoundedSource.BoundedReader; -import org.apache.beam.sdk.io.Read.Bounded; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; @@ -49,7 +49,7 @@ /** * A {@link TransformEvaluatorFactory} that produces {@link TransformEvaluator TransformEvaluators} - * for the {@link Bounded Read.Bounded} primitive {@link PTransform}. + * for the {@link PrimitiveBoundedRead SplittableParDo.PrimitiveBoundedRead} {@link PTransform}. */ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory { /** diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index 367971c76418..c0eb97906d85 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -181,7 +181,7 @@ public DirectPipelineResult run(Pipeline pipeline) { DisplayDataValidator.validatePipeline(pipeline); DisplayDataValidator.validateOptions(options); - SplittableParDo.validateNoPrimitiveReads(pipeline); + SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline); ExecutorService metricsPool = Executors.newCachedThreadPool( diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadDeduplicator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadDeduplicator.java index 6193e6c387a5..b2226980ae79 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadDeduplicator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadDeduplicator.java @@ -19,9 +19,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.beam.runners.core.construction.SplittableParDo.PrimitiveUnboundedRead; import org.apache.beam.runners.local.StructuralKey; import org.apache.beam.sdk.coders.ByteArrayCoder; -import org.apache.beam.sdk.io.Read.Unbounded; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader; @@ -30,7 +30,7 @@ /** * Provides methods to determine if a record is a duplicate within the evaluation of a {@link - * Unbounded} {@link PTransform}. + * PrimitiveUnboundedRead SplittableParDo.PrimitiveUnboundedRead} {@link PTransform}. */ interface UnboundedReadDeduplicator { /** diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java index 50fe80cbf7d8..4f3520f01707 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java @@ -26,9 +26,8 @@ import java.util.List; import java.util.concurrent.ThreadLocalRandom; import org.apache.beam.runners.core.construction.ReadTranslation; +import org.apache.beam.runners.core.construction.SplittableParDo; import org.apache.beam.runners.direct.UnboundedReadDeduplicator.NeverDeduplicator; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.io.Read.Unbounded; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; @@ -48,7 +47,8 @@ /** * A {@link TransformEvaluatorFactory} that produces {@link TransformEvaluator TransformEvaluators} - * for the {@link Unbounded Read.Unbounded} primitive {@link PTransform}. + * for the {@link PrimitiveUnboundedRead SplittableParDo.PrimitiveUnboundedRead} primitive {@link + * PTransform}. */ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { // Occasionally close an existing reader and resume from checkpoint, to exercise close-and-resume @@ -78,7 +78,9 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { } private TransformEvaluator createEvaluator( - AppliedPTransform, Read.Unbounded> application) { + AppliedPTransform< + PBegin, PCollection, SplittableParDo.PrimitiveUnboundedRead> + application) { return new UnboundedReadEvaluator<>(application, evaluationContext, options, readerReuseChance); } diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java index ccb19719f2f8..b934639ff4ce 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java @@ -36,6 +36,7 @@ import java.util.List; import java.util.NoSuchElementException; import java.util.concurrent.CountDownLatch; +import org.apache.beam.runners.core.construction.SplittableParDo; import org.apache.beam.runners.direct.BoundedReadEvaluatorFactory.BoundedSourceShard; import org.apache.beam.sdk.coders.BigEndianLongCoder; import org.apache.beam.sdk.coders.Coder; @@ -77,11 +78,7 @@ public class BoundedReadEvaluatorFactoryTest { private BundleFactory bundleFactory; private AppliedPTransform longsProducer; - @Rule - public TestPipeline p = - TestPipeline.fromOptions( - PipelineOptionsFactory.fromArgs("--experiments=use_deprecated_read").create()) - .enableAbandonedNodeEnforcement(false); + @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); private PipelineOptions options; @@ -90,7 +87,7 @@ public void setup() { MockitoAnnotations.initMocks(this); source = CountingSource.upTo(10L); longs = p.apply(Read.from(source)); - + SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(p); factory = new BoundedReadEvaluatorFactory( context, p.getOptions(), Long.MAX_VALUE /* minimum size for dynamic splits */); @@ -143,6 +140,7 @@ public void boundedSourceEvaluatorProducesDynamicSplits() throws Exception { elems[i] = (long) i; } PCollection read = p.apply(Read.from(new TestSource<>(VarLongCoder.of(), 5, elems))); + SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(p); AppliedPTransform transform = DirectGraphs.getProducer(read); Collection> unreadInputs = new BoundedReadEvaluatorFactory.InputProvider(context, options) @@ -193,6 +191,7 @@ public void boundedSourceEvaluatorDynamicSplitsUnsplittable() throws Exception { PCollection read = p.apply(Read.from(SourceTestUtils.toUnsplittableSource(CountingSource.upTo(10L)))); + SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(p); AppliedPTransform transform = DirectGraphs.getProducer(read); when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle()); diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java index 4033d3b15231..d4072d29f959 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java @@ -47,6 +47,7 @@ import java.util.NoSuchElementException; import java.util.Set; import java.util.concurrent.Executors; +import org.apache.beam.runners.core.construction.SplittableParDo; import org.apache.beam.runners.direct.UnboundedReadDeduplicator.NeverDeduplicator; import org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory.UnboundedSourceShard; import org.apache.beam.sdk.Pipeline; @@ -107,17 +108,13 @@ public class UnboundedReadEvaluatorFactoryTest { @Rule public ExpectedException thrown = ExpectedException.none(); - @Rule - public TestPipeline p = - TestPipeline.fromOptions( - PipelineOptionsFactory.fromArgs("--experiments=use_deprecated_read").create()) - .enableAbandonedNodeEnforcement(false); + @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); @Before public void setup() { source = CountingSource.unboundedWithTimestampFn(new LongToInstantFn()); longs = p.apply(Read.from(source)); - + SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(p); context = mock(EvaluationContext.class); factory = new UnboundedReadEvaluatorFactory(context, p.getOptions()); output = bundleFactory.createBundle(longs); @@ -201,6 +198,7 @@ public void unboundedSourceWithDuplicatesMultipleCalls() throws Exception { source.dedupes = true; PCollection pcollection = p.apply(Read.from(source)); + SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(p); AppliedPTransform sourceTransform = getProducer(pcollection); when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle()); @@ -241,6 +239,7 @@ public void noElementsAvailableReaderIncludedInResidual() throws Exception { // Read with a very slow rate so by the second read there are no more elements PCollection pcollection = p.apply(Read.from(new TestUnboundedSource<>(VarLongCoder.of(), 1L))); + SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(p); AppliedPTransform sourceTransform = getProducer(pcollection); when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle()); @@ -298,6 +297,7 @@ public void evaluatorReusesReaderAndClosesAtTheEnd() throws Exception { source.advanceWatermarkToInfinity = true; PCollection pcollection = p.apply(Read.from(source)); + SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(p); DirectGraph graph = DirectGraphs.getGraph(p); AppliedPTransform sourceTransform = graph.getProducer(pcollection); @@ -443,16 +443,19 @@ private void processElement(final TestUnboundedSource source) throws Exc final UnboundedReadEvaluatorFactory factory = new UnboundedReadEvaluatorFactory(context, p.getOptions()); - final Read.Unbounded unbounded = Read.from(source); + final SplittableParDo.PrimitiveUnboundedRead unbounded = + new SplittableParDo.PrimitiveUnboundedRead(Read.from(source)); final Pipeline pipeline = Pipeline.create(p.getOptions()); final PCollection pCollection = pipeline.apply(unbounded); - final AppliedPTransform, Read.Unbounded> application = - AppliedPTransform.of( - "test", - new HashMap<>(), - singletonMap(new TupleTag(), pCollection), - unbounded, - pipeline); + final AppliedPTransform< + PBegin, PCollection, SplittableParDo.PrimitiveUnboundedRead> + application = + AppliedPTransform.of( + "test", + new HashMap<>(), + singletonMap(new TupleTag(), pCollection), + unbounded, + pipeline); final TransformEvaluator> evaluator = factory.forApplication(application, null); final UnboundedSource.UnboundedReader reader = diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java index 1effd72d1bda..669583de5549 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java @@ -64,15 +64,6 @@ public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { LOG.info("{} enterCompositeTransform- {}", genSpaces(this.depth), node.getFullName()); this.depth++; - // TODO(BEAM-10670): Remove this and the ReadTranslator once the "use_deprecated_read" - // experiment is removed. Don't translate composite Read transforms since we expect the - // primitive expansion containing an SDF to be used. - if (node.getTransform() != null - && PTransformTranslation.READ_TRANSFORM_URN.equals( - PTransformTranslation.urnForTransformOrNull(node.getTransform()))) { - return CompositeBehavior.ENTER_TRANSFORM; - } - BatchTransformTranslator translator = getTranslator(node, batchContext); if (translator != null) { diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java index a18d78fd7ddd..186efe17d250 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java @@ -71,7 +71,7 @@ protected FlinkRunner(FlinkPipelineOptions options) { @Override public PipelineResult run(Pipeline pipeline) { - SplittableParDo.validateNoPrimitiveReads(pipeline); + SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline); logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline); MetricsEnvironment.setMetricsSupported(true); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java index 8d8f6c0b1807..e5ff56309d94 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java @@ -99,13 +99,6 @@ public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { PTransform transform = node.getTransform(); if (transform != null) { - // TODO(BEAM-10670): Remove this and the ReadTranslator once the "use_deprecated_read" - // experiment is removed. Don't translate composite Read transforms since we expect the - // primitive expansion containing an SDF to be used. - if (PTransformTranslation.READ_TRANSFORM_URN.equals( - PTransformTranslation.urnForTransformOrNull(transform))) { - return CompositeBehavior.ENTER_TRANSFORM; - } StreamTransformTranslator translator = FlinkStreamingTransformTranslators.getTranslator(transform); diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java index 3b5152bae7c1..b4971020474e 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import org.apache.beam.runners.core.construction.PTransformTranslation; +import org.apache.beam.runners.core.construction.SplittableParDo; import org.apache.beam.runners.flink.FlinkStreamingTransformTranslators.UnboundedSourceWrapperNoValueWithRecordId; import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper; import org.apache.beam.sdk.Pipeline; @@ -58,7 +59,9 @@ public void readSourceTranslatorBoundedWithMaxParallelism() { final int maxParallelism = 6; final int parallelism = 2; - Read.Bounded transform = Read.from(new TestBoundedSource(maxParallelism)); + SplittableParDo.PrimitiveBoundedRead transform = + new SplittableParDo.PrimitiveBoundedRead<>( + Read.from(new TestBoundedSource(maxParallelism))); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(parallelism); env.setMaxParallelism(maxParallelism); @@ -78,7 +81,8 @@ public void readSourceTranslatorBoundedWithoutMaxParallelism() { final int parallelism = 2; - Read.Bounded transform = Read.from(new TestBoundedSource(parallelism)); + SplittableParDo.PrimitiveBoundedRead transform = + new SplittableParDo.PrimitiveBoundedRead<>(Read.from(new TestBoundedSource(parallelism))); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(parallelism); @@ -98,7 +102,8 @@ public void readSourceTranslatorUnboundedWithMaxParallelism() { final int maxParallelism = 6; final int parallelism = 2; - Read.Unbounded transform = Read.from(new TestUnboundedSource()); + SplittableParDo.PrimitiveUnboundedRead transform = + new SplittableParDo.PrimitiveUnboundedRead<>(Read.from(new TestUnboundedSource())); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(parallelism); env.setMaxParallelism(maxParallelism); @@ -119,7 +124,8 @@ public void readSourceTranslatorUnboundedWithoutMaxParallelism() { final int parallelism = 2; - Read.Unbounded transform = Read.from(new TestUnboundedSource()); + SplittableParDo.PrimitiveUnboundedRead transform = + new SplittableParDo.PrimitiveUnboundedRead<>(Read.from(new TestUnboundedSource())); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(parallelism); diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index 0df02d7f3471..07d0ca1e95c4 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -72,7 +72,6 @@ import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; -import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.runners.AppliedPTransform; @@ -1078,7 +1077,7 @@ private void translateHelper(Window.Assign transform, TranslationContext /////////////////////////////////////////////////////////////////////////// // IO Translation. - registerTransformTranslator(Read.Bounded.class, new ReadTranslator()); + registerTransformTranslator(SplittableParDo.PrimitiveBoundedRead.class, new ReadTranslator()); registerTransformTranslator( TestStream.class, diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 10fb46b66af6..9eb162cc0aaa 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -72,6 +72,7 @@ import org.apache.beam.runners.core.construction.ReplacementOutputs; import org.apache.beam.runners.core.construction.SdkComponents; import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; +import org.apache.beam.runners.core.construction.SplittableParDo; import org.apache.beam.runners.core.construction.SplittableParDoNaiveBounded; import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource; import org.apache.beam.runners.core.construction.UnconsumedReads; @@ -110,7 +111,6 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder; import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink; import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource; -import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; @@ -496,8 +496,7 @@ private List getOverrides(boolean streaming) { PTransformMatchers.groupWithShardableStates(), new GroupIntoBatchesOverride.StreamingGroupIntoBatchesOverrideFactory(this))); - if (!fnApiEnabled - || ExperimentalOptions.hasExperiment(options, "beam_fn_api_use_deprecated_read")) { + if (!fnApiEnabled) { overridesBuilder .add( // Streaming Bounded Read is implemented in terms of Streaming Unbounded Read, and @@ -520,6 +519,9 @@ private List getOverrides(boolean streaming) { // Dataflow Streaming runner overrides the SPLITTABLE_PROCESS_KEYED transform // natively in the Dataflow service. } else { + if (!fnApiEnabled) { + overridesBuilder.add(SplittableParDo.PRIMITIVE_BOUNDED_READ_OVERRIDE); + } overridesBuilder // Replace GroupIntoBatches before the state/timer replacements below since // GroupIntoBatches internally uses a stateful DoFn. diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java index 1cf62165ccd0..821d149f620d 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java @@ -24,17 +24,18 @@ import com.google.api.services.dataflow.model.SourceMetadata; import java.util.HashMap; import java.util.Map; +import org.apache.beam.runners.core.construction.SplittableParDo; import org.apache.beam.runners.dataflow.internal.CustomSources; import org.apache.beam.runners.dataflow.util.PropertyNames; -import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; /** Translator for the {@code Read} {@code PTransform} for the Dataflow back-end. */ -class ReadTranslator implements TransformTranslator> { +class ReadTranslator implements TransformTranslator> { @Override - public void translate(Read.Bounded transform, TranslationContext context) { + public void translate( + SplittableParDo.PrimitiveBoundedRead transform, TranslationContext context) { translateReadHelper(transform.getSource(), transform, context); } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java index 5f4359513577..593899caba38 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java @@ -63,6 +63,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; @@ -71,6 +72,7 @@ import org.apache.beam.runners.core.construction.Environments; import org.apache.beam.runners.core.construction.PipelineTranslation; import org.apache.beam.runners.core.construction.SdkComponents; +import org.apache.beam.runners.core.construction.SplittableParDo; import org.apache.beam.runners.core.metrics.ExecutionStateSampler; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator; import org.apache.beam.runners.dataflow.DataflowRunner; @@ -415,6 +417,11 @@ static com.google.api.services.dataflow.model.Source translateIOToCloudSource( Pipeline p = Pipeline.create(options); p.begin().apply(Read.from(io)); + // Note that we specifically perform this replacement since this is what the DataflowRunner + // does and the DataflowRunner class does not expose a way to perform these replacements + // without running the pipeline. + p.replaceAll(Collections.singletonList(SplittableParDo.PRIMITIVE_BOUNDED_READ_OVERRIDE)); + DataflowRunner runner = DataflowRunner.fromOptions(options); SdkComponents sdkComponents = SdkComponents.create(); RunnerApi.Environment defaultEnvironmentForDataflow = diff --git a/runners/samza/build.gradle b/runners/samza/build.gradle index 6ea4c59986c7..9e78479ba408 100644 --- a/runners/samza/build.gradle +++ b/runners/samza/build.gradle @@ -105,6 +105,7 @@ task validatesRunner(type: Test) { // These tests produce the output but either the pipeline doesn't shutdown or PAssert fails excludeTestsMatching 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testAdditionalOutputUnbounded' excludeTestsMatching 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testPairWithIndexBasicUnbounded' + excludeTestsMatching 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testPairWithIndexWindowedTimestampedUnbounded' excludeTestsMatching 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testOutputAfterCheckpointUnbounded' } } diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java index 9bde6d3dfb04..82c0b83f6f89 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java @@ -107,7 +107,7 @@ public PortablePipelineResult runPortablePipeline(RunnerApi.Pipeline pipeline) { @Override public SamzaPipelineResult run(Pipeline pipeline) { - SplittableParDo.validateNoPrimitiveReads(pipeline); + SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline); MetricsEnvironment.setMetricsSupported(true); if (LOG.isDebugEnabled()) { diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java index 4dd444109b04..d99c0c6aa008 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java @@ -351,11 +351,14 @@ private void doProcessWatermark(Instant watermark, OpEmitter emitter) { timerInternalsFactory.setInputWatermark(actualInputWatermark); - pushbackFnRunner.startBundle(); - for (KeyedTimerData keyedTimerData : timerInternalsFactory.removeReadyTimers()) { - fireTimer(keyedTimerData); + Collection> readyTimers = timerInternalsFactory.removeReadyTimers(); + if (!readyTimers.isEmpty()) { + pushbackFnRunner.startBundle(); + for (KeyedTimerData keyedTimerData : readyTimers) { + fireTimer(keyedTimerData); + } + pushbackFnRunner.finishBundle(); } - pushbackFnRunner.finishBundle(); if (timerInternalsFactory.getOutputWatermark() == null || timerInternalsFactory.getOutputWatermark().isBefore(actualInputWatermark)) { diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java index 1b0ab61f5700..5262e337ca53 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.samza.runtime; +import java.util.Collection; import java.util.Collections; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; @@ -198,11 +199,14 @@ public void processElement( public void processWatermark(Instant watermark, OpEmitter> emitter) { timerInternalsFactory.setInputWatermark(watermark); - fnRunner.startBundle(); - for (KeyedTimerData keyedTimerData : timerInternalsFactory.removeReadyTimers()) { - fireTimer(keyedTimerData.getKey(), keyedTimerData.getTimerData()); + Collection> readyTimers = timerInternalsFactory.removeReadyTimers(); + if (!readyTimers.isEmpty()) { + fnRunner.startBundle(); + for (KeyedTimerData keyedTimerData : readyTimers) { + fireTimer(keyedTimerData.getKey(), keyedTimerData.getTimerData()); + } + fnRunner.finishBundle(); } - fnRunner.finishBundle(); if (timerInternalsFactory.getOutputWatermark() == null || timerInternalsFactory.getOutputWatermark().isBefore(watermark)) { diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SplittableParDoProcessKeyedElementsOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SplittableParDoProcessKeyedElementsOp.java index c6bd8dcffc97..ae5406cb3c78 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SplittableParDoProcessKeyedElementsOp.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SplittableParDoProcessKeyedElementsOp.java @@ -219,11 +219,14 @@ public void processElement( public void processWatermark(Instant watermark, OpEmitter emitter) { timerInternalsFactory.setInputWatermark(watermark); - fnRunner.startBundle(); - for (KeyedTimerData keyedTimerData : timerInternalsFactory.removeReadyTimers()) { - fireTimer(keyedTimerData.getKey(), keyedTimerData.getTimerData()); + Collection> readyTimers = timerInternalsFactory.removeReadyTimers(); + if (!readyTimers.isEmpty()) { + fnRunner.startBundle(); + for (KeyedTimerData keyedTimerData : readyTimers) { + fireTimer(keyedTimerData.getKey(), keyedTimerData.getTimerData()); + } + fnRunner.finishBundle(); } - fnRunner.finishBundle(); if (timerInternalsFactory.getOutputWatermark() == null || timerInternalsFactory.getOutputWatermark().isBefore(watermark)) { diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ReadTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ReadTranslator.java index 3c1dbb610df6..174294291bc1 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ReadTranslator.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ReadTranslator.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.samza.translation; import java.util.Map; +import org.apache.beam.runners.core.construction.SplittableParDo; import org.apache.beam.runners.core.serialization.Base64Serializer; import org.apache.beam.runners.samza.adapter.BoundedSourceSystem; import org.apache.beam.runners.samza.adapter.UnboundedSourceSystem; @@ -25,7 +26,6 @@ import org.apache.beam.runners.samza.util.SamzaCoders; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.PTransform; @@ -54,9 +54,9 @@ public void translate( final PCollection output = ctx.getOutput(transform); final Coder> coder = SamzaCoders.of(output); final Source source = - transform instanceof Read.Unbounded - ? ((Read.Unbounded) transform).getSource() - : ((Read.Bounded) transform).getSource(); + transform instanceof SplittableParDo.PrimitiveBoundedRead + ? ((SplittableParDo.PrimitiveBoundedRead) transform).getSource() + : ((SplittableParDo.PrimitiveUnboundedRead) transform).getSource(); final String id = ctx.getIdForPValue(output); // Create system descriptor diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java index 0b91c4ff0afb..077203cdcdcb 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java @@ -22,10 +22,10 @@ import java.util.ArrayList; import java.util.List; import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils; +import org.apache.beam.runners.core.construction.SplittableParDo; import org.apache.beam.runners.spark.translation.EvaluationContext; import org.apache.beam.runners.spark.translation.SparkPipelineTranslator; import org.apache.beam.runners.spark.translation.TransformEvaluator; -import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; @@ -141,10 +141,18 @@ public String toString() { transformString = replaceFnString(transformClass, transformString, "windowFn"); } else if (transformString.contains("")) { String sourceName = "..."; - if (transform instanceof Read.Bounded) { - sourceName = ((Read.Bounded) transform).getSource().getClass().getName(); - } else if (transform instanceof Read.Unbounded) { - sourceName = ((Read.Unbounded) transform).getSource().getClass().getName(); + if (transform instanceof SplittableParDo.PrimitiveBoundedRead) { + sourceName = + ((SplittableParDo.PrimitiveBoundedRead) transform) + .getSource() + .getClass() + .getName(); + } else if (transform instanceof SplittableParDo.PrimitiveUnboundedRead) { + sourceName = + ((SplittableParDo.PrimitiveUnboundedRead) transform) + .getSource() + .getClass() + .getName(); } transformString = transformString.replace("", sourceName); } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java index c8a0e30e6b93..d2f2513b91e0 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -27,6 +27,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import org.apache.beam.runners.core.construction.SplittableParDo; import org.apache.beam.runners.core.construction.TransformInputs; import org.apache.beam.runners.core.metrics.MetricsPusher; import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator; @@ -153,6 +154,8 @@ private SparkRunner(SparkPipelineOptions options) { public SparkPipelineResult run(final Pipeline pipeline) { LOG.info("Executing pipeline using the SparkRunner."); + SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(pipeline); + final SparkPipelineResult result; final Future startPipeline; diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java index a348977e1232..6385504956cf 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java @@ -19,6 +19,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; +import org.apache.beam.runners.core.construction.SplittableParDo; import org.apache.beam.runners.spark.translation.EvaluationContext; import org.apache.beam.runners.spark.translation.SparkPipelineTranslator; import org.apache.beam.runners.spark.translation.TransformTranslator; @@ -72,6 +73,7 @@ public static SparkRunnerDebugger fromOptions(PipelineOptions options) { @Override public SparkPipelineResult run(Pipeline pipeline) { + SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(pipeline); JavaSparkContext jsc = new JavaSparkContext("local[1]", "Debug_Pipeline"); JavaStreamingContext jssc = new JavaStreamingContext(jsc, new org.apache.spark.streaming.Duration(1000)); diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java index 0295ac3355ed..3267696d9207 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java @@ -326,7 +326,8 @@ public Partition[] getPartitions() { @Override public Option partitioner() { // setting the partitioner helps to "keep" the same partitioner in the following - // mapWithState read for Read.Unbounded, preventing a post-mapWithState shuffle. + // mapWithState read for SplittableParDo.PrimitiveUnboundedRead, preventing a + // post-mapWithState shuffle. return scala.Some.apply(partitioner); } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java index 603e64105d19..88682941ba9c 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java @@ -22,6 +22,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import org.apache.beam.runners.core.construction.SplittableParDo; import org.apache.beam.runners.core.metrics.MetricsPusher; import org.apache.beam.runners.spark.structuredstreaming.aggregators.AggregatorsAccumulator; import org.apache.beam.runners.spark.structuredstreaming.metrics.AggregatorMetricSource; @@ -133,6 +134,7 @@ private SparkStructuredStreamingRunner(SparkStructuredStreamingPipelineOptions o @Override public SparkStructuredStreamingPipelineResult run(final Pipeline pipeline) { + SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(pipeline); MetricsEnvironment.setMetricsSupported(true); LOG.info( diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java index 4d923dafa40a..14ed65d8f5d5 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java @@ -19,12 +19,12 @@ import java.util.HashMap; import java.util.Map; +import org.apache.beam.runners.core.construction.SplittableParDo; import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions; import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator; import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator; import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Flatten; @@ -71,7 +71,8 @@ public class PipelineTranslatorBatch extends PipelineTranslator { TRANSFORM_TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoTranslatorBatch()); - TRANSFORM_TRANSLATORS.put(Read.Bounded.class, new ReadSourceTranslatorBatch()); + TRANSFORM_TRANSLATORS.put( + SplittableParDo.PrimitiveBoundedRead.class, new ReadSourceTranslatorBatch()); TRANSFORM_TRANSLATORS.put( View.CreatePCollectionView.class, new CreatePCollectionViewTranslatorBatch()); diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/PipelineTranslatorStreaming.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/PipelineTranslatorStreaming.java index 3bfa9e4a1991..42a5fdc3cd5f 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/PipelineTranslatorStreaming.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/PipelineTranslatorStreaming.java @@ -19,12 +19,12 @@ import java.util.HashMap; import java.util.Map; +import org.apache.beam.runners.core.construction.SplittableParDo; import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions; import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator; import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator; import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.PTransform; import org.checkerframework.checker.nullness.qual.Nullable; @@ -65,7 +65,8 @@ public class PipelineTranslatorStreaming extends PipelineTranslator { // // TRANSFORM_TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoTranslatorBatch()); - TRANSFORM_TRANSLATORS.put(Read.Unbounded.class, new ReadSourceTranslatorStreaming()); + TRANSFORM_TRANSLATORS.put( + SplittableParDo.PrimitiveUnboundedRead.class, new ReadSourceTranslatorStreaming()); // TRANSFORM_TRANSLATORS // .put(View.CreatePCollectionView.class, new CreatePCollectionViewTranslatorBatch()); diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java index 14a26555aeb4..c871819a97c8 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java @@ -29,6 +29,7 @@ import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.core.construction.PTransformTranslation; import org.apache.beam.runners.core.construction.ParDoTranslation; +import org.apache.beam.runners.core.construction.SplittableParDo; import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.coders.CoderHelpers; import org.apache.beam.runners.spark.io.SourceRDD; @@ -41,7 +42,6 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.DoFn; @@ -603,10 +603,11 @@ protected WindowedValue> computeNext() { }; } - private static TransformEvaluator> readBounded() { - return new TransformEvaluator>() { + private static TransformEvaluator> readBounded() { + return new TransformEvaluator>() { @Override - public void evaluate(Read.Bounded transform, EvaluationContext context) { + public void evaluate( + SplittableParDo.PrimitiveBoundedRead transform, EvaluationContext context) { String stepName = context.getCurrentTransform().getFullName(); final JavaSparkContext jsc = context.getSparkContext(); // create an RDD from a BoundedSource. diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java index 9b45f05c0ec5..16e9a936abc6 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java @@ -34,6 +34,7 @@ import org.apache.beam.runners.core.construction.PTransformTranslation; import org.apache.beam.runners.core.construction.ParDoTranslation; import org.apache.beam.runners.core.construction.SerializablePipelineOptions; +import org.apache.beam.runners.core.construction.SplittableParDo; import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar; import org.apache.beam.runners.spark.coders.CoderHelpers; import org.apache.beam.runners.spark.io.ConsoleIO; @@ -58,7 +59,6 @@ import org.apache.beam.runners.spark.util.SparkCompat; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.DoFn; @@ -115,10 +115,11 @@ public String toNativeString() { }; } - private static TransformEvaluator> readUnbounded() { - return new TransformEvaluator>() { + private static TransformEvaluator> readUnbounded() { + return new TransformEvaluator>() { @Override - public void evaluate(Read.Unbounded transform, EvaluationContext context) { + public void evaluate( + SplittableParDo.PrimitiveUnboundedRead transform, EvaluationContext context) { final String stepName = context.getCurrentTransform().getFullName(); context.putDataset( transform, diff --git a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2Runner.java b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2Runner.java index 75d206a95699..2273256df534 100644 --- a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2Runner.java +++ b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2Runner.java @@ -99,7 +99,7 @@ public PipelineResult run(Pipeline pipeline) { Twister2PipelineExecutionEnvironment env = new Twister2PipelineExecutionEnvironment(options); LOG.info("Translating pipeline to Twister2 program."); pipeline.replaceAll(getDefaultOverrides()); - SplittableParDo.validateNoPrimitiveReads(pipeline); + SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline); env.translate(pipeline); setupSystem(options); @@ -155,7 +155,7 @@ public PipelineResult runTest(Pipeline pipeline) { Twister2PipelineExecutionEnvironment env = new Twister2PipelineExecutionEnvironment(options); LOG.info("Translating pipeline to Twister2 program."); pipeline.replaceAll(getDefaultOverrides()); - SplittableParDo.validateNoPrimitiveReads(pipeline); + SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline); env.translate(pipeline); setupSystemTest(options); Map configMap = new HashMap(); diff --git a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/batch/ReadSourceTranslatorBatch.java b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/batch/ReadSourceTranslatorBatch.java index 20bd8b7a5b7e..eef0a13f5141 100644 --- a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/batch/ReadSourceTranslatorBatch.java +++ b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/batch/ReadSourceTranslatorBatch.java @@ -20,19 +20,21 @@ import edu.iu.dsc.tws.tset.env.BatchTSetEnvironment; import edu.iu.dsc.tws.tset.env.TSetEnvironment; import edu.iu.dsc.tws.tset.sets.batch.SourceTSet; +import org.apache.beam.runners.core.construction.SplittableParDo; import org.apache.beam.runners.twister2.Twister2BatchTranslationContext; import org.apache.beam.runners.twister2.translation.wrappers.Twister2BoundedSource; import org.apache.beam.runners.twister2.translators.BatchTransformTranslator; import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; /** Source translator. */ -public class ReadSourceTranslatorBatch implements BatchTransformTranslator> { +public class ReadSourceTranslatorBatch + implements BatchTransformTranslator> { @Override - public void translateNode(Read.Bounded transform, Twister2BatchTranslationContext context) { + public void translateNode( + SplittableParDo.PrimitiveBoundedRead transform, Twister2BatchTranslationContext context) { BoundedSource boundedSource = transform.getSource(); Twister2BoundedSource twister2BoundedSource = new Twister2BoundedSource(boundedSource, context, context.getOptions()); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java index 992e3ddfe18c..9cbfcc415f16 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java @@ -27,7 +27,6 @@ import java.util.Arrays; import java.util.List; import java.util.NoSuchElementException; -import java.util.Set; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.InstantCoder; @@ -37,16 +36,15 @@ import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark.NoopCheckpointMark; import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; -import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Deduplicate; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement; import org.apache.beam.sdk.transforms.Impulse; -import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress; @@ -62,8 +60,6 @@ import org.apache.beam.sdk.values.ValueWithRecordId; import org.apache.beam.sdk.values.ValueWithRecordId.StripIdsDoFn; import org.apache.beam.sdk.values.ValueWithRecordId.ValueWithRecordIdCoder; -import org.apache.beam.sdk.values.WindowingStrategy; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.joda.time.Instant; @@ -135,26 +131,16 @@ private Bounded(@Nullable String name, BoundedSource source) { @Override public final PCollection expand(PBegin input) { source.validate(); - - if (useSdf(input.getPipeline().getOptions())) { - // We don't use Create here since Create is defined as a BoundedSource and using it would - // cause an infinite expansion loop. We can reconsider this if Create is implemented - // directly as a SplittableDoFn. - return input - .getPipeline() - .apply(Impulse.create()) - .apply( - MapElements.into(new TypeDescriptor>() {}).via(element -> source)) - .setCoder(SerializableCoder.of(new TypeDescriptor>() {})) - .apply(ParDo.of(new BoundedSourceAsSDFWrapperFn<>())) - .setCoder(source.getOutputCoder()); - } - - return PCollection.createPrimitiveOutputInternal( - input.getPipeline(), - WindowingStrategy.globalDefault(), - PCollection.IsBounded.BOUNDED, - source.getOutputCoder()); + // We don't use Create here since Create is defined as a BoundedSource and using it would + // cause an infinite expansion loop. We can reconsider this if Create is implemented + // directly as a SplittableDoFn. + return input + .getPipeline() + .apply(Impulse.create()) + .apply(ParDo.of(new OutputSingleSource<>(source))) + .setCoder(SerializableCoder.of(new TypeDescriptor>() {})) + .apply(ParDo.of(new BoundedSourceAsSDFWrapperFn<>())) + .setCoder(source.getOutputCoder()); } /** Returns the {@code BoundedSource} used to create this {@code Read} {@code PTransform}. */ @@ -178,11 +164,12 @@ public void populateDisplayData(DisplayData.Builder builder) { /** {@link PTransform} that reads from a {@link UnboundedSource}. */ public static class Unbounded extends PTransform> { - private final UnboundedSource source; + private final UnboundedSource source; private Unbounded(@Nullable String name, UnboundedSource source) { super(name); - this.source = SerializableUtils.ensureSerializable(source); + this.source = + (UnboundedSource) SerializableUtils.ensureSerializable(source); } /** @@ -208,41 +195,29 @@ public BoundedReadFromUnboundedSource withMaxReadTime(Duration maxReadTime) { @Override public final PCollection expand(PBegin input) { source.validate(); - - if (useSdf(input.getPipeline().getOptions())) { - // We don't use Create here since Create is defined as a BoundedSource and using it would - // cause an infinite expansion loop. We can reconsider this if Create is implemented - // directly as a SplittableDoFn. - PCollection> outputWithIds = - input - .getPipeline() - .apply(Impulse.create()) - .apply( - MapElements.into(new TypeDescriptor>() {}) - .via(element -> (UnboundedSource) source)) - .setCoder( - SerializableCoder.of( - new TypeDescriptor>() {})) - .apply( - ParDo.of( - new UnboundedSourceAsSDFWrapperFn<>( - (Coder) source.getCheckpointMarkCoder()))) - .setCoder(ValueWithRecordIdCoder.of(source.getOutputCoder())); - - if (source.requiresDeduping()) { - outputWithIds.apply( - Deduplicate., byte[]>withRepresentativeValueFn( - element -> element.getId()) - .withRepresentativeType(TypeDescriptor.of(byte[].class))); - } - return outputWithIds.apply(ParDo.of(new StripIdsDoFn<>())); - } - - return PCollection.createPrimitiveOutputInternal( - input.getPipeline(), - WindowingStrategy.globalDefault(), - PCollection.IsBounded.UNBOUNDED, - source.getOutputCoder()); + // We don't use Create here since Create is defined as a BoundedSource and using it would + // cause an infinite expansion loop. We can reconsider this if Create is implemented + // directly as a SplittableDoFn. + PCollection> outputWithIds = + input + .getPipeline() + .apply(Impulse.create()) + .apply(ParDo.of(new OutputSingleSource<>(source))) + .setCoder( + SerializableCoder.of(new TypeDescriptor>() {})) + .apply( + ParDo.of( + new UnboundedSourceAsSDFWrapperFn<>( + (Coder) source.getCheckpointMarkCoder()))) + .setCoder(ValueWithRecordIdCoder.of(source.getOutputCoder())); + + if (source.requiresDeduping()) { + outputWithIds.apply( + Deduplicate., byte[]>withRepresentativeValueFn( + element -> element.getId()) + .withRepresentativeType(TypeDescriptor.of(byte[].class))); + } + return outputWithIds.apply(ParDo.of(new StripIdsDoFn<>())); } /** Returns the {@code UnboundedSource} used to create this {@code Read} {@code PTransform}. */ @@ -921,41 +896,26 @@ public Progress getProgress() { } } - private static final int DEFAULT_DESIRED_NUM_SPLITS = 20; + private static class OutputSingleSource extends DoFn { + private final T source; - /** - * Used to migrate runners to use splittable DoFn without needing to rely on PTransform - * replacement which allows removal of the migration code without changing the pipeline shape - * since pipeline shape affects pipeline update for some runners. - */ - private static final Set SPLITTABLE_DOFN_PREFERRED_RUNNERS = - ImmutableSet.of( - "DirectRunner", - "FlinkRunner", - "TestFlinkRunner", - "JetRunner", - "TestJetRunner", - "SamzaRunner", - "TestSamzaRunner", - "Twister2Runner", - "Twister2TestRunner"); - - private static boolean useSdf(PipelineOptions options) { - // TODO(BEAM-10670): Make this by default true and have runners opt-out instead. - boolean runnerPrefersSdf = false; - try { - runnerPrefersSdf = - SPLITTABLE_DOFN_PREFERRED_RUNNERS.contains(options.getRunner().getSimpleName()); - } catch (Exception e) { - // Ignore construction failures since there may not be a runner on the classpath if this is a - // test. - } - - // We keep the old names of experiments around for portable runners and existing users. - return (runnerPrefersSdf - || ExperimentalOptions.hasExperiment(options, "beam_fn_api") - || ExperimentalOptions.hasExperiment(options, "use_sdf_read")) - && !(ExperimentalOptions.hasExperiment(options, "beam_fn_api_use_deprecated_read") - || ExperimentalOptions.hasExperiment(options, "use_deprecated_read")); + private OutputSingleSource(T source) { + this.source = source; + } + + @ProcessElement + public void processElement(OutputReceiver outputReceiver) { + outputReceiver.output(source); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder + .add(DisplayData.item("source", source.getClass()).withLabel("Read Source")) + .include("source", source); + } } + + private static final int DEFAULT_DESIRED_NUM_SPLITS = 20; } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java index 4355f4540821..0be211fd1de4 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java @@ -23,6 +23,7 @@ import static org.apache.beam.sdk.io.Compression.UNCOMPRESSED; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.startsWith; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.in; import static org.hamcrest.core.Is.is; @@ -122,14 +123,12 @@ >>> with open('/tmp/python_foo.tfrecord', 'rb') as f: public void testReadNamed() { readPipeline.enableAbandonedNodeEnforcement(false); - assertEquals( - "TFRecordIO.Read/Read.out", - readPipeline.apply(TFRecordIO.read().from("foo.*").withoutValidation()).getName()); - assertEquals( - "MyRead/Read.out", - readPipeline - .apply("MyRead", TFRecordIO.read().from("foo.*").withoutValidation()) - .getName()); + assertThat( + readPipeline.apply(TFRecordIO.read().from("foo.*").withoutValidation()).getName(), + startsWith("TFRecordIO.Read/Read")); + assertThat( + readPipeline.apply("MyRead", TFRecordIO.read().from("foo.*").withoutValidation()).getName(), + startsWith("MyRead/Read")); } @Test diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java index 08aac090cd19..4dd694bf81da 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java @@ -31,6 +31,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.startsWith; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -86,12 +87,14 @@ import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.experimental.runners.Enclosed; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.junit.runners.Parameterized; /** Tests for {@link TextIO.Read}. */ +@RunWith(Enclosed.class) public class TextIOReadTest { private static final int LINES_NUMBER_FOR_LARGE = 1000; private static final List EMPTY = Collections.emptyList(); @@ -474,9 +477,10 @@ public void testReadNamed() throws Exception { File emptyFile = tempFolder.newFile(); p.enableAbandonedNodeEnforcement(false); - assertEquals("TextIO.Read/Read.out", p.apply(TextIO.read().from("somefile")).getName()); - assertEquals( - "MyRead/Read.out", p.apply("MyRead", TextIO.read().from(emptyFile.getPath())).getName()); + assertThat(p.apply(TextIO.read().from("somefile")).getName(), startsWith("TextIO.Read/Read")); + assertThat( + p.apply("MyRead", TextIO.read().from(emptyFile.getPath())).getName(), + startsWith("MyRead/Read")); } @Test diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java index d9e641d1b202..03149fa34f45 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java @@ -29,7 +29,6 @@ import java.util.EnumSet; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.WriteFiles; import org.apache.beam.sdk.testing.NeedsRunner; @@ -38,6 +37,7 @@ import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.Impulse; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.Sample; import org.apache.beam.sdk.values.PBegin; @@ -60,7 +60,7 @@ public class TransformTreeTest { @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); enum TransformsSeen { - READ, + IMPULSE, WRITE, SAMPLE } @@ -139,7 +139,7 @@ public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { assertNotNull(node.getEnclosingNode()); assertTrue(node.isCompositeNode()); } - assertThat(transform, not(instanceOf(Read.Bounded.class))); + assertThat(transform, not(instanceOf(Impulse.class))); return CompositeBehavior.ENTER_TRANSFORM; } @@ -154,13 +154,13 @@ public void leaveCompositeTransform(TransformHierarchy.Node node) { @Override public void visitPrimitiveTransform(TransformHierarchy.Node node) { PTransform transform = node.getTransform(); - // Pick is a composite, should not be visited here. + // Composites should not be visited here. assertThat(transform, not(instanceOf(Combine.Globally.class))); assertThat(transform, not(instanceOf(WriteFiles.class))); - if (transform instanceof Read.Bounded - && node.getEnclosingNode().getTransform() instanceof TextIO.Read) { - assertTrue(visited.add(TransformsSeen.READ)); - } + assertThat(transform, not(instanceOf(TextIO.Read.class))); + // There are multiple impulses in the graph so we don't validate that we haven't + // seen one before. + visited.add(TransformsSeen.IMPULSE); } }); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java index 0b3b40b2fded..891d468db728 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java @@ -26,20 +26,18 @@ import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.construction.ParDoTranslation; import org.apache.beam.runners.core.construction.PipelineTranslation; -import org.apache.beam.runners.core.construction.ReadTranslation; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.BooleanCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.expansion.service.ExpansionService; -import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.SchemaCoder; import org.apache.beam.sdk.schemas.SchemaTranslation; -import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Impulse; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString; @@ -99,22 +97,6 @@ public void testConstructPubsubRead() throws Exception { "test_namespacetest/PubsubUnboundedSource", "test_namespacetest/MapElements")); assertThat(transform.getInputsCount(), Matchers.is(0)); assertThat(transform.getOutputsCount(), Matchers.is(1)); - - RunnerApi.PTransform pubsubComposite = - result.getComponents().getTransformsOrThrow(transform.getSubtransforms(0)); - RunnerApi.PTransform pubsubRead = - result.getComponents().getTransformsOrThrow(pubsubComposite.getSubtransforms(0)); - RunnerApi.ReadPayload readPayload = - RunnerApi.ReadPayload.parseFrom(pubsubRead.getSpec().getPayload()); - PubsubUnboundedSource.PubsubSource source = - (PubsubUnboundedSource.PubsubSource) ReadTranslation.unboundedSourceFromProto(readPayload); - PubsubUnboundedSource spec = source.outer; - - assertThat( - spec.getTopicProvider() == null ? null : String.valueOf(spec.getTopicProvider()), - Matchers.is(topic)); - assertThat(spec.getIdAttribute(), Matchers.is(idAttribute)); - assertThat(spec.getNeedsAttributes(), Matchers.is(true)); } @Test @@ -135,10 +117,8 @@ public void testConstructPubsubWrite() throws Exception { // Requirements are not passed as part of the expansion service so the validation // fails because of how we construct the pipeline to expand the transform since it now // has a transform with a requirement. - Pipeline p = - Pipeline.create( - PipelineOptionsFactory.fromArgs("--experiments=use_deprecated_read").create()); - p.apply("unbounded", Create.of(1, 2, 3)).setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED); + Pipeline p = Pipeline.create(); + p.apply("unbounded", Impulse.create()).setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED); RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p); String inputPCollection = diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java index 706cd982fa62..8b04ec7b41a1 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.kafka; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertNotNull; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -30,7 +31,6 @@ import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.construction.ParDoTranslation; import org.apache.beam.runners.core.construction.PipelineTranslation; -import org.apache.beam.runners.core.construction.ReadTranslation; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; @@ -120,28 +120,15 @@ public void testConstructKafkaRead() throws Exception { RunnerApi.PTransform kafkaComposite = result.getComponents().getTransformsOrThrow(transform.getSubtransforms(0)); - RunnerApi.PTransform kafkaRead = + RunnerApi.PTransform kafkaReadComposite = result.getComponents().getTransformsOrThrow(kafkaComposite.getSubtransforms(0)); - RunnerApi.ReadPayload readPayload = - RunnerApi.ReadPayload.parseFrom(kafkaRead.getSpec().getPayload()); - KafkaUnboundedSource source = - (KafkaUnboundedSource) ReadTranslation.unboundedSourceFromProto(readPayload); - KafkaIO.Read spec = source.getSpec(); - - assertThat(spec.getConsumerConfig(), Matchers.is(consumerConfig)); - assertThat(spec.getTopics(), Matchers.is(topics)); - assertThat( - spec.getKeyDeserializerProvider() - .getDeserializer(spec.getConsumerConfig(), true) - .getClass() - .getName(), - Matchers.is(keyDeserializer)); - assertThat( - spec.getValueDeserializerProvider() - .getDeserializer(spec.getConsumerConfig(), false) - .getClass() - .getName(), - Matchers.is(valueDeserializer)); + RunnerApi.PTransform kafkaSdfComposite = + result.getComponents().getTransformsOrThrow(kafkaReadComposite.getSubtransforms(2)); + RunnerApi.PTransform kafkaSdfParDo = + result.getComponents().getTransformsOrThrow(kafkaSdfComposite.getSubtransforms(0)); + RunnerApi.ParDoPayload parDoPayload = + RunnerApi.ParDoPayload.parseFrom(kafkaSdfParDo.getSpec().getPayload()); + assertNotNull(parDoPayload.getRestrictionCoderId()); } @Test