Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run!",
"modification": 1,
"modification": 2,
}
Original file line number Diff line number Diff line change
Expand Up @@ -1186,6 +1186,10 @@ void stop() {
}

private void onCompleteCommit(CompleteCommit completeCommit) {
if (completeCommit.status() == Windmill.CommitStatus.OK
&& !completeCommit.finalizeIds().isEmpty()) {
streamingWorkScheduler.queueAppliedFinalizeIds(completeCommit.finalizeIds());
}
if (completeCommit.status() != Windmill.CommitStatus.OK) {
readerCache.invalidateReader(
WindmillComputationKey.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitStatus;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;

/**
* A {@link Commit} is marked as complete when it has been attempted to be committed back to
Expand All @@ -38,19 +39,24 @@
public abstract class CompleteCommit {

public static CompleteCommit create(Commit commit, CommitStatus commitStatus) {
return new AutoValue_CompleteCommit(
return create(
commit.computationId(),
ShardedKey.create(commit.request().getKey(), commit.request().getShardingKey()),
WorkId.builder()
.setWorkToken(commit.request().getWorkToken())
.setCacheToken(commit.request().getCacheToken())
.build(),
commitStatus);
commitStatus,
ImmutableList.copyOf(commit.request().getFinalizeIdsList()));
}

public static CompleteCommit create(
String computationId, ShardedKey shardedKey, WorkId workId, CommitStatus status) {
return new AutoValue_CompleteCommit(computationId, shardedKey, workId, status);
String computationId,
ShardedKey shardedKey,
WorkId workId,
CommitStatus status,
ImmutableList<Long> finalizeIds) {
return new AutoValue_CompleteCommit(computationId, shardedKey, workId, status, finalizeIds);
}

public static CompleteCommit forFailedWork(Commit commit) {
Expand All @@ -64,4 +70,6 @@ public static CompleteCommit forFailedWork(Commit commit) {
public abstract WorkId workId();

public abstract CommitStatus status();

public abstract ImmutableList<Long> finalizeIds();
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitWorkRequest;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -155,7 +156,8 @@ private void completeWork(
.setCacheToken(workRequest.getCacheToken())
.setWorkToken(workRequest.getWorkToken())
.build(),
Windmill.CommitStatus.OK));
Windmill.CommitStatus.OK,
ImmutableList.copyOf(workRequest.getFinalizeIdsList())));
}
}
}
Expand Down
Loading