Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ if (project.hasProperty('javaLinkageArtifactIds')) {
"${project.ext.mavenGroupId}:${it}:${project.version}"
}
}).join(',')]

// Exclusion file filters out existing linkage errors before a change
if (project.hasProperty('javaLinkageWriteBaseline')) {
arguments += ['--output-exclusion-file', project.getProperty('javaLinkageWriteBaseline')]
Expand Down
15 changes: 15 additions & 0 deletions runners/jet/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,21 @@ task validatesRunnerBatch(type: Test) {
excludeCategories 'org.apache.beam.sdk.testing.UsesImpulse' //impulse doesn't cooperate properly with Jet when multiple cluster members are used
exclude '**/SplittableDoFnTest.class' //Splittable DoFn functionality not yet in the runner
excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'

//Jet Runner doesn't currently support @RequiresTimeSortedInput annotation.
excludeCategories 'org.apache.beam.sdk.testing.UsesRequiresTimeSortedInput'

//Event type not supported in TestStream: class org.apache.beam.sdk.testing.AutoValue_TestStream_ProcessingTimeEvent
excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime'
}
filter {
// unbounded streams created from bounded sources not supported by Jet Runner
excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testTwoTimersSettingEachOtherWithCreateAsInputUnbounded'

// timer output timestamps not supported by Jet Runner
excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testOutputTimestamp'

excludeTestsMatching 'org.apache.beam.sdk.testing.PAssertTest.testAssertionSiteIsCaptured*'
}

maxHeapSize = '4g'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.runners.core.construction.CreatePCollectionViewTranslation;
Expand Down Expand Up @@ -186,12 +185,14 @@ public Vertex translate(
SerializablePipelineOptions pipelineOptions = context.getOptions();
Coder inputValueCoder = ((PCollection) Utils.getInput(appliedTransform)).getCoder();
Coder inputCoder = Utils.getCoder((PCollection) Utils.getInput(appliedTransform));
List<PCollectionView<?>> sideInputs = Utils.getSideInputs(appliedTransform);
Collection<PCollectionView<?>> sideInputs = Utils.getSideInputs(appliedTransform);
Map<? extends PCollectionView<?>, Coder> sideInputCoders =
sideInputs.stream()
.collect(Collectors.toMap(si -> si, si -> Utils.getCoder(si.getPCollection())));
DoFnSchemaInformation doFnSchemaInformation =
ParDoTranslation.getSchemaInformation(appliedTransform);
Map<String, PCollectionView<?>> sideInputMappings =
ParDoTranslation.getSideInputMapping(appliedTransform);
SupplierEx<Processor> processorSupplier =
usesStateOrTimers
? new StatefulParDoP.Supplier(
Expand All @@ -208,7 +209,8 @@ public Vertex translate(
outputCoders,
inputValueCoder,
outputValueCoders,
sideInputs)
sideInputs,
sideInputMappings)
: new ParDoP.Supplier(
stepId,
vertexId,
Expand All @@ -223,7 +225,8 @@ public Vertex translate(
outputCoders,
inputValueCoder,
outputValueCoders,
sideInputs);
sideInputs,
sideInputMappings);

Vertex vertex = dagBuilder.addVertex(vertexId, processorSupplier);
dagBuilder.registerConstructionListeners((DAGBuilder.WiringListener) processorSupplier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ static <T> Map<T, Coder> getCoders(
Map<TupleTag<?>, PValue> pCollections,
Function<Map.Entry<TupleTag<?>, PValue>, T> tupleTagExtractor) {
return pCollections.entrySet().stream()
.collect(Collectors.toMap(tupleTagExtractor, e -> getCoder((PCollection) e.getValue())));
.collect(
Collectors.toMap(
tupleTagExtractor, e -> getCoder((PCollection) e.getValue()), (v1, v2) -> v1));
}

static Map<TupleTag<?>, Coder<?>> getOutputValueCoders(
Expand All @@ -146,7 +148,7 @@ static Map<TupleTag<?>, Coder<?>> getOutputValueCoders(
.collect(Collectors.toMap(Map.Entry::getKey, e -> ((PCollection) e.getValue()).getCoder()));
}

static List<PCollectionView<?>> getSideInputs(AppliedPTransform<?, ?, ?> appliedTransform) {
static Collection<PCollectionView<?>> getSideInputs(AppliedPTransform<?, ?, ?> appliedTransform) {
PTransform<?, ?> transform = appliedTransform.getTransform();
if (transform instanceof ParDo.MultiOutput) {
ParDo.MultiOutput multiParDo = (ParDo.MultiOutput) transform;
Expand Down Expand Up @@ -177,7 +179,7 @@ static boolean usesStateOrTimers(AppliedPTransform<?, ?, ?> appliedTransform) {
if (DoFnSignatures.requiresTimeSortedInput(doFn)) {
throw new UnsupportedOperationException(
String.format(
"%s doesn't current support @RequiresTimeSortedInput annotation.",
"%s doesn't currently support @RequiresTimeSortedInput annotation.",
JetRunner.class.getSimpleName()));
}
return doFn;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
Expand Down Expand Up @@ -110,6 +111,7 @@ abstract class AbstractParDoP<InputT, OutputT> implements Processor {
Coder<InputT> inputValueCoder,
Map<TupleTag<?>, Coder<?>> outputValueCoders,
Map<Integer, PCollectionView<?>> ordinalToSideInput,
Map<String, PCollectionView<?>> sideInputMapping,
String ownerId,
String stepId) {
this.pipelineOptions = pipelineOptions;
Expand All @@ -131,6 +133,7 @@ abstract class AbstractParDoP<InputT, OutputT> implements Processor {
this.inputValueCoder = inputValueCoder;
this.outputValueCoders = outputValueCoders;
this.ordinalToSideInput = ordinalToSideInput;
this.sideInputMapping = sideInputMapping;
this.ownerId = ownerId;
this.stepId = stepId;
this.cooperative = isCooperativenessAllowed(pipelineOptions) && hasOutput();
Expand Down Expand Up @@ -391,7 +394,8 @@ abstract static class AbstractSupplier<InputT, OutputT>
private final Map<TupleTag<?>, Coder<?>> outputCoders;
private final Coder<InputT> inputValueCoder;
private final Map<TupleTag<?>, Coder<?>> outputValueCoders;
private final List<PCollectionView<?>> sideInputs;
private final Collection<PCollectionView<?>> sideInputs;
private final Map<String, PCollectionView<?>> sideInputMapping;

private final Map<Integer, PCollectionView<?>> ordinalToSideInput = new HashMap<>();

Expand All @@ -409,7 +413,8 @@ abstract static class AbstractSupplier<InputT, OutputT>
Map<TupleTag<?>, Coder<?>> outputCoders,
Coder<InputT> inputValueCoder,
Map<TupleTag<?>, Coder<?>> outputValueCoders,
List<PCollectionView<?>> sideInputs) {
Collection<PCollectionView<?>> sideInputs,
Map<String, PCollectionView<?>> sideInputMapping) {
this.stepId = stepId;
this.ownerId = ownerId;
this.pipelineOptions = pipelineOptions;
Expand All @@ -426,6 +431,7 @@ abstract static class AbstractSupplier<InputT, OutputT>
this.inputValueCoder = inputValueCoder;
this.outputValueCoders = outputValueCoders;
this.sideInputs = sideInputs;
this.sideInputMapping = sideInputMapping;
}

@Override
Expand All @@ -449,6 +455,7 @@ public Processor getEx() {
inputValueCoder,
Collections.unmodifiableMap(outputValueCoders),
Collections.unmodifiableMap(ordinalToSideInput),
sideInputMapping,
ownerId,
stepId);
}
Expand All @@ -466,6 +473,7 @@ abstract Processor getEx(
Coder<InputT> inputValueCoder,
Map<TupleTag<?>, Coder<?>> outputValueCoders,
Map<Integer, PCollectionView<?>> ordinalToSideInput,
Map<String, PCollectionView<?>> sideInputMapping,
String ownerId,
String stepId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.runners.jet.processors;

import com.hazelcast.jet.core.Processor;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -56,6 +57,7 @@ private ParDoP(
Coder<InputT> inputValueCoder,
Map<TupleTag<?>, Coder<?>> outputValueCoders,
Map<Integer, PCollectionView<?>> ordinalToSideInput,
Map<String, PCollectionView<?>> sideInputMapping,
String ownerId,
String stepId) {
super(
Expand All @@ -71,6 +73,7 @@ private ParDoP(
inputValueCoder,
outputValueCoders,
ordinalToSideInput,
sideInputMapping,
ownerId,
stepId);
}
Expand Down Expand Up @@ -124,7 +127,8 @@ public Supplier(
Map<TupleTag<?>, Coder<?>> outputCoders,
Coder<InputT> inputValueCoder,
Map<TupleTag<?>, Coder<?>> outputValueCoders,
List<PCollectionView<?>> sideInputs) {
Collection<PCollectionView<?>> sideInputs,
Map<String, PCollectionView<?>> sideInputMapping) {
super(
stepId,
ownerId,
Expand All @@ -139,7 +143,8 @@ public Supplier(
outputCoders,
inputValueCoder,
outputValueCoders,
sideInputs);
sideInputs,
sideInputMapping);
}

@Override
Expand All @@ -156,6 +161,7 @@ Processor getEx(
Coder<InputT> inputValueCoder,
Map<TupleTag<?>, Coder<?>> outputValueCoders,
Map<Integer, PCollectionView<?>> ordinalToSideInput,
Map<String, PCollectionView<?>> sideInputMapping,
String ownerId,
String stepId) {
return new ParDoP<>(
Expand All @@ -171,6 +177,7 @@ Processor getEx(
inputValueCoder,
outputValueCoders,
ordinalToSideInput,
sideInputMapping,
ownerId,
stepId);
}
Expand Down
Loading