Commit 1c974aa

lostluck and clmccart authored
[Cherry Pick #29979] Fix race condition in Dataflow sampler. (#30144)
* make getProcessingDistributionsForWorkId synchronized to avoid race conditions with remove tracker
* correctly access the concurrent hashmap to avoid race conditions

---------

Co-authored-by: Claire McCarthy <clairemccarthy@google.com>
1 parent d1d3457 commit 1c974aa

1 file changed: +3 -6 lines changed

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSampler.java

Lines changed: 3 additions & 6 deletions
@@ -118,13 +118,10 @@ public Optional<ActiveMessageMetadata> getActiveMessageMetadataForWorkId(String
   }
 
   public Map<String, IntSummaryStatistics> getProcessingDistributionsForWorkId(String workId) {
-    if (!activeTrackersByWorkId.containsKey(workId)) {
-      if (completedProcessingMetrics.containsKey(workId)) {
-        return completedProcessingMetrics.get(workId);
-      }
-      return new HashMap<>();
-    }
     DataflowExecutionStateTracker tracker = activeTrackersByWorkId.get(workId);
+    if (tracker == null) {
+      return completedProcessingMetrics.getOrDefault(workId, new HashMap<>());
+    }
     return mergeStepStatsMaps(
         completedProcessingMetrics.getOrDefault(workId, new HashMap<>()),
         tracker.getProcessingTimesByStepCopy());
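
The removed lines follow a check-then-act pattern: `containsKey` and `get` are two separate operations on the map, so another thread can remove the tracker between them, leaving `tracker` null at the final `tracker.getProcessingTimesByStepCopy()` call. The sketch below reproduces that window deterministically. It is only an illustration, not Beam code: the class `SingleLookupSketch`, its two maps, and the `otherThread` hook that stands in for a concurrent removal are all hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the sampler; none of these names are Beam APIs.
class SingleLookupSketch {
  static final Map<String, Map<String, Integer>> activeByWorkId = new ConcurrentHashMap<>();
  static final Map<String, Map<String, Integer>> completedByWorkId = new ConcurrentHashMap<>();

  // Check-then-act: the entry can vanish between containsKey() and get().
  // The Runnable simulates what another thread might do inside that window.
  static Map<String, Integer> checkThenGet(String workId, Runnable otherThread) {
    if (!activeByWorkId.containsKey(workId)) {
      return completedByWorkId.getOrDefault(workId, new HashMap<>());
    }
    otherThread.run();
    Map<String, Integer> active = activeByWorkId.get(workId); // may now be null
    return new HashMap<>(active); // throws NullPointerException if active is null
  }

  // Single lookup: one get(), then branch on the local reference.
  static Map<String, Integer> singleGet(String workId, Runnable otherThread) {
    Map<String, Integer> active = activeByWorkId.get(workId);
    otherThread.run(); // a removal here cannot invalidate the local reference
    if (active == null) {
      return completedByWorkId.getOrDefault(workId, new HashMap<>());
    }
    return new HashMap<>(active);
  }

  public static void main(String[] args) {
    activeByWorkId.put("w1", Map.of("step", 5));
    try {
      checkThenGet("w1", () -> activeByWorkId.remove("w1"));
      System.out.println("checkThenGet: no failure");
    } catch (NullPointerException e) {
      System.out.println("checkThenGet: NullPointerException after concurrent removal");
    }
    activeByWorkId.put("w2", Map.of("step", 7));
    System.out.println("singleGet: " + singleGet("w2", () -> activeByWorkId.remove("w2")));
  }
}
```

Reading the map once and branching on the local variable makes the null check and the later use refer to the same value, which is the shape the added lines in this commit take.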
