Skip to content

Commit c1a4516

Browse files
committed
Revert Skip BoundedTrie on Dataflow till service is have BoundedTrie apache#33921
1 parent 56409b2 commit c1a4516

File tree

4 files changed

+5
-25
lines changed

4 files changed

+5
-25
lines changed

runners/google-cloud-dataflow-java/build.gradle

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,6 @@ def commonLegacyExcludeCategories = [
193193
'org.apache.beam.sdk.testing.UsesParDoLifecycle', // doesn't support remote runner
194194
'org.apache.beam.sdk.testing.UsesMetricsPusher',
195195
'org.apache.beam.sdk.testing.UsesBundleFinalizer',
196-
'org.apache.beam.sdk.testing.UsesBoundedTrieMetrics',
197196
]
198197

199198
def commonRunnerV2ExcludeCategories = [
@@ -207,7 +206,6 @@ def commonRunnerV2ExcludeCategories = [
207206
'org.apache.beam.sdk.testing.UsesTestStream',
208207
'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime',
209208
'org.apache.beam.sdk.testing.UsesRequiresTimeSortedInput',
210-
'org.apache.beam.sdk.testing.UsesBoundedTrieMetrics',
211209
]
212210

213211
// For the following test tasks using legacy worker, set workerHarnessContainerImage to empty to

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import com.google.api.services.dataflow.model.CounterUpdate;
2121
import com.google.api.services.dataflow.model.SideInputInfo;
22-
import java.util.Collections;
2322
import java.util.Objects;
2423
import java.util.concurrent.TimeUnit;
2524
import org.apache.beam.runners.core.InMemoryStateInternals;
@@ -78,18 +77,14 @@ public class BatchModeExecutionContext
7877
protected static final String BIGQUERY_READ_THROTTLE_TIME_NAMESPACE =
7978
"org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$StorageClientImpl";
8079

81-
// TODO(BEAM-33720): Remove once Dataflow legacy runner supports BoundedTries.
82-
private final boolean populateBoundedTrieMetrics;
83-
8480
private BatchModeExecutionContext(
8581
CounterFactory counterFactory,
8682
Cache<?, WeightedValue<?>> dataCache,
8783
Cache<?, ?> logicalReferenceCache,
8884
ReaderFactory readerFactory,
8985
PipelineOptions options,
9086
DataflowExecutionStateTracker executionStateTracker,
91-
DataflowExecutionStateRegistry executionStateRegistry,
92-
boolean populateBoundedTrieMetrics) {
87+
DataflowExecutionStateRegistry executionStateRegistry) {
9388
super(
9489
counterFactory,
9590
createMetricsContainerRegistry(),
@@ -102,7 +97,6 @@ private BatchModeExecutionContext(
10297
this.dataCache = dataCache;
10398
this.containerRegistry =
10499
(MetricsContainerRegistry<MetricsContainerImpl>) getMetricsContainerRegistry();
105-
this.populateBoundedTrieMetrics = populateBoundedTrieMetrics;
106100
}
107101

108102
private static MetricsContainerRegistry<MetricsContainerImpl> createMetricsContainerRegistry() {
@@ -138,8 +132,7 @@ public static BatchModeExecutionContext forTesting(
138132
counterFactory,
139133
options,
140134
"test-work-item-id"),
141-
stateRegistry,
142-
true);
135+
stateRegistry);
143136
}
144137

145138
public static BatchModeExecutionContext forTesting(PipelineOptions options, String stageName) {
@@ -252,8 +245,7 @@ public static BatchModeExecutionContext create(
252245
counterFactory,
253246
options,
254247
workItemId),
255-
executionStateRegistry,
256-
false);
248+
executionStateRegistry);
257249
}
258250

259251
/** Create a new {@link StepContext}. */
@@ -528,10 +520,7 @@ public Iterable<CounterUpdate> extractMetricUpdates(boolean isFinalUpdate) {
528520
update ->
529521
MetricsToCounterUpdateConverter.fromStringSet(
530522
update.getKey(), true, update.getUpdate())),
531-
FluentIterable.from(
532-
populateBoundedTrieMetrics
533-
? updates.boundedTrieUpdates()
534-
: Collections.emptyList())
523+
FluentIterable.from(updates.boundedTrieUpdates())
535524
.transform(
536525
update ->
537526
MetricsToCounterUpdateConverter.fromBoundedTrie(

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import java.time.Clock;
2424
import java.time.Duration;
2525
import java.time.Instant;
26-
import java.util.Collections;
2726
import java.util.HashSet;
2827
import java.util.Map;
2928
import java.util.Map.Entry;
@@ -104,9 +103,6 @@ public class StreamingStepMetricsContainer implements MetricsContainer {
104103

105104
private final Clock clock;
106105

107-
// TODO(BEAM-33720): Remove once Dataflow legacy runner supports BoundedTries.
108-
@VisibleForTesting boolean populateBoundedTrieMetrics;
109-
110106
private StreamingStepMetricsContainer(String stepName) {
111107
this.stepName = stepName;
112108
this.perWorkerCountersByFirstStaleTime = new ConcurrentHashMap<>();
@@ -217,7 +213,7 @@ public Iterable<CounterUpdate> extractUpdates() {
217213
.append(distributionUpdates())
218214
.append(gaugeUpdates())
219215
.append(stringSetUpdates())
220-
.append(populateBoundedTrieMetrics ? boundedTrieUpdates() : Collections.emptyList());
216+
.append(boundedTrieUpdates());
221217
}
222218

223219
private FluentIterable<CounterUpdate> counterUpdates() {

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,6 @@ public void testBoundedTrieUpdateExtraction() {
371371
.setBoundedTrie(
372372
MetricsToCounterUpdateConverter.getBoundedTrie(expectedName1.toProto()));
373373

374-
((StreamingStepMetricsContainer) c1).populateBoundedTrieMetrics = true;
375374
Iterable<CounterUpdate> updates = StreamingStepMetricsContainer.extractMetricUpdates(registry);
376375
assertThat(updates, containsInAnyOrder(name1Update));
377376

@@ -400,7 +399,6 @@ public void testBoundedTrieUpdateExtraction() {
400399
.setBoundedTrie(
401400
MetricsToCounterUpdateConverter.getBoundedTrie(expectedName2.toProto()));
402401

403-
((StreamingStepMetricsContainer) c2).populateBoundedTrieMetrics = true;
404402
updates = StreamingStepMetricsContainer.extractMetricUpdates(registry);
405403
assertThat(updates, containsInAnyOrder(name2Update));
406404

@@ -412,7 +410,6 @@ public void testBoundedTrieUpdateExtraction() {
412410
name1Update.setBoundedTrie(
413411
MetricsToCounterUpdateConverter.getBoundedTrie(expectedName1.toProto()));
414412

415-
((StreamingStepMetricsContainer) c1).populateBoundedTrieMetrics = true;
416413
updates = StreamingStepMetricsContainer.extractMetricUpdates(registry);
417414
assertThat(updates, containsInAnyOrder(name1Update));
418415
}

0 commit comments

Comments
 (0)