diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json index e623d3373a93..50d17c108f2e 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run!", - "modification": 1, + "modification": 2, } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 9e82343474c6..48d5e0ecf1f1 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -1186,6 +1186,10 @@ void stop() { } private void onCompleteCommit(CompleteCommit completeCommit) { + if (completeCommit.status() == Windmill.CommitStatus.OK + && !completeCommit.finalizeIds().isEmpty()) { + streamingWorkScheduler.queueAppliedFinalizeIds(completeCommit.finalizeIds()); + } if (completeCommit.status() != Windmill.CommitStatus.OK) { readerCache.invalidateReader( WindmillComputationKey.create( diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/CompleteCommit.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/CompleteCommit.java index e33e853d3d76..cd2ec76df0d2 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/CompleteCommit.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/CompleteCommit.java @@ -24,6 +24,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitStatus; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.StreamObserver; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; /** * A {@link Commit} is marked as complete when it has been attempted to be committed back to @@ -38,19 +39,24 @@ public abstract class CompleteCommit { public static CompleteCommit create(Commit commit, CommitStatus commitStatus) { - return new AutoValue_CompleteCommit( + return create( commit.computationId(), ShardedKey.create(commit.request().getKey(), commit.request().getShardingKey()), WorkId.builder() .setWorkToken(commit.request().getWorkToken()) .setCacheToken(commit.request().getCacheToken()) .build(), - commitStatus); + commitStatus, + ImmutableList.copyOf(commit.request().getFinalizeIdsList())); } public static CompleteCommit create( - String computationId, ShardedKey shardedKey, WorkId workId, CommitStatus status) { - return new AutoValue_CompleteCommit(computationId, shardedKey, workId, status); + String computationId, + ShardedKey shardedKey, + WorkId workId, + CommitStatus status, + ImmutableList finalizeIds) { + return new AutoValue_CompleteCommit(computationId, shardedKey, workId, status, finalizeIds); } public static CompleteCommit forFailedWork(Commit commit) { @@ -64,4 +70,6 @@ public static CompleteCommit forFailedWork(Commit commit) { public abstract WorkId workId(); public abstract CommitStatus status(); + + public abstract ImmutableList finalizeIds(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitter.java index 20b95b0661d0..fe4fc7ebaaa7 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitter.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitter.java @@ -32,6 +32,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitWorkRequest; import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -155,7 +156,8 @@ private void completeWork( .setCacheToken(workRequest.getCacheToken()) .setWorkToken(workRequest.getWorkToken()) .build(), - Windmill.CommitStatus.OK)); + Windmill.CommitStatus.OK, + ImmutableList.copyOf(workRequest.getFinalizeIdsList()))); } } }