Skip to content

Commit 782a78b

Browse files
[CherryPick #30229] [Dataflow Streaming] Invalidate caches and remove work on failure before commit (#30234)
* Invalidate caches and remove work on failure before commit
* Prevent completeWorkAndScheduleNextWorkForKey from throwing

---------

Co-authored-by: Arun Pandian <pandiana@google.com>
1 parent c726525 commit 782a78b

File tree

2 files changed

+18
-14
lines changed

2 files changed

+18
-14
lines changed

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1385,12 +1385,21 @@ private void commitLoop() {
13851385
// Adds the commit to the commitStream if it fits, returning true iff it is consumed.
13861386
private boolean addCommitToStream(Commit commit, CommitWorkStream commitStream) {
13871387
Preconditions.checkNotNull(commit);
1388+
final ComputationState state = commit.computationState();
1389+
final Windmill.WorkItemCommitRequest request = commit.request();
13881390
// Drop commits for failed work. Such commits will be dropped by Windmill anyway.
13891391
if (commit.work().isFailed()) {
1392+
readerCache.invalidateReader(
1393+
WindmillComputationKey.create(
1394+
state.getComputationId(), request.getKey(), request.getShardingKey()));
1395+
stateCache
1396+
.forComputation(state.getComputationId())
1397+
.invalidate(request.getKey(), request.getShardingKey());
1398+
state.completeWorkAndScheduleNextWorkForKey(
1399+
ShardedKey.create(request.getKey(), request.getShardingKey()), request.getWorkToken());
13901400
return true;
13911401
}
1392-
final ComputationState state = commit.computationState();
1393-
final Windmill.WorkItemCommitRequest request = commit.request();
1402+
13941403
final int size = commit.getSize();
13951404
commit.work().setState(Work.State.COMMITTING);
13961405
activeCommitBytes.addAndGet(size);
@@ -1407,8 +1416,6 @@ private boolean addCommitToStream(Commit commit, CommitWorkStream commitStream)
14071416
.invalidate(request.getKey(), request.getShardingKey());
14081417
}
14091418
activeCommitBytes.addAndGet(-size);
1410-
// This may throw an exception if the commit was not active, which is possible if it
1411-
// was deemed stuck.
14121419
state.completeWorkAndScheduleNextWorkForKey(
14131420
ShardedKey.create(request.getKey(), request.getShardingKey()),
14141421
request.getWorkToken());

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -188,16 +188,13 @@ synchronized Optional<Work> completeWorkAndGetNextWorkForKey(
188188

189189
private synchronized void removeCompletedWorkFromQueue(
190190
Queue<Work> workQueue, ShardedKey shardedKey, long workToken) {
191-
// avoid Preconditions.checkState here to prevent eagerly evaluating the
192-
// format string parameters for the error message.
193-
Work completedWork =
194-
Optional.ofNullable(workQueue.peek())
195-
.orElseThrow(
196-
() ->
197-
new IllegalStateException(
198-
String.format(
199-
"Active key %s without work, expected token %d",
200-
shardedKey, workToken)));
191+
Work completedWork = workQueue.peek();
192+
if (completedWork == null) {
193+
// Work may have been completed due to clearing of stuck commits.
194+
LOG.warn(
195+
String.format("Active key %s without work, expected token %d", shardedKey, workToken));
196+
return;
197+
}
201198

202199
if (completedWork.getWorkItem().getWorkToken() != workToken) {
203200
// Work may have been completed due to clearing of stuck commits.

0 commit comments

Comments
 (0)