diff --git a/CHANGES.md b/CHANGES.md index 2c569bcb5695..8b2a124c5e28 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -147,6 +147,7 @@ * Upgrade Sphinx to 3.0.3 for building PyDoc. * Added a PTransform for image annotation using Google Cloud AI image processing service ([BEAM-9646](https://issues.apache.org/jira/browse/BEAM-9646)) +* Fixed Dataflow streaming timers not being strictly time ordered when set earlier mid-bundle ([BEAM-8543](https://issues.apache.org/jira/browse/BEAM-8543)). ## Breaking Changes diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index 7ed390b8f861..334f145d5ede 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -24,10 +24,12 @@ import com.google.api.services.dataflow.model.SideInputInfo; import java.io.Closeable; import java.io.IOException; +import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.PriorityQueue; import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicLong; @@ -519,7 +521,7 @@ public void start( synchronizedProcessingTime); this.cachedFiredTimers = null; - this.cachedFiredUserTimers = null; + this.toBeFiredTimersOrdered = null; } public void flushState() { @@ -559,28 +561,67 @@ public TimerData getNextFiredTimer(Coder windowCode return nextTimer; } - // Lazily initialized - private Iterator cachedFiredUserTimers = null; + private PriorityQueue toBeFiredTimersOrdered = null; + + // Tracks whether a timer has been reset to an earlier time mid-bundle.
+ // Maps each timer's unique id (timer id + '+' + timer family id) to its most recently + // recorded firing time, so a queued timer can be checked against it before firing. + private Map firedTimer = new HashMap<>(); public TimerData getNextFiredUserTimer(Coder windowCoder) { - if (cachedFiredUserTimers == null) { - cachedFiredUserTimers = - FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers()) - .filter( - timer -> - WindmillTimerInternals.isUserTimer(timer) - && timer.getStateFamily().equals(stateFamily)) - .transform( - timer -> - WindmillTimerInternals.windmillTimerToTimerData( - WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder)) - .iterator(); + if (toBeFiredTimersOrdered == null) { + + toBeFiredTimersOrdered = new PriorityQueue<>(Comparator.comparing(TimerData::getTimestamp)); + FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers()) + .filter( + timer -> + WindmillTimerInternals.isUserTimer(timer) + && timer.getStateFamily().equals(stateFamily)) + .transform( + timer -> + WindmillTimerInternals.windmillTimerToTimerData( + WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder)) + .iterator() + .forEachRemaining( + timerData -> { + firedTimer.put( + timerData.getTimerId() + '+' + timerData.getTimerFamilyId(), + timerData.getTimestamp()); + toBeFiredTimersOrdered.add(timerData); + }); } - if (!cachedFiredUserTimers.hasNext()) { + Instant currentInputWatermark = userTimerInternals.currentInputWatermarkTime(); + + if (userTimerInternals.hasTimerBefore(currentInputWatermark)) { + List currentTimers = userTimerInternals.getCurrentTimers(); + + for (TimerData timerData : currentTimers) { + firedTimer.put( + timerData.getTimerId() + '+' + timerData.getTimerFamilyId(), + timerData.getTimestamp()); + toBeFiredTimersOrdered.add(timerData); + } + } + + TimerData nextTimer = null; + + // Fire a timer only if its timestamp matches the recorded one; otherwise it was reset or is obsolete.
+ while (!toBeFiredTimersOrdered.isEmpty()) { + nextTimer = toBeFiredTimersOrdered.poll(); + String timerUniqueId = nextTimer.getTimerId() + '+' + nextTimer.getTimerFamilyId(); + if (firedTimer.containsKey(timerUniqueId) + && firedTimer.get(timerUniqueId).isEqual(nextTimer.getTimestamp())) { + break; + } else { + nextTimer = null; + } + } + + if (nextTimer == null) { return null; } - TimerData nextTimer = cachedFiredUserTimers.next(); + // User timers must be explicitly deleted when delivered, to release the implied hold userTimerInternals.deleteTimer(nextTimer); return nextTimer; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java index f46fd4968e83..5269cf29ea67 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java @@ -22,6 +22,8 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.TimerInternals; @@ -225,6 +227,29 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) { timers.clear(); } + public boolean hasTimerBefore(Instant time) { + for (Cell cell : timerStillPresent.cellSet()) { + TimerData timerData = timers.get(cell.getRowKey(), cell.getColumnKey()); + if (cell.getValue()) { + if (timerData.getTimestamp().isBefore(time)) { + return true; + } + } + } + return false; + } + + public List getCurrentTimers() { + List timerDataList = new ArrayList<>(); + for (Cell cell : timerStillPresent.cellSet()) { + 
TimerData timerData = timers.get(cell.getRowKey(), cell.getColumnKey()); + if (cell.getValue()) { + timerDataList.add(timerData); + } + } + return timerDataList; + } + private boolean needsWatermarkHold(TimerData timerData) { // If it is a user timer or a system timer with outputTimestamp different than timestamp return WindmillNamespacePrefix.USER_NAMESPACE_PREFIX.equals(prefix) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 3f0d13c7a4d6..776fd3e26218 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -3959,7 +3959,7 @@ public void testEventTimeTimerOrdering() throws Exception { } testEventTimeTimerOrderingWithInputPTransform( - now, numTestElements, builder.advanceWatermarkToInfinity()); + now, numTestElements, builder.advanceWatermarkToInfinity(), IsBounded.BOUNDED); } /** A test makes sure that an event time timers are correctly ordered using Create transform. */ @@ -3970,7 +3970,7 @@ public void testEventTimeTimerOrdering() throws Exception { UsesStatefulParDo.class, UsesStrictTimerOrdering.class }) - public void testEventTimeTimerOrderingWithCreate() throws Exception { + public void testEventTimeTimerOrderingWithCreateBounded() throws Exception { final int numTestElements = 100; final Instant now = new Instant(1500000000000L); @@ -3980,13 +3980,39 @@ public void testEventTimeTimerOrderingWithCreate() throws Exception { } testEventTimeTimerOrderingWithInputPTransform( - now, numTestElements, Create.timestamped(elements)); + now, numTestElements, Create.timestamped(elements), IsBounded.BOUNDED); + } + + /** + * A test that makes sure event time timers are correctly ordered using a Create transform + * marked as unbounded.
+ */ + @Test + @Category({ + ValidatesRunner.class, + UsesTimersInParDo.class, + UsesStatefulParDo.class, + UsesUnboundedPCollections.class, + UsesStrictTimerOrdering.class + }) + public void testEventTimeTimerOrderingWithCreateUnbounded() throws Exception { + final int numTestElements = 100; + final Instant now = new Instant(1500000000000L); + + List>> elements = new ArrayList<>(); + for (int i = 0; i < numTestElements; i++) { + elements.add(TimestampedValue.of(KV.of("dummy", "" + i), now.plus(i * 1000))); + } + + testEventTimeTimerOrderingWithInputPTransform( + now, numTestElements, Create.timestamped(elements), IsBounded.UNBOUNDED); } private void testEventTimeTimerOrderingWithInputPTransform( Instant now, int numTestElements, - PTransform>> transform) + PTransform>> transform, + IsBounded isBounded) throws Exception { final String timerIdBagAppend = "append"; @@ -4070,7 +4096,8 @@ public void onTimer( } }; - PCollection output = pipeline.apply(transform).apply(ParDo.of(fn)); + PCollection output = + pipeline.apply(transform).setIsBoundedInternal(isBounded).apply(ParDo.of(fn)); List expected = IntStream.rangeClosed(0, numTestElements) .mapToObj(expandFn(numTestElements)) @@ -4154,16 +4181,25 @@ public void testTwoTimersSettingEachOther() { TestStream.create(KvCoder.of(VoidCoder.of(), VoidCoder.of())) .addElements(KV.of(null, null)) .advanceWatermarkToInfinity(); - pipeline.apply(TwoTimerTest.of(now, end, input)); + pipeline.apply(TwoTimerTest.of(now, end, input, IsBounded.BOUNDED)); + pipeline.run(); + } + + @Test + @Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesStrictTimerOrdering.class}) + public void testTwoTimersSettingEachOtherWithCreateAsInputBounded() { + Instant now = new Instant(1500000000000L); + Instant end = now.plus(100); + pipeline.apply(TwoTimerTest.of(now, end, Create.of(KV.of(null, null)), IsBounded.BOUNDED)); pipeline.run(); } @Test @Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesStrictTimerOrdering.class}) - 
public void testTwoTimersSettingEachOtherWithCreateAsInput() { + public void testTwoTimersSettingEachOtherWithCreateAsInputUnbounded() { Instant now = new Instant(1500000000000L); Instant end = now.plus(100); - pipeline.apply(TwoTimerTest.of(now, end, Create.of(KV.of(null, null)))); + pipeline.apply(TwoTimerTest.of(now, end, Create.of(KV.of(null, null)), IsBounded.UNBOUNDED)); pipeline.run(); } @@ -4337,18 +4373,26 @@ public void onTimer( private static class TwoTimerTest extends PTransform { private static PTransform of( - Instant start, Instant end, PTransform>> input) { - return new TwoTimerTest(start, end, input); + Instant start, + Instant end, + PTransform>> input, + IsBounded isBounded) { + return new TwoTimerTest(start, end, input, isBounded); } private final Instant start; private final Instant end; + private final IsBounded isBounded; private final transient PTransform>> inputPTransform; public TwoTimerTest( - Instant start, Instant end, PTransform>> input) { + Instant start, + Instant end, + PTransform>> input, + IsBounded isBounded) { this.start = start; this.end = end; + this.isBounded = isBounded; this.inputPTransform = input; } @@ -4361,6 +4405,7 @@ public PDone expand(PBegin input) { PCollection result = input .apply(inputPTransform) + .setIsBoundedInternal(isBounded) .apply( ParDo.of( new DoFn, String>() { @@ -4425,7 +4470,7 @@ public void onTimer2( })); List expected = - LongStream.rangeClosed(0, 100) + LongStream.rangeClosed(0, end.minus(start.getMillis()).getMillis()) .mapToObj(e -> (Long) e) .flatMap(e -> Arrays.asList("t1:" + e + ":" + e, "t2:" + e + ":" + e).stream()) .collect(Collectors.toList());