From 9c1dd6a5e11eff6b450025ef1b82069f66d616a2 Mon Sep 17 00:00:00 2001 From: Rehman Date: Fri, 5 Jun 2020 00:52:18 +0500 Subject: [PATCH 1/6] validated timer ordering --- .../worker/StreamingModeExecutionContext.java | 21 ++++++++++++++++--- .../worker/WindmillTimerInternals.java | 12 +++++++++++ .../apache/beam/sdk/transforms/ParDoTest.java | 10 ++++++--- 3 files changed, 37 insertions(+), 6 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index cfaf950a2b21..42e7639030ef 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -24,10 +24,12 @@ import com.google.api.services.dataflow.model.SideInputInfo; import java.io.Closeable; import java.io.IOException; +import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.PriorityQueue; import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicLong; @@ -522,6 +524,7 @@ public void start( this.cachedFiredTimers = null; this.cachedFiredUserTimers = null; + this.toBeFiredTimersOrdered.clear(); } public void flushState() { @@ -564,10 +567,13 @@ public TimerData getNextFiredTimer(Coder windowCode // Lazily initialized private Iterator cachedFiredUserTimers = null; + private PriorityQueue toBeFiredTimersOrdered = + new PriorityQueue<>(Comparator.comparing(TimerData::getTimestamp)); + public TimerData getNextFiredUserTimer(Coder windowCoder) { if (cachedFiredUserTimers == null) { cachedFiredUserTimers = - FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers()) + FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers()) .filter( timer -> WindmillTimerInternals.isUserTimer(timer) @@ -577,12 +583,21 @@ public TimerData getNextFiredUserTimer(Coder window WindmillTimerInternals.windmillTimerToTimerData( WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder)) .iterator(); + + cachedFiredUserTimers.forEachRemaining(toBeFiredTimersOrdered::add); + } + + Instant currentInputWatermark = userTimerInternals.currentInputWatermarkTime(); + if (userTimerInternals.hasTimerBefore(currentInputWatermark)) { + while (!toBeFiredTimersOrdered.isEmpty()) { + userTimerInternals.setTimer(toBeFiredTimersOrdered.poll()); + } } - if (!cachedFiredUserTimers.hasNext()) { + if (toBeFiredTimersOrdered.isEmpty()) { return null; } - TimerData nextTimer = cachedFiredUserTimers.next(); + TimerData nextTimer = toBeFiredTimersOrdered.poll(); // User timers must be explicitly deleted when delivered, to release the implied hold userTimerInternals.deleteTimer(nextTimer); return nextTimer; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java index 4256294f7e33..8be2be7cebf4 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java @@ -227,6 +227,18 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) { timers.clear(); } + public boolean hasTimerBefore(Instant time) { + for (Cell cell : timerStillPresent.cellSet()) { + TimerData timerData = timers.get(cell.getRowKey(), cell.getColumnKey()); + if (cell.getValue()) { + if (timerData.getTimestamp().isBefore(time)) { + return true; + } + } + } + return false; + } + private boolean needsWatermarkHold(TimerData timerData) { // If it is a user timer or a system timer with outputTimestamp different than timestamp return WindmillNamespacePrefix.USER_NAMESPACE_PREFIX.equals(prefix) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index c3a190f904c3..34d0d0b40fb4 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -139,6 +139,7 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets; import org.hamcrest.Matchers; @@ -3938,10 +3939,11 @@ public void testEventTimeTimerOrdering() throws Exception { ValidatesRunner.class, UsesTimersInParDo.class, UsesStatefulParDo.class, + UsesUnboundedPCollections.class, UsesStrictTimerOrdering.class }) public void testEventTimeTimerOrderingWithCreate() throws Exception { - final int numTestElements = 100; + final int numTestElements = 5; final Instant now = new Instant(1500000000000L); List>> elements = new ArrayList<>(); @@ -4009,6 +4011,7 @@ public void onTimer( List> flush = new ArrayList<>(); Instant flushTime = context.timestamp(); + int bagSize = Iterators.size(bagState.read().iterator()); for (TimestampedValue val : bagState.read()) { if (!val.getTimestamp().isAfter(flushTime)) { flush.add(val); @@ -4018,7 +4021,7 @@ public void onTimer( context.output( Joiner.on(":").join(flush.stream().map(TimestampedValue::getValue).iterator())); Instant newMinStamp = flushTime.plus(1); - if (flush.size() < numTestElements) { + if (flush.size() < numTestElements && bagSize > 0) { timer.set(newMinStamp); } } @@ -4040,7 +4043,8 @@ public void onTimer( } }; - PCollection output = pipeline.apply(transform).apply(ParDo.of(fn)); + PCollection output = + pipeline.apply(transform).setIsBoundedInternal(IsBounded.UNBOUNDED).apply(ParDo.of(fn)); List expected = IntStream.rangeClosed(0, numTestElements) .mapToObj(expandFn(numTestElements)) From b73c77003945073d69485692dc6df05ac570b063 Mon Sep 17 00:00:00 2001 From: Rehman Date: Fri, 5 Jun 2020 01:04:09 +0500 Subject: [PATCH 2/6] update changes.md --- CHANGES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.md b/CHANGES.md index f6e455113354..6a5cc6767f44 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -101,6 +101,7 @@ * Upgrade Sphinx to 3.0.3 for building PyDoc. * Added a PTransform for image annotation using Google Cloud AI image processing service ([BEAM-9646](https://issues.apache.org/jira/browse/BEAM-9646)) +* Dataflow streaming timers are not strictly time ordered when set earlier mid-bundle ([BEAM-8543](https://issues.apache.org/jira/browse/BEAM-8543)). ## Breaking Changes From fba4c3d90302eede9a9741938de06b6260f158fc Mon Sep 17 00:00:00 2001 From: Rehman Date: Tue, 9 Jun 2020 18:53:55 +0500 Subject: [PATCH 3/6] Strict timer ordering without resetting timer --- .../worker/StreamingModeExecutionContext.java | 94 ++++++++++++++----- .../worker/WindmillTimerInternals.java | 4 +- .../apache/beam/sdk/transforms/ParDoTest.java | 90 +++++++++++++++--- 3 files changed, 146 insertions(+), 42 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index 42e7639030ef..2fe183243ac4 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -24,20 +24,12 @@ import com.google.api.services.dataflow.model.SideInputInfo; import java.io.Closeable; import java.io.IOException; -import java.util.Comparator; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.PriorityQueue; -import java.util.Set; +import java.util.*; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; -import org.apache.beam.runners.core.SideInputReader; -import org.apache.beam.runners.core.StateInternals; -import org.apache.beam.runners.core.StateNamespaces; -import org.apache.beam.runners.core.TimerInternals; + +import org.apache.beam.runners.core.*; import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.core.metrics.ExecutionStateTracker; import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState; @@ -57,12 +49,14 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString; +import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Timestamp; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Table; import org.joda.time.Duration; import org.joda.time.Instant; import org.slf4j.Logger; @@ -523,8 +517,7 @@ public void start( synchronizedProcessingTime); this.cachedFiredTimers = null; - this.cachedFiredUserTimers = null; - this.toBeFiredTimersOrdered.clear(); + this.toBeFiredTimersOrdered = null; } public void flushState() { @@ -564,16 +557,17 @@ public TimerData getNextFiredTimer(Coder windowCode return nextTimer; } - // Lazily initialized - private Iterator cachedFiredUserTimers = null; - private PriorityQueue toBeFiredTimersOrdered = - new PriorityQueue<>(Comparator.comparing(TimerData::getTimestamp)); + private PriorityQueue toBeFiredTimersOrdered = null; + + + private Map firedTimer = new HashMap<>(); public TimerData getNextFiredUserTimer(Coder windowCoder) { - if (cachedFiredUserTimers == null) { - cachedFiredUserTimers = - FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers()) + if (toBeFiredTimersOrdered == null) { + LOG.info("initializing queue"); + toBeFiredTimersOrdered = new PriorityQueue<>(Comparator.comparing(TimerData::getTimestamp)); + FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers()) .filter( timer -> WindmillTimerInternals.isUserTimer(timer) @@ -582,22 +576,72 @@ public TimerData getNextFiredUserTimer(Coder window timer -> WindmillTimerInternals.windmillTimerToTimerData( WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder)) - .iterator(); - - cachedFiredUserTimers.forEachRemaining(toBeFiredTimersOrdered::add); + .iterator() + .forEachRemaining(toBeFiredTimersOrdered::add); } + LOG.info("Queue content: " + Arrays.toString(toBeFiredTimersOrdered.toArray())); + + LOG.info("timer Internals timers: " + userTimerInternals.timers); + Instant currentInputWatermark = userTimerInternals.currentInputWatermarkTime(); if (userTimerInternals.hasTimerBefore(currentInputWatermark)) { - while (!toBeFiredTimersOrdered.isEmpty()) { - userTimerInternals.setTimer(toBeFiredTimersOrdered.poll()); + LOG.info("Yaya!! timer contains updates "); + + for (Table.Cell cell : userTimerInternals.timerStillPresent.cellSet()) { + TimerData timerData = userTimerInternals.timers.get(cell.getRowKey(), cell.getColumnKey()); + if (cell.getValue()) { + LOG.info("Adding timer in queue: " + timerData); + //firedTimer.put(timerData.getTimerId() + '+' + timerData.getTimerFamilyId(), timerData.getTimestamp()); + toBeFiredTimersOrdered.add(timerData); + + } } + } + + + /* + TimerData nextTimer = null; + while (!toBeFiredTimersOrdered.isEmpty()){ + nextTimer = toBeFiredTimersOrdered.poll(); + if(firedTimer.containsKey(nextTimer.getTimerId())){ + LOG.info("Timer Already fired betaa" + nextTimer); + userTimerInternals.deleteTimer(nextTimer); + nextTimer = null; + } + else { + break; + } + } + + if (nextTimer == null) { + return null; + } +*/ + if (toBeFiredTimersOrdered.isEmpty()) { return null; } + TimerData nextTimer = toBeFiredTimersOrdered.poll(); + LOG.info( + "Polled timer: Id: " + + nextTimer.getTimerId() + + " timestamp: " + + nextTimer.getTimestamp()); + LOG.info("Queue content after timer poll : " + Arrays.toString(toBeFiredTimersOrdered.toArray())); + + String timerUniqueId = nextTimer.getTimerId() + '+' + nextTimer.getTimerFamilyId(); + if(firedTimer.containsKey(timerUniqueId)){ + Instant instant = firedTimer.get(timerUniqueId); + LOG.info("Timer " + timerUniqueId +" already fired with timestamp: " + instant); + } + + + firedTimer.put(nextTimer.getTimerId() + '+' + nextTimer.getTimerFamilyId(), nextTimer.getTimestamp()); + // User timers must be explicitly deleted when delivered, to release the implied hold userTimerInternals.deleteTimer(nextTimer); return nextTimer; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java index 8be2be7cebf4..b6d8bebba9b3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java @@ -54,10 +54,10 @@ class WindmillTimerInternals implements TimerInternals { // though technically in Windmill this is only enforced per ID and namespace // and TimeDomain. This TimerInternals is scoped to a step and key, shared // across namespaces. - private Table timers = HashBasedTable.create(); + public Table timers = HashBasedTable.create(); // Map from timer id to whether it is to be deleted or set - private Table timerStillPresent = HashBasedTable.create(); + public Table timerStillPresent = HashBasedTable.create(); private Instant inputDataWatermark; private Instant processingTime; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 34d0d0b40fb4..412380fa4098 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -153,6 +153,8 @@ import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** Tests for ParDo. */ public class ParDoTest implements Serializable { @@ -3930,11 +3932,35 @@ public void testEventTimeTimerOrdering() throws Exception { } testEventTimeTimerOrderingWithInputPTransform( - now, numTestElements, builder.advanceWatermarkToInfinity()); + now, numTestElements, builder.advanceWatermarkToInfinity(), false); } /** A test makes sure that an event time timers are correctly ordered using Create transform. */ @Test + @Category({ + ValidatesRunner.class, + UsesTimersInParDo.class, + UsesStatefulParDo.class, + UsesStrictTimerOrdering.class + }) + public void testEventTimeTimerOrderingWithCreateBounded() throws Exception { + final int numTestElements = 100; + final Instant now = new Instant(1500000000000L); + + List>> elements = new ArrayList<>(); + for (int i = 0; i < numTestElements; i++) { + elements.add(TimestampedValue.of(KV.of("dummy", "" + i), now.plus(i))); + } + + testEventTimeTimerOrderingWithInputPTransform( + now, numTestElements, Create.timestamped(elements), false); + } + + /** + * A test makes sure that an event time timers are correctly ordered using Create transform + * unbounded. + */ + @Test @Category({ ValidatesRunner.class, UsesTimersInParDo.class, @@ -3942,8 +3968,8 @@ public void testEventTimeTimerOrdering() throws Exception { UsesUnboundedPCollections.class, UsesStrictTimerOrdering.class }) - public void testEventTimeTimerOrderingWithCreate() throws Exception { - final int numTestElements = 5; + public void testEventTimeTimerOrderingWithCreateUnbounded() throws Exception { + final int numTestElements = 100; final Instant now = new Instant(1500000000000L); List>> elements = new ArrayList<>(); @@ -3952,13 +3978,14 @@ public void testEventTimeTimerOrderingWithCreate() throws Exception { } testEventTimeTimerOrderingWithInputPTransform( - now, numTestElements, Create.timestamped(elements)); + now, numTestElements, Create.timestamped(elements), true); } private void testEventTimeTimerOrderingWithInputPTransform( Instant now, int numTestElements, - PTransform>> transform) + PTransform>> transform, + boolean isStreaming) throws Exception { final String timerIdBagAppend = "append"; @@ -3966,6 +3993,7 @@ private void testEventTimeTimerOrderingWithInputPTransform( final String bag = "bag"; final String minTimestamp = "minTs"; final Instant gcTimerStamp = now.plus(numTestElements + 1); + Logger log = LoggerFactory.getLogger(ParDoTest.class); DoFn, String> fn = new DoFn, String>() { @@ -3996,6 +4024,7 @@ public void processElement( if (currentMinStamp.equals(BoundedWindow.TIMESTAMP_MAX_VALUE)) { gcTimer.set(gcTimerStamp); } + if (currentMinStamp.isAfter(context.timestamp())) { minStampState.write(context.timestamp()); bagTimer.set(context.timestamp()); @@ -4009,6 +4038,7 @@ public void onTimer( @TimerId(timerIdBagAppend) Timer timer, @StateId(bag) BagState> bagState) { + log.info("OntimerAppend: timestamp: " + context.timestamp()); List> flush = new ArrayList<>(); Instant flushTime = context.timestamp(); int bagSize = Iterators.size(bagState.read().iterator()); @@ -4018,6 +4048,7 @@ public void onTimer( } } flush.sort(Comparator.comparing(TimestampedValue::getTimestamp)); + log.info("flush list size: " + flush.size()); context.output( Joiner.on(":").join(flush.stream().map(TimestampedValue::getValue).iterator())); Instant newMinStamp = flushTime.plus(1); @@ -4030,6 +4061,7 @@ public void onTimer( public void onTimer( OnTimerContext context, @StateId(bag) BagState> bagState) { + log.info("OntimerGC: timestamp: " + context.timestamp()); String output = Joiner.on(":") .join( @@ -4044,7 +4076,10 @@ public void onTimer( }; PCollection output = - pipeline.apply(transform).setIsBoundedInternal(IsBounded.UNBOUNDED).apply(ParDo.of(fn)); + pipeline + .apply(transform) + .setIsBoundedInternal(isStreaming ? IsBounded.UNBOUNDED : IsBounded.BOUNDED) + .apply(ParDo.of(fn)); List expected = IntStream.rangeClosed(0, numTestElements) .mapToObj(expandFn(numTestElements)) @@ -4123,21 +4158,30 @@ public void duplicateTimerSetting() { }) public void testTwoTimersSettingEachOther() { Instant now = new Instant(1500000000000L); - Instant end = now.plus(100); + Instant end = now.plus(3); TestStream> input = TestStream.create(KvCoder.of(VoidCoder.of(), VoidCoder.of())) .addElements(KV.of(null, null)) .advanceWatermarkToInfinity(); - pipeline.apply(TwoTimerTest.of(now, end, input)); + pipeline.apply(TwoTimerTest.of(now, end, input, false)); + pipeline.run(); + } + + @Test + @Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesStrictTimerOrdering.class}) + public void testTwoTimersSettingEachOtherWithCreateAsInputBounded() { + Instant now = new Instant(1500000000000L); + Instant end = now.plus(3); + pipeline.apply(TwoTimerTest.of(now, end, Create.of(KV.of(null, null)), false)); pipeline.run(); } @Test @Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesStrictTimerOrdering.class}) - public void testTwoTimersSettingEachOtherWithCreateAsInput() { + public void testTwoTimersSettingEachOtherWithCreateAsInputUnbounded() { Instant now = new Instant(1500000000000L); - Instant end = now.plus(100); - pipeline.apply(TwoTimerTest.of(now, end, Create.of(KV.of(null, null)))); + Instant end = now.plus(3); + pipeline.apply(TwoTimerTest.of(now, end, Create.of(KV.of(null, null)), true)); pipeline.run(); } @@ -4311,18 +4355,20 @@ public void onTimer( private static class TwoTimerTest extends PTransform { private static PTransform of( - Instant start, Instant end, PTransform>> input) { - return new TwoTimerTest(start, end, input); + Instant start, Instant end, PTransform>> input, boolean isStreaming) { + return new TwoTimerTest(start, end, input, isStreaming); } private final Instant start; private final Instant end; + private final boolean isStreaming; private final transient PTransform>> inputPTransform; public TwoTimerTest( - Instant start, Instant end, PTransform>> input) { + Instant start, Instant end, PTransform>> input, boolean isStreaming) { this.start = start; this.end = end; + this.isStreaming = isStreaming; this.inputPTransform = input; } @@ -4332,9 +4378,11 @@ public PDone expand(PBegin input) { final String timerName1 = "t1"; final String timerName2 = "t2"; final String countStateName = "count"; + final Logger log = LoggerFactory.getLogger(ParDoTest.class); PCollection result = input .apply(inputPTransform) + .setIsBoundedInternal(isStreaming ? IsBounded.UNBOUNDED : IsBounded.BOUNDED) .apply( ParDo.of( new DoFn, String>() { @@ -4371,6 +4419,12 @@ public void onTimer1( Integer current = state.read(); t2.set(context.timestamp()); + log.info( + "t1:" + + current + + ":" + + context.timestamp().minus(start.getMillis()).getMillis()); + context.output( "t1:" + current @@ -4390,6 +4444,12 @@ public void onTimer2( } else { state.write(-1); } + + log.info( + "t2:" + + current + + ":" + + context.timestamp().minus(start.getMillis()).getMillis()); context.output( "t2:" + current @@ -4399,7 +4459,7 @@ public void onTimer2( })); List expected = - LongStream.rangeClosed(0, 100) + LongStream.rangeClosed(0, 3) .mapToObj(e -> (Long) e) .flatMap(e -> Arrays.asList("t1:" + e + ":" + e, "t2:" + e + ":" + e).stream()) .collect(Collectors.toList()); From 032bde5a107c2ba0327eff426047b0ac083715ec Mon Sep 17 00:00:00 2001 From: Rehman Date: Tue, 9 Jun 2020 23:51:21 +0500 Subject: [PATCH 4/6] maintain strict ordering + timer reset --- .../worker/StreamingModeExecutionContext.java | 92 ++++++------------- .../worker/WindmillTimerInternals.java | 17 +++- .../apache/beam/sdk/transforms/ParDoTest.java | 42 +++------ 3 files changed, 58 insertions(+), 93 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index 2fe183243ac4..8097547c3b86 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -28,7 +28,6 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; - import org.apache.beam.runners.core.*; import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.core.metrics.ExecutionStateTracker; @@ -49,14 +48,12 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString; -import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Timestamp; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Table; import org.joda.time.Duration; import org.joda.time.Instant; import org.slf4j.Logger; @@ -557,90 +554,61 @@ public TimerData getNextFiredTimer(Coder windowCode return nextTimer; } - private PriorityQueue toBeFiredTimersOrdered = null; - private Map firedTimer = new HashMap<>(); public TimerData getNextFiredUserTimer(Coder windowCoder) { if (toBeFiredTimersOrdered == null) { - LOG.info("initializing queue"); + toBeFiredTimersOrdered = new PriorityQueue<>(Comparator.comparing(TimerData::getTimestamp)); FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers()) - .filter( - timer -> - WindmillTimerInternals.isUserTimer(timer) - && timer.getStateFamily().equals(stateFamily)) - .transform( - timer -> - WindmillTimerInternals.windmillTimerToTimerData( - WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder)) - .iterator() - .forEachRemaining(toBeFiredTimersOrdered::add); + .filter( + timer -> + WindmillTimerInternals.isUserTimer(timer) + && timer.getStateFamily().equals(stateFamily)) + .transform( + timer -> + WindmillTimerInternals.windmillTimerToTimerData( + WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder)) + .iterator() + .forEachRemaining( + timerData -> { + firedTimer.put( + timerData.getTimerId() + '+' + timerData.getTimerFamilyId(), + timerData.getTimestamp()); + toBeFiredTimersOrdered.add(timerData); + }); } - LOG.info("Queue content: " + Arrays.toString(toBeFiredTimersOrdered.toArray())); - - LOG.info("timer Internals timers: " + userTimerInternals.timers); - Instant currentInputWatermark = userTimerInternals.currentInputWatermarkTime(); if (userTimerInternals.hasTimerBefore(currentInputWatermark)) { - LOG.info("Yaya!! timer contains updates "); - - for (Table.Cell cell : userTimerInternals.timerStillPresent.cellSet()) { - TimerData timerData = userTimerInternals.timers.get(cell.getRowKey(), cell.getColumnKey()); - if (cell.getValue()) { - LOG.info("Adding timer in queue: " + timerData); - //firedTimer.put(timerData.getTimerId() + '+' + timerData.getTimerFamilyId(), timerData.getTimestamp()); - toBeFiredTimersOrdered.add(timerData); + List currentTimers = userTimerInternals.getCurrentTimers(); - } + for (TimerData timerData : currentTimers) { + firedTimer.put( + timerData.getTimerId() + '+' + timerData.getTimerFamilyId(), + timerData.getTimestamp()); + toBeFiredTimersOrdered.add(timerData); } - } - - - /* TimerData nextTimer = null; - while (!toBeFiredTimersOrdered.isEmpty()){ + + while (!toBeFiredTimersOrdered.isEmpty()) { nextTimer = toBeFiredTimersOrdered.poll(); - if(firedTimer.containsKey(nextTimer.getTimerId())){ - LOG.info("Timer Already fired betaa" + nextTimer); - userTimerInternals.deleteTimer(nextTimer); - nextTimer = null; - } - else { + String timerUniqueId = nextTimer.getTimerId() + '+' + nextTimer.getTimerFamilyId(); + if (firedTimer.containsKey(timerUniqueId) + && firedTimer.get(timerUniqueId).isEqual(nextTimer.getTimestamp())) { break; + } else { + nextTimer = null; } } if (nextTimer == null) { return null; } -*/ - - if (toBeFiredTimersOrdered.isEmpty()) { - return null; - } - - TimerData nextTimer = toBeFiredTimersOrdered.poll(); - LOG.info( - "Polled timer: Id: " - + nextTimer.getTimerId() - + " timestamp: " - + nextTimer.getTimestamp()); - LOG.info("Queue content after timer poll : " + Arrays.toString(toBeFiredTimersOrdered.toArray())); - - String timerUniqueId = nextTimer.getTimerId() + '+' + nextTimer.getTimerFamilyId(); - if(firedTimer.containsKey(timerUniqueId)){ - Instant instant = firedTimer.get(timerUniqueId); - LOG.info("Timer " + timerUniqueId +" already fired with timestamp: " + instant); - } - - - firedTimer.put(nextTimer.getTimerId() + '+' + nextTimer.getTimerFamilyId(), nextTimer.getTimestamp()); // User timers must be explicitly deleted when delivered, to release the implied hold userTimerInternals.deleteTimer(nextTimer); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java index b6d8bebba9b3..628cc1a5288c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java @@ -22,6 +22,8 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; import javax.annotation.Nullable; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaces; @@ -54,10 +56,10 @@ class WindmillTimerInternals implements TimerInternals { // though technically in Windmill this is only enforced per ID and namespace // and TimeDomain. This TimerInternals is scoped to a step and key, shared // across namespaces. - public Table timers = HashBasedTable.create(); + private Table timers = HashBasedTable.create(); // Map from timer id to whether it is to be deleted or set - public Table timerStillPresent = HashBasedTable.create(); + private Table timerStillPresent = HashBasedTable.create(); private Instant inputDataWatermark; private Instant processingTime; @@ -239,6 +241,17 @@ public boolean hasTimerBefore(Instant time) { return false; } + public List getCurrentTimers() { + List timerDataList = new ArrayList<>(); + for (Cell cell : timerStillPresent.cellSet()) { + TimerData timerData = timers.get(cell.getRowKey(), cell.getColumnKey()); + if (cell.getValue()) { + timerDataList.add(timerData); + } + } + return timerDataList; + } + private boolean needsWatermarkHold(TimerData timerData) { // If it is a user timer or a system timer with outputTimestamp different than timestamp return WindmillNamespacePrefix.USER_NAMESPACE_PREFIX.equals(prefix) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 412380fa4098..ac04c9ca45ce 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -139,7 +139,6 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets; import org.hamcrest.Matchers; @@ -153,8 +152,6 @@ import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** Tests for ParDo. */ public class ParDoTest implements Serializable { @@ -3993,7 +3990,6 @@ private void testEventTimeTimerOrderingWithInputPTransform( final String bag = "bag"; final String minTimestamp = "minTs"; final Instant gcTimerStamp = now.plus(numTestElements + 1); - Logger log = LoggerFactory.getLogger(ParDoTest.class); DoFn, String> fn = new DoFn, String>() { @@ -4024,7 +4020,6 @@ public void processElement( if (currentMinStamp.equals(BoundedWindow.TIMESTAMP_MAX_VALUE)) { gcTimer.set(gcTimerStamp); } - if (currentMinStamp.isAfter(context.timestamp())) { minStampState.write(context.timestamp()); bagTimer.set(context.timestamp()); @@ -4038,21 +4033,18 @@ public void onTimer( @TimerId(timerIdBagAppend) Timer timer, @StateId(bag) BagState> bagState) { - log.info("OntimerAppend: timestamp: " + context.timestamp()); List> flush = new ArrayList<>(); Instant flushTime = context.timestamp(); - int bagSize = Iterators.size(bagState.read().iterator()); for (TimestampedValue val : bagState.read()) { if (!val.getTimestamp().isAfter(flushTime)) { flush.add(val); } } flush.sort(Comparator.comparing(TimestampedValue::getTimestamp)); - log.info("flush list size: " + flush.size()); context.output( Joiner.on(":").join(flush.stream().map(TimestampedValue::getValue).iterator())); Instant newMinStamp = flushTime.plus(1); - if (flush.size() < numTestElements && bagSize > 0) { + if (flush.size() < numTestElements) { timer.set(newMinStamp); } } @@ -4061,7 +4053,6 @@ public void onTimer( public void onTimer( OnTimerContext context, @StateId(bag) BagState> bagState) { - log.info("OntimerGC: timestamp: " + context.timestamp()); String output = Joiner.on(":") .join( @@ -4158,7 +4149,7 @@ public void duplicateTimerSetting() { }) public void testTwoTimersSettingEachOther() { Instant now = new Instant(1500000000000L); - Instant end = now.plus(3); + Instant end = now.plus(100); TestStream> input = TestStream.create(KvCoder.of(VoidCoder.of(), VoidCoder.of())) .addElements(KV.of(null, null)) @@ -4171,7 +4162,7 @@ public void testTwoTimersSettingEachOther() { @Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesStrictTimerOrdering.class}) public void testTwoTimersSettingEachOtherWithCreateAsInputBounded() { Instant now = new Instant(1500000000000L); - Instant end = now.plus(3); + Instant end = now.plus(100); pipeline.apply(TwoTimerTest.of(now, end, Create.of(KV.of(null, null)), false)); pipeline.run(); } @@ -4180,7 +4171,7 @@ public void testTwoTimersSettingEachOtherWithCreateAsInputBounded() { @Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesStrictTimerOrdering.class}) public void testTwoTimersSettingEachOtherWithCreateAsInputUnbounded() { Instant now = new Instant(1500000000000L); - Instant end = now.plus(3); + Instant end = now.plus(100); pipeline.apply(TwoTimerTest.of(now, end, Create.of(KV.of(null, null)), true)); pipeline.run(); } @@ -4355,7 +4346,10 @@ public void onTimer( private static class TwoTimerTest extends PTransform { private static PTransform of( - Instant start, Instant end, PTransform>> input, boolean isStreaming) { + Instant start, + Instant end, + PTransform>> input, + boolean isStreaming) { return new TwoTimerTest(start, end, input, isStreaming); } @@ -4365,7 +4359,10 @@ private static PTransform of( private final transient PTransform>> inputPTransform; public TwoTimerTest( - Instant start, Instant end, PTransform>> input, boolean isStreaming) { + Instant start, + Instant end, + PTransform>> input, + boolean isStreaming) { this.start = start; this.end = end; this.isStreaming = isStreaming; @@ -4378,7 +4375,6 @@ public PDone expand(PBegin input) { final String timerName1 = "t1"; final String timerName2 = "t2"; final String countStateName = "count"; - final Logger log = LoggerFactory.getLogger(ParDoTest.class); PCollection result = input .apply(inputPTransform) @@ -4419,12 +4415,6 @@ public void onTimer1( Integer current = state.read(); t2.set(context.timestamp()); - log.info( - "t1:" - + current - + ":" - + context.timestamp().minus(start.getMillis()).getMillis()); - context.output( "t1:" + current @@ -4444,12 +4434,6 @@ public void onTimer2( } else { state.write(-1); } - - log.info( - "t2:" - + current - + ":" - + context.timestamp().minus(start.getMillis()).getMillis()); context.output( "t2:" + current @@ -4459,7 +4443,7 @@ public void onTimer2( })); List expected = - LongStream.rangeClosed(0, 3) + LongStream.rangeClosed(0, end.minus(start.getMillis()).getMillis()) .mapToObj(e -> (Long) e) .flatMap(e -> Arrays.asList("t1:" + e + ":" + e, "t2:" + e + ":" + e).stream()) .collect(Collectors.toList()); From abd93ecde23701e0d0c63bfaaf10785ca0be332a Mon Sep 17 00:00:00 2001 From: Rehman Date: Wed, 10 Jun 2020 21:14:41 +0500 Subject: [PATCH 5/6] adding comments --- .../runners/dataflow/worker/StreamingModeExecutionContext.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index 8097547c3b86..90263f5d6e3a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -556,6 +556,7 @@ public TimerData getNextFiredTimer(Coder windowCode private PriorityQueue toBeFiredTimersOrdered = null; + // to track if timer is reset earlier mid-bundle. private Map firedTimer = new HashMap<>(); public TimerData getNextFiredUserTimer(Coder windowCoder) { @@ -595,6 +596,7 @@ public TimerData getNextFiredUserTimer(Coder window TimerData nextTimer = null; + // fire timer only if its timestamp matched. Else it is either reset or obsolete. while (!toBeFiredTimersOrdered.isEmpty()) { nextTimer = toBeFiredTimersOrdered.poll(); String timerUniqueId = nextTimer.getTimerId() + '+' + nextTimer.getTimerFamilyId(); From 10487f0a4097bc51a55bf26256fed61f7ae25532 Mon Sep 17 00:00:00 2001 From: Rehman Date: Wed, 29 Jul 2020 18:49:22 +0500 Subject: [PATCH 6/6] Replace isStreaming with isBounded --- .../worker/StreamingModeExecutionContext.java | 5 +++ .../worker/WindmillTimerInternals.java | 1 - .../apache/beam/sdk/transforms/ParDoTest.java | 33 +++++++++---------- 3 files changed, 20 insertions(+), 19 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index 83361f6ff935..334f145d5ede 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -24,10 +24,12 @@ import com.google.api.services.dataflow.model.SideInputInfo; import java.io.Closeable; import java.io.IOException; +import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.PriorityQueue; import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicLong; @@ -562,6 +564,8 @@ public TimerData getNextFiredTimer(Coder windowCode private PriorityQueue toBeFiredTimersOrdered = null; // to track if timer is reset earlier mid-bundle. + // Map of timer's id to timer's firing time to check + // the actual firing time of a timer. private Map firedTimer = new HashMap<>(); public TimerData getNextFiredUserTimer(Coder windowCoder) { @@ -588,6 +592,7 @@ public TimerData getNextFiredUserTimer(Coder window } Instant currentInputWatermark = userTimerInternals.currentInputWatermarkTime(); + if (userTimerInternals.hasTimerBefore(currentInputWatermark)) { List currentTimers = userTimerInternals.getCurrentTimers(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java index aeebf5754b67..5269cf29ea67 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java @@ -24,7 +24,6 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; -import javax.annotation.Nullable; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.TimerInternals; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 6823cfadcdf1..776fd3e26218 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -3959,7 +3959,7 @@ public void testEventTimeTimerOrdering() throws Exception { } testEventTimeTimerOrderingWithInputPTransform( - now, numTestElements, builder.advanceWatermarkToInfinity(), false); + now, numTestElements, builder.advanceWatermarkToInfinity(), IsBounded.BOUNDED); } /** A test makes sure that an event time timers are correctly ordered using Create transform. */ @@ -3980,7 +3980,7 @@ public void testEventTimeTimerOrderingWithCreateBounded() throws Exception { } testEventTimeTimerOrderingWithInputPTransform( - now, numTestElements, Create.timestamped(elements), false); + now, numTestElements, Create.timestamped(elements), IsBounded.BOUNDED); } /** @@ -4001,18 +4001,18 @@ public void testEventTimeTimerOrderingWithCreateUnbounded() throws Exception { List>> elements = new ArrayList<>(); for (int i = 0; i < numTestElements; i++) { - elements.add(TimestampedValue.of(KV.of("dummy", "" + i), now.plus(i))); + elements.add(TimestampedValue.of(KV.of("dummy", "" + i), now.plus(i * 1000))); } testEventTimeTimerOrderingWithInputPTransform( - now, numTestElements, Create.timestamped(elements), true); + now, numTestElements, Create.timestamped(elements), IsBounded.UNBOUNDED); } private void testEventTimeTimerOrderingWithInputPTransform( Instant now, int numTestElements, PTransform>> transform, - boolean isStreaming) + IsBounded isBounded) throws Exception { final String timerIdBagAppend = "append"; @@ -4097,10 +4097,7 @@ public void onTimer( }; PCollection output = - pipeline - .apply(transform) - .setIsBoundedInternal(isStreaming ? IsBounded.UNBOUNDED : IsBounded.BOUNDED) - .apply(ParDo.of(fn)); + pipeline.apply(transform).setIsBoundedInternal(isBounded).apply(ParDo.of(fn)); List expected = IntStream.rangeClosed(0, numTestElements) .mapToObj(expandFn(numTestElements)) @@ -4184,7 +4181,7 @@ public void testTwoTimersSettingEachOther() { TestStream.create(KvCoder.of(VoidCoder.of(), VoidCoder.of())) .addElements(KV.of(null, null)) .advanceWatermarkToInfinity(); - pipeline.apply(TwoTimerTest.of(now, end, input, false)); + pipeline.apply(TwoTimerTest.of(now, end, input, IsBounded.BOUNDED)); pipeline.run(); } @@ -4193,7 +4190,7 @@ public void testTwoTimersSettingEachOther() { public void testTwoTimersSettingEachOtherWithCreateAsInputBounded() { Instant now = new Instant(1500000000000L); Instant end = now.plus(100); - pipeline.apply(TwoTimerTest.of(now, end, Create.of(KV.of(null, null)), false)); + pipeline.apply(TwoTimerTest.of(now, end, Create.of(KV.of(null, null)), IsBounded.BOUNDED)); pipeline.run(); } @@ -4202,7 +4199,7 @@ public void testTwoTimersSettingEachOtherWithCreateAsInputBounded() { public void testTwoTimersSettingEachOtherWithCreateAsInputUnbounded() { Instant now = new Instant(1500000000000L); Instant end = now.plus(100); - pipeline.apply(TwoTimerTest.of(now, end, Create.of(KV.of(null, null)), true)); + pipeline.apply(TwoTimerTest.of(now, end, Create.of(KV.of(null, null)), IsBounded.UNBOUNDED)); pipeline.run(); } @@ -4379,23 +4376,23 @@ private static PTransform of( Instant start, Instant end, PTransform>> input, - boolean isStreaming) { - return new TwoTimerTest(start, end, input, isStreaming); + IsBounded isBounded) { + return new TwoTimerTest(start, end, input, isBounded); } private final Instant start; private final Instant end; - private final boolean isStreaming; + private final IsBounded isBounded; private final transient PTransform>> inputPTransform; public TwoTimerTest( Instant start, Instant end, PTransform>> input, - boolean isStreaming) { + IsBounded isBounded) { this.start = start; this.end = end; - this.isStreaming = isStreaming; + this.isBounded = isBounded; this.inputPTransform = input; } @@ -4408,7 +4405,7 @@ public PDone expand(PBegin input) { PCollection result = input .apply(inputPTransform) - .setIsBoundedInternal(isStreaming ? IsBounded.UNBOUNDED : IsBounded.BOUNDED) + .setIsBoundedInternal(isBounded) .apply( ParDo.of( new DoFn, String>() {