diff --git a/build.gradle b/build.gradle index bf24706cb45f..d61071a4ff7d 100644 --- a/build.gradle +++ b/build.gradle @@ -368,7 +368,7 @@ if (project.hasProperty('javaLinkageArtifactIds')) { "${project.ext.mavenGroupId}:${it}:${project.version}" } }).join(',')] - + // Exclusion file filters out existing linkage errors before a change if (project.hasProperty('javaLinkageWriteBaseline')) { arguments += ['--output-exclusion-file', project.getProperty('javaLinkageWriteBaseline')] diff --git a/runners/jet/build.gradle b/runners/jet/build.gradle index ae9daf5ff640..3813fdb64d4c 100644 --- a/runners/jet/build.gradle +++ b/runners/jet/build.gradle @@ -77,6 +77,21 @@ task validatesRunnerBatch(type: Test) { excludeCategories 'org.apache.beam.sdk.testing.UsesImpulse' //impulse doesn't cooperate properly with Jet when multiple cluster members are used exclude '**/SplittableDoFnTest.class' //Splittable DoFn functionality not yet in the runner excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer' + + //Jet Runner doesn't currently support @RequiresTimeSortedInput annotation. 
+ excludeCategories 'org.apache.beam.sdk.testing.UsesRequiresTimeSortedInput' + + //Event type not supported in TestStream: class org.apache.beam.sdk.testing.AutoValue_TestStream_ProcessingTimeEvent + excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime' + } + filter { + // unbounded streams created from bounded sources not supported by Jet Runner + excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testTwoTimersSettingEachOtherWithCreateAsInputUnbounded' + + // timer output timestamps not supported by Jet Runner + excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testOutputTimestamp' + + excludeTestsMatching 'org.apache.beam.sdk.testing.PAssertTest.testAssertionSiteIsCaptured*' } maxHeapSize = '4g' diff --git a/runners/jet/src/main/java/org/apache/beam/runners/jet/JetTransformTranslators.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/JetTransformTranslators.java index c79627bd8d65..644de8937650 100644 --- a/runners/jet/src/main/java/org/apache/beam/runners/jet/JetTransformTranslators.java +++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/JetTransformTranslators.java @@ -24,7 +24,6 @@ import java.io.IOException; import java.util.Collection; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.stream.Collectors; import org.apache.beam.runners.core.construction.CreatePCollectionViewTranslation; @@ -186,12 +185,14 @@ public Vertex translate( SerializablePipelineOptions pipelineOptions = context.getOptions(); Coder inputValueCoder = ((PCollection) Utils.getInput(appliedTransform)).getCoder(); Coder inputCoder = Utils.getCoder((PCollection) Utils.getInput(appliedTransform)); - List> sideInputs = Utils.getSideInputs(appliedTransform); + Collection> sideInputs = Utils.getSideInputs(appliedTransform); Map, Coder> sideInputCoders = sideInputs.stream() .collect(Collectors.toMap(si -> si, si -> Utils.getCoder(si.getPCollection()))); 
DoFnSchemaInformation doFnSchemaInformation = ParDoTranslation.getSchemaInformation(appliedTransform); + Map> sideInputMappings = + ParDoTranslation.getSideInputMapping(appliedTransform); SupplierEx processorSupplier = usesStateOrTimers ? new StatefulParDoP.Supplier( @@ -208,7 +209,8 @@ public Vertex translate( outputCoders, inputValueCoder, outputValueCoders, - sideInputs) + sideInputs, + sideInputMappings) : new ParDoP.Supplier( stepId, vertexId, @@ -223,7 +225,8 @@ public Vertex translate( outputCoders, inputValueCoder, outputValueCoders, - sideInputs); + sideInputs, + sideInputMappings); Vertex vertex = dagBuilder.addVertex(vertexId, processorSupplier); dagBuilder.registerConstructionListeners((DAGBuilder.WiringListener) processorSupplier); diff --git a/runners/jet/src/main/java/org/apache/beam/runners/jet/Utils.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/Utils.java index aa898269e462..2bd1b5d9b9b6 100644 --- a/runners/jet/src/main/java/org/apache/beam/runners/jet/Utils.java +++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/Utils.java @@ -136,7 +136,9 @@ static Map getCoders( Map, PValue> pCollections, Function, PValue>, T> tupleTagExtractor) { return pCollections.entrySet().stream() - .collect(Collectors.toMap(tupleTagExtractor, e -> getCoder((PCollection) e.getValue()))); + .collect( + Collectors.toMap( + tupleTagExtractor, e -> getCoder((PCollection) e.getValue()), (v1, v2) -> v1)); } static Map, Coder> getOutputValueCoders( @@ -146,7 +148,7 @@ static Map, Coder> getOutputValueCoders( .collect(Collectors.toMap(Map.Entry::getKey, e -> ((PCollection) e.getValue()).getCoder())); } - static List> getSideInputs(AppliedPTransform appliedTransform) { + static Collection> getSideInputs(AppliedPTransform appliedTransform) { PTransform transform = appliedTransform.getTransform(); if (transform instanceof ParDo.MultiOutput) { ParDo.MultiOutput multiParDo = (ParDo.MultiOutput) transform; @@ -177,7 +179,7 @@ static boolean 
usesStateOrTimers(AppliedPTransform appliedTransform) { if (DoFnSignatures.requiresTimeSortedInput(doFn)) { throw new UnsupportedOperationException( String.format( - "%s doesn't current support @RequiresTimeSortedInput annotation.", + "%s doesn't currently support @RequiresTimeSortedInput annotation.", JetRunner.class.getSimpleName())); } return doFn; diff --git a/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/AbstractParDoP.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/AbstractParDoP.java index ae92f894236b..2a7ac146a72c 100644 --- a/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/AbstractParDoP.java +++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/AbstractParDoP.java @@ -26,6 +26,7 @@ import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Deque; import java.util.HashMap; @@ -110,6 +111,7 @@ abstract class AbstractParDoP implements Processor { Coder inputValueCoder, Map, Coder> outputValueCoders, Map> ordinalToSideInput, + Map> sideInputMapping, String ownerId, String stepId) { this.pipelineOptions = pipelineOptions; @@ -131,6 +133,7 @@ abstract class AbstractParDoP implements Processor { this.inputValueCoder = inputValueCoder; this.outputValueCoders = outputValueCoders; this.ordinalToSideInput = ordinalToSideInput; + this.sideInputMapping = sideInputMapping; this.ownerId = ownerId; this.stepId = stepId; this.cooperative = isCooperativenessAllowed(pipelineOptions) && hasOutput(); @@ -391,7 +394,8 @@ abstract static class AbstractSupplier private final Map, Coder> outputCoders; private final Coder inputValueCoder; private final Map, Coder> outputValueCoders; - private final List> sideInputs; + private final Collection> sideInputs; + private final Map> sideInputMapping; private final Map> ordinalToSideInput = new HashMap<>(); @@ -409,7 +413,8 @@ abstract static class 
AbstractSupplier Map, Coder> outputCoders, Coder inputValueCoder, Map, Coder> outputValueCoders, - List> sideInputs) { + Collection> sideInputs, + Map> sideInputMapping) { this.stepId = stepId; this.ownerId = ownerId; this.pipelineOptions = pipelineOptions; @@ -426,6 +431,7 @@ abstract static class AbstractSupplier this.inputValueCoder = inputValueCoder; this.outputValueCoders = outputValueCoders; this.sideInputs = sideInputs; + this.sideInputMapping = sideInputMapping; } @Override @@ -449,6 +455,7 @@ public Processor getEx() { inputValueCoder, Collections.unmodifiableMap(outputValueCoders), Collections.unmodifiableMap(ordinalToSideInput), + sideInputMapping, ownerId, stepId); } @@ -466,6 +473,7 @@ abstract Processor getEx( Coder inputValueCoder, Map, Coder> outputValueCoders, Map> ordinalToSideInput, + Map> sideInputMapping, String ownerId, String stepId); diff --git a/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/ParDoP.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/ParDoP.java index 38f07c9cae0e..004a0222ee29 100644 --- a/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/ParDoP.java +++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/ParDoP.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.jet.processors; import com.hazelcast.jet.core.Processor; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; @@ -56,6 +57,7 @@ private ParDoP( Coder inputValueCoder, Map, Coder> outputValueCoders, Map> ordinalToSideInput, + Map> sideInputMapping, String ownerId, String stepId) { super( @@ -71,6 +73,7 @@ private ParDoP( inputValueCoder, outputValueCoders, ordinalToSideInput, + sideInputMapping, ownerId, stepId); } @@ -124,7 +127,8 @@ public Supplier( Map, Coder> outputCoders, Coder inputValueCoder, Map, Coder> outputValueCoders, - List> sideInputs) { + Collection> sideInputs, + Map> sideInputMapping) { super( stepId, ownerId, @@ -139,7 +143,8 @@ public 
Supplier( outputCoders, inputValueCoder, outputValueCoders, - sideInputs); + sideInputs, + sideInputMapping); } @Override @@ -156,6 +161,7 @@ Processor getEx( Coder inputValueCoder, Map, Coder> outputValueCoders, Map> ordinalToSideInput, + Map> sideInputMapping, String ownerId, String stepId) { return new ParDoP<>( @@ -171,6 +177,7 @@ Processor getEx( inputValueCoder, outputValueCoders, ordinalToSideInput, + sideInputMapping, ownerId, stepId); } diff --git a/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/StatefulParDoP.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/StatefulParDoP.java index 1c51e9f840dc..22dbd21467eb 100644 --- a/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/StatefulParDoP.java +++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/StatefulParDoP.java @@ -19,10 +19,11 @@ import com.hazelcast.jet.core.Processor; import com.hazelcast.jet.core.Watermark; -import java.util.HashMap; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import javax.annotation.Nonnull; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; @@ -54,7 +55,6 @@ public class StatefulParDoP extends AbstractParDoP, OutputT> { // todo: unify with ParDoP? 
private KeyedStepContext keyedStepContext; - private InMemoryTimerInternals timerInternals; private StatefulParDoP( DoFn, OutputT> doFn, @@ -69,6 +69,7 @@ private StatefulParDoP( Coder> inputValueCoder, Map, Coder> outputValueCoders, Map> ordinalToSideInput, + Map> sideInputMapping, String ownerId, String stepId) { super( @@ -84,18 +85,19 @@ private StatefulParDoP( inputValueCoder, outputValueCoders, ordinalToSideInput, + sideInputMapping, ownerId, stepId); } private static void fireTimer( - TimerInternals.TimerData timer, DoFnRunner, ?> doFnRunner) { + Object key, TimerInternals.TimerData timer, DoFnRunner, ?> doFnRunner) { StateNamespace namespace = timer.getNamespace(); BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow(); doFnRunner.onTimer( timer.getTimerId(), timer.getTimerFamilyId(), - null, + key, window, timer.getTimestamp(), timer.getOutputTimestamp(), @@ -115,8 +117,7 @@ private static void fireTimer( WindowingStrategy windowingStrategy, DoFnSchemaInformation doFnSchemaInformation, Map> sideInputMapping) { - timerInternals = new InMemoryTimerInternals(); - keyedStepContext = new KeyedStepContext(timerInternals); + keyedStepContext = new KeyedStepContext(); return DoFnRunners.simpleRunner( pipelineOptions, doFn, @@ -134,14 +135,7 @@ private static void fireTimer( @Override protected void startRunnerBundle(DoFnRunner, OutputT> runner) { - try { - Instant now = Instant.now(); - timerInternals.advanceProcessingTime(now); - timerInternals.advanceSynchronizedProcessingTime(now); - } catch (Exception e) { - throw new RuntimeException("Failed advancing time!"); - } - + keyedStepContext.advanceProcessingTimes(); super.startRunnerBundle(runner); } @@ -150,9 +144,10 @@ protected void processElementWithRunner( DoFnRunner, OutputT> runner, WindowedValue> windowedValue) { KV kv = windowedValue.getValue(); Object key = kv.getKey(); - keyedStepContext.setKey(key); + keyedStepContext.setKey(key); super.processElementWithRunner(runner, 
windowedValue); + keyedStepContext.clearKey(); } @Override @@ -166,40 +161,28 @@ public boolean complete() { } private boolean flushTimers(long watermark) { - if (timerInternals.currentInputWatermarkTime().isBefore(watermark)) { - try { - Instant watermarkInstant = new Instant(watermark); - timerInternals.advanceInputWatermark(watermarkInstant); - if (watermarkInstant.equals(BoundedWindow.TIMESTAMP_MAX_VALUE)) { - timerInternals.advanceProcessingTime(watermarkInstant); - timerInternals.advanceSynchronizedProcessingTime(watermarkInstant); - } - fireEligibleTimers(timerInternals); - } catch (Exception e) { - throw new RuntimeException("Failed advancing processing time", e); - } - } + keyedStepContext.flushTimers(watermark); return outputManager.tryFlush(); } - private void fireEligibleTimers(InMemoryTimerInternals timerInternals) { + private void fireEligibleTimers(Object key, InMemoryTimerInternals timerInternals) { while (true) { TimerInternals.TimerData timer; boolean hasFired = false; while ((timer = timerInternals.removeNextEventTimer()) != null) { hasFired = true; - fireTimer(timer, doFnRunner); + fireTimer(key, timer, doFnRunner); } while ((timer = timerInternals.removeNextProcessingTimer()) != null) { hasFired = true; - fireTimer(timer, doFnRunner); + fireTimer(key, timer, doFnRunner); } while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) { hasFired = true; - fireTimer(timer, doFnRunner); + fireTimer(key, timer, doFnRunner); } if (!hasFired) { @@ -229,7 +212,8 @@ public Supplier( Map, Coder> outputCoders, Coder> inputValueCoder, Map, Coder> outputValueCoders, - List> sideInputs) { + Collection> sideInputs, + Map> sideInputMapping) { super( stepId, ownerId, @@ -244,7 +228,8 @@ public Supplier( outputCoders, inputValueCoder, outputValueCoders, - sideInputs); + sideInputs, + sideInputMapping); } @Override @@ -261,6 +246,7 @@ Processor getEx( Coder> inputValueCoder, Map, Coder> outputValueCoders, Map> ordinalToSideInput, + Map> 
sideInputMapping, String ownerId, String stepId) { return new StatefulParDoP<>( @@ -276,36 +262,94 @@ Processor getEx( inputValueCoder, outputValueCoders, ordinalToSideInput, + sideInputMapping, ownerId, stepId); } } - private static class KeyedStepContext implements StepContext { + private class KeyedStepContext implements StepContext { - private final Map stateInternalsOfKeys; - private final InMemoryTimerInternals timerInternals; + private final Object nullKey = new Object(); - private InMemoryStateInternals currentStateInternals; + private final ConcurrentHashMap keyedStateInternals; + private final ConcurrentHashMap keyedTimerInternals; - KeyedStepContext(InMemoryTimerInternals timerInternals) { - this.stateInternalsOfKeys = new HashMap<>(); - this.timerInternals = timerInternals; + @SuppressWarnings("ThreadLocalUsage") + private final ThreadLocal currentKey = new ThreadLocal<>(); + + KeyedStepContext() { + this.keyedStateInternals = new ConcurrentHashMap<>(); + this.keyedTimerInternals = new ConcurrentHashMap<>(); } void setKey(Object key) { - currentStateInternals = - stateInternalsOfKeys.computeIfAbsent(key, InMemoryStateInternals::forKey); + Object normalizedKey = key == null ? 
nullKey : key; + currentKey.set(normalizedKey); + keyedStateInternals.computeIfAbsent(normalizedKey, InMemoryStateInternals::forKey); + keyedTimerInternals.computeIfAbsent(normalizedKey, k -> new InMemoryTimerInternals()); + } + + void clearKey() { + currentKey.remove(); } @Override public StateInternals stateInternals() { - return currentStateInternals; + Object key = currentKey.get(); + if (key == null) { + throw new IllegalStateException("Active key should be set"); + } + return keyedStateInternals.get(key); } @Override public TimerInternals timerInternals() { - return timerInternals; + Object key = currentKey.get(); + if (key == null) { + throw new IllegalStateException("Active key should be set"); + } + return keyedTimerInternals.get(key); + } + + public void advanceProcessingTimes() { + Instant now = Instant.now(); + keyedTimerInternals + .values() + .forEach( + timerInternals -> { + try { + timerInternals.advanceProcessingTime(now); + timerInternals.advanceSynchronizedProcessingTime(now); + } catch (Exception e) { + throw new RuntimeException("Failed advancing time!"); + } + }); + } + + public void flushTimers(long watermark) { + Instant watermarkInstant = new Instant(watermark); + keyedTimerInternals + .entrySet() + .forEach( + (entry) -> { + InMemoryTimerInternals timerInternals = entry.getValue(); + if (timerInternals.currentInputWatermarkTime().isBefore(watermark)) { + try { + timerInternals.advanceInputWatermark(watermarkInstant); + if (watermarkInstant.equals(BoundedWindow.TIMESTAMP_MAX_VALUE)) { + timerInternals.advanceProcessingTime(watermarkInstant); + timerInternals.advanceSynchronizedProcessingTime(watermarkInstant); + } + Object key = entry.getKey(); + setKey(key); + fireEligibleTimers(key, timerInternals); + clearKey(); + } catch (Exception e) { + throw new RuntimeException("Failed advancing processing time", e); + } + } + }); } } }