From fda75e0b1523f4aea91518e95a078494c02ee69e Mon Sep 17 00:00:00 2001 From: Andrew Crites Date: Thu, 26 Feb 2026 14:24:09 -0800 Subject: [PATCH 01/14] Adds bundleFinalizer support to non-portable worker. --- .../beam/runners/core/SimpleDoFnRunner.java | 16 +- .../worker/StreamingModeExecutionContext.java | 79 ++++++++- .../grpc/GetWorkResponseChunkAssembler.java | 13 +- .../processing/StreamingCommitFinalizer.java | 157 ++++++++++++++---- .../processing/StreamingWorkScheduler.java | 1 + .../dataflow/worker/SimpleParDoFnTest.java | 120 +++++++++++++ .../StreamingCommitFinalizerTest.java | 147 ++++++++++++++++ .../windmill/src/main/proto/windmill.proto | 17 +- 8 files changed, 500 insertions(+), 50 deletions(-) create mode 100644 runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 0fd63556b9c7..e2bf0f3c18b9 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -310,9 +310,14 @@ public DoFn.StartBundleContext startBundleContext(DoFn { /** A concrete implementation of {@link DoFn.FinishBundleContext}. 
*/ @@ -355,6 +360,11 @@ public DoFn.FinishBundleContext finishBundleContext( public String getErrorContext() { return "SimpleDoFnRunner/FinishBundle"; } + + @Override + public BundleFinalizer bundleFinalizer() { + return stepContext.bundleFinalizer(); + } } /** @@ -1005,7 +1015,7 @@ public void outputWindowedValue( @Override public BundleFinalizer bundleFinalizer() { throw new UnsupportedOperationException( - "Bundle finalization is not supported in non-portable pipelines."); + "Bundle finalization is not supported in OnTimer calls."); } } @@ -1259,7 +1269,7 @@ public void outputWindowedValue( @Override public BundleFinalizer bundleFinalizer() { throw new UnsupportedOperationException( - "Bundle finalization is not supported in non-portable pipelines."); + "Bundle finalization is not supported in OnWindowExpiration calls."); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index 09afcadc3002..6c2d786e01c7 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -24,6 +24,7 @@ import com.google.api.services.dataflow.model.SideInputInfo; import java.io.Closeable; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; @@ -71,6 +72,7 @@ import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; import org.apache.beam.sdk.metrics.MetricsContainer; import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import 
org.apache.beam.sdk.util.ByteStringOutputStream; import org.apache.beam.sdk.values.PCollectionView; @@ -86,6 +88,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.PeekingIterator; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Table; +import org.apache.commons.lang3.tuple.Pair; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.joda.time.Instant; @@ -444,11 +447,29 @@ public void invalidateCache() { } } - public Map flushState() { - Map callbacks = new HashMap<>(); + public Map> flushState() { + Map> callbacks = new HashMap<>(); + List> bundleFinalizers = new ArrayList<>(); for (StepContext stepContext : getAllStepContexts()) { stepContext.flushState(); + bundleFinalizers.addAll(stepContext.getBundleFinalizerCallbacks()); + stepContext.clearBundleFinalizerCallbacks(); + } + for (Pair bundleFinalizer : bundleFinalizers) { + long id = ThreadLocalRandom.current().nextLong(); + callbacks.put( + id, + Pair.of( + bundleFinalizer.getLeft(), + () -> { + try { + bundleFinalizer.getRight().onBundleSuccess(); + } catch (Exception e) { + throw new RuntimeException("Exception while running bundle finalizer", e); + } + })); + outputBuilder.addFinalizeIds(id); } if (activeReader != null) { @@ -460,13 +481,15 @@ public Map flushState() { sourceStateBuilder.addFinalizeIds(id); callbacks.put( id, - () -> { - try { - checkpointMark.finalizeCheckpoint(); - } catch (IOException e) { - throw new RuntimeException("Exception while finalizing checkpoint", e); - } - }); + Pair.of( + Instant.now().plus(Duration.standardMinutes(5)), + () -> { + try { + checkpointMark.finalizeCheckpoint(); + } catch (IOException e) { + throw new RuntimeException("Exception while finalizing checkpoint", e); + } + })); @SuppressWarnings("unchecked") Coder checkpointCoder = @@ -697,6 +720,11 @@ public void setStateCleanupTimer( 
public DataflowStepContext namespacedToUser() { return this; } + + @Override + public BundleFinalizer bundleFinalizer() { + return wrapped.bundleFinalizer(); + } } /** A {@link SideInputReader} that fetches side inputs from the streaming worker's cache. */ @@ -769,6 +797,7 @@ class StepContext extends DataflowExecutionContext.DataflowStepContext // A list of timer keys that were modified by user processing earlier in this bundle. This // serves a tombstone, so that we know not to fire any bundle timers that were modified. private Table modifiedUserTimerKeys = null; + private final WindmillBundleFinalizer bundleFinalizer = new WindmillBundleFinalizer(); public StepContext(DataflowOperationContext operationContext) { super(operationContext.nameContext()); @@ -1044,9 +1073,41 @@ public TimerInternals timerInternals() { return checkNotNull(systemTimerInternals); } + @Override + public BundleFinalizer bundleFinalizer() { + return bundleFinalizer; + } + public TimerInternals userTimerInternals() { ensureStateful("Tried to access user timers"); return checkNotNull(userTimerInternals); } + + public List> getBundleFinalizerCallbacks() { + return bundleFinalizer.getCallbacks(); + } + + public void clearBundleFinalizerCallbacks() { + bundleFinalizer.clearCallbacks(); + } + } + + private static class WindmillBundleFinalizer implements BundleFinalizer { + private List> callbacks = new ArrayList<>(); + + private WindmillBundleFinalizer() {} + + private List> getCallbacks() { + return callbacks; + } + + private void clearCallbacks() { + callbacks.clear(); + } + + @Override + public void afterBundleCommit(Instant callbackExpiry, Callback callback) { + callbacks.add(Pair.of(callbackExpiry, callback)); + } } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java index 3608bd1ccacd..2fb70ad0111b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java @@ -54,6 +54,7 @@ final class GetWorkResponseChunkAssembler { private final WorkItem.Builder workItemBuilder; // Reused to reduce GC overhead. private ByteString data; private long bufferedSize; + private final List appliedFinalizeIds; GetWorkResponseChunkAssembler() { workTimingInfosTracker = new GetWorkTimingInfosTracker(System::currentTimeMillis); @@ -61,6 +62,7 @@ final class GetWorkResponseChunkAssembler { bufferedSize = 0; metadata = null; workItemBuilder = WorkItem.newBuilder(); + appliedFinalizeIds = new ArrayList<>(); } /** @@ -72,6 +74,7 @@ List append(Windmill.StreamingGetWorkResponseChunk chunk) { metadata = ComputationMetadata.fromProto(chunk.getComputationMetadata()); } workTimingInfosTracker.addTimingInfo(chunk.getPerWorkItemTimingInfosList()); + appliedFinalizeIds.addAll(chunk.getAppliedFinalizeIdsList()); List response = new ArrayList<>(); for (int i = 0; i < chunk.getSerializedWorkItemList().size(); i++) { @@ -90,7 +93,7 @@ List append(Windmill.StreamingGetWorkResponseChunk chunk) { } /** - * Attempt to flush the {@link #data} bytes into a {@link WorkItem} w/ it's metadata. Resets the + * Attempt to flush the {@link #data} bytes into a {@link WorkItem} w/ its metadata. Resets the * data byte string and tracking metadata afterwards, whether the {@link WorkItem} deserialization * was successful or not. 
*/ @@ -102,7 +105,8 @@ private Optional flushToWorkItem() { workItemBuilder.build(), Preconditions.checkNotNull(metadata), workTimingInfosTracker.getLatencyAttributions(), - bufferedSize)); + bufferedSize, + appliedFinalizeIds)); } catch (IOException e) { LOG.error("Failed to parse work item from stream: ", e); } finally { @@ -110,6 +114,7 @@ private Optional flushToWorkItem() { workTimingInfosTracker.reset(); data = ByteString.EMPTY; bufferedSize = 0; + appliedFinalizeIds.clear(); } return Optional.empty(); @@ -144,7 +149,9 @@ private static AssembledWorkItem create( WorkItem workItem, ComputationMetadata computationMetadata, ImmutableList latencyAttributions, - long size) { + long size, + List appliedFinalizeIds) { + workItem = workItem.toBuilder().addAllAppliedFinalizeIds(appliedFinalizeIds).build(); return new AutoValue_GetWorkResponseChunkAssembler_AssembledWorkItem( workItem, computationMetadata, latencyAttributions, size); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java index d663b4fca27a..6811e69d997a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java @@ -17,14 +17,24 @@ */ package org.apache.beam.runners.dataflow.worker.windmill.work.processing; -import java.time.Duration; +import static com.google.common.base.Preconditions.checkState; + +import com.google.auto.value.AutoValue; +import java.util.Comparator; +import java.util.HashMap; import java.util.Map; +import java.util.PriorityQueue; +import java.util.concurrent.TimeUnit; 
+import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; import org.apache.beam.sdk.annotations.Internal; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; +import org.apache.commons.lang3.tuple.Pair; +import org.joda.time.Duration; +import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,28 +42,99 @@ @Internal final class StreamingCommitFinalizer { private static final Logger LOG = LoggerFactory.getLogger(StreamingCommitFinalizer.class); - private static final Duration DEFAULT_CACHE_ENTRY_EXPIRY = Duration.ofMinutes(5L); - private final Cache commitFinalizerCache; - private final BoundedQueueExecutor finalizationExecutor; - - private StreamingCommitFinalizer( - Cache commitFinalizerCache, BoundedQueueExecutor finalizationExecutor) { - this.commitFinalizerCache = commitFinalizerCache; - this.finalizationExecutor = finalizationExecutor; + + /** A {@link Runnable} and expiry time pair. 
*/ + @AutoValue + public abstract static class FinalizationInfo { + public abstract Long getId(); + + public abstract Instant getExpiryTime(); + + public abstract Runnable getCallback(); + + public static FinalizationInfo create(Long id, Instant expiryTime, Runnable callback) { + return new AutoValue_StreamingCommitFinalizer_FinalizationInfo(id, expiryTime, callback); + } + } + + private final ReentrantLock lock = new ReentrantLock(); + private final Condition queueMinChanged = lock.newCondition(); + + @GuardedBy("lock") + private final HashMap commitFinalizationCallbacks = new HashMap<>(); + + @GuardedBy("lock") + private final PriorityQueue cleanUpQueue = + new PriorityQueue<>(11, Comparator.comparing(FinalizationInfo::getExpiryTime)); + + private StreamingCommitFinalizer(BoundedQueueExecutor finalizationCleanupExecutor) { + finalizationCleanupExecutor.execute(this::cleanupThreadBody, 0); + } + + private void cleanupThreadBody() { + lock.lock(); + try { + while (true) { + final @Nullable FinalizationInfo minValue = cleanUpQueue.peek(); + if (minValue == null) { + // Wait for an element to be added and loop to re-examine the min. + queueMinChanged.await(); + continue; + } + + Instant now = Instant.now(); + Duration timeDifference = new Duration(now, minValue.getExpiryTime()); + if (timeDifference.getMillis() < 0 + || (queueMinChanged.await(timeDifference.getMillis(), TimeUnit.MILLISECONDS) + && cleanUpQueue.peek() == minValue)) { + // The minimum element has an expiry time before now, either because it had elapsed when + // we pulled it or because we awaited it, and it is still the minimum. + checkState(minValue == cleanUpQueue.poll()); + checkState(commitFinalizationCallbacks.remove(minValue.getId()) == minValue); + } + } + } catch (InterruptedException e) { + // We're being shutdown. 
+ } finally { + lock.unlock(); + } } static StreamingCommitFinalizer create(BoundedQueueExecutor workExecutor) { - return new StreamingCommitFinalizer( - CacheBuilder.newBuilder().expireAfterWrite(DEFAULT_CACHE_ENTRY_EXPIRY).build(), - workExecutor); + return new StreamingCommitFinalizer(workExecutor); } /** * Stores a map of user worker generated finalization ids and callbacks to execute once a commit * has been successfully committed to the backing state store. */ - void cacheCommitFinalizers(Map commitCallbacks) { - commitFinalizerCache.putAll(commitCallbacks); + public void cacheCommitFinalizers(Map> callbacks) { + for (Map.Entry> entry : callbacks.entrySet()) { + Long finalizeId = entry.getKey(); + final FinalizationInfo info = + FinalizationInfo.create( + finalizeId, entry.getValue().getLeft(), entry.getValue().getRight()); + + lock.lock(); + try { + FinalizationInfo existingInfo = commitFinalizationCallbacks.put(finalizeId, info); + if (existingInfo != null) { + throw new IllegalStateException( + "Expected to not have any past callbacks for bundle " + + finalizeId + + " but had " + + existingInfo); + } + cleanUpQueue.add(info); + @SuppressWarnings("ReferenceEquality") + boolean newMin = cleanUpQueue.peek() == info; + if (newMin) { + queueMinChanged.signal(); + } + } finally { + lock.unlock(); + } + } } /** @@ -61,25 +142,35 @@ void cacheCommitFinalizers(Map commitCallbacks) { * successfully persisted in the backing state store. If the commitCallback for the finalizationId * is still cached it is invoked. */ - void finalizeCommits(Iterable finalizeIds) { + public void finalizeCommits(Iterable finalizeIds) { for (long finalizeId : finalizeIds) { - @Nullable Runnable finalizeCommit = commitFinalizerCache.getIfPresent(finalizeId); - // NOTE: It is possible the same callback id may be removed twice if - // windmill restarts. - // TODO: It is also possible for an earlier finalized id to be lost. 
- // We should automatically discard all older callbacks for the same computation and key. - if (finalizeCommit != null) { - commitFinalizerCache.invalidate(finalizeId); - finalizationExecutor.forceExecute( - () -> { - try { - finalizeCommit.run(); - } catch (Throwable t) { - LOG.error("Source checkpoint finalization failed:", t); - } - }, - 0); + @Nullable FinalizationInfo info; + lock.lock(); + try { + info = commitFinalizationCallbacks.remove(finalizeId); + if (info != null) { + checkState(cleanUpQueue.remove(info)); + } + } finally { + lock.unlock(); + } + if (info != null) { + try { + info.getCallback().run(); + } catch (Throwable t) { + LOG.error("Commit finalization failed:", t); + } } } } + + // Only exposed for tests. + public int cleanupQueueSize() { + lock.lock(); + try { + return cleanUpQueue.size(); + } finally { + lock.unlock(); + } + } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java index 242e4a5f0db4..e5e88d1a9dc3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java @@ -246,6 +246,7 @@ private void processWork(ComputationState computationState, Work work) { // Before any processing starts, call any pending OnCommit callbacks. Nothing that requires // cleanup should be done before this, since we might exit early here. 
commitFinalizer.finalizeCommits(workItem.getSourceState().getFinalizeIdsList()); + commitFinalizer.finalizeCommits(workItem.getAppliedFinalizeIdsList()); if (workItem.getSourceState().getOnlyFinalize()) { Windmill.WorkItemCommitRequest.Builder outputBuilder = initializeOutputBuilder(key, workItem); outputBuilder.setSourceStateUpdates(Windmill.SourceState.newBuilder().setOnlyFinalize(true)); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java index 9e45425562a3..5fd736c3aaa9 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java @@ -36,20 +36,24 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.beam.runners.core.NullSideInputReader; import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.runners.core.metrics.ExecutionStateTracker; import org.apache.beam.runners.core.metrics.MetricsContainerImpl; import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; +import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext; import org.apache.beam.runners.dataflow.worker.counters.CounterFactory.CounterDistribution; import org.apache.beam.runners.dataflow.worker.counters.CounterName; import org.apache.beam.runners.dataflow.worker.counters.CounterSet; import org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor; import org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoFn; import org.apache.beam.runners.dataflow.worker.util.common.worker.Receiver; +import 
org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer; import org.apache.beam.sdk.transforms.DoFnSchemaInformation; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; @@ -62,12 +66,15 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.joda.time.Duration; +import org.joda.time.Instant; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.Mockito; /** Tests for {@link SimpleParDoFn}. 
*/ @RunWith(JUnit4.class) @@ -633,4 +640,117 @@ public T get(PCollectionView view, final BoundedWindow window) { throw new IllegalArgumentException("calling getSideInput() with unknown view"); } } + + @Test + public void testBundleFinalizer() throws Exception { + bundleSuccessCount.set(0); + DoFnInfo fnInfo = + DoFnInfo.forFn( + new WithBundleFinalizerDoFn(), + WindowingStrategy.globalDefault(), + null /* side input views */, + VarLongCoder.of(), + MAIN_OUTPUT, + DoFnSchemaInformation.create(), + Collections.emptyMap()); + DataflowExecutionContext.DataflowStepContext userStepContext = + Mockito.mock( + DataflowExecutionContext.DataflowStepContext.class, + invocation -> { + if (invocation.getMethod().getName().equals("bundleFinalizer")) { + return new BundleFinalizer() { + @Override + public void afterBundleCommit(Instant expiry, Callback callback) { + try { + callback.onBundleSuccess(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + } + return invocation.getMethod().invoke(stepContext, invocation.getArguments()); + }); + + DataflowStepContext stepContextWithBundleFinalizer = + Mockito.mock( + DataflowStepContext.class, + invocation -> { + if (invocation.getMethod().getName().equals("bundleFinalizer")) { + return new BundleFinalizer() { + @Override + public void afterBundleCommit(Instant expiry, Callback callback) { + try { + callback.onBundleSuccess(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + } + if (invocation.getMethod().getName().equals("namespacedToUser")) { + return userStepContext; + } + return invocation.getMethod().invoke(stepContext, invocation.getArguments()); + }); + + ParDoFn parDoFn = + new SimpleParDoFn<>( + options, + DoFnInstanceManagers.singleInstance(fnInfo), + new EmptySideInputReader(), + MAIN_OUTPUT, + ImmutableMap.of(MAIN_OUTPUT, 0), + stepContextWithBundleFinalizer, + operationContext, + DoFnSchemaInformation.create(), + Collections.emptyMap(), + SimpleDoFnRunnerFactory.INSTANCE); + 
+ parDoFn.startBundle(new TestReceiver()); + + // Process a few elements + for (int i = 0; i < 5; i++) { + parDoFn.processElement(WindowedValues.valueInGlobalWindow(1L)); + } + + parDoFn.finishBundle(); + + // The counter increases by 1 in StartBundle, 5 in ProcessElement, and 1 in FinishBundle. + // Total should be 7. + assertThat(getBundleSuccessCount(), equalTo(7)); + } + + private static final AtomicInteger bundleSuccessCount = new AtomicInteger(0); + + static void increaseBundleSuccessCount() { + bundleSuccessCount.incrementAndGet(); + } + + static int getBundleSuccessCount() { + return bundleSuccessCount.get(); + } + + static class WithBundleFinalizerDoFn extends DoFn { + @StartBundle + public void startBundle(StartBundleContext context, BundleFinalizer bundleFinalizer) { + bundleFinalizer.afterBundleCommit( + Instant.now().plus(Duration.standardMinutes(5)), + SimpleParDoFnTest::increaseBundleSuccessCount); + } + + @ProcessElement + public void processElement(ProcessContext c, BundleFinalizer bundleFinalizer) { + bundleFinalizer.afterBundleCommit( + Instant.now().plus(Duration.standardMinutes(5)), + SimpleParDoFnTest::increaseBundleSuccessCount); + } + + @FinishBundle + public void finishBundle(FinishBundleContext context, BundleFinalizer bundleFinalizer) { + bundleFinalizer.afterBundleCommit( + Instant.now().plus(Duration.standardMinutes(5)), + SimpleParDoFnTest::increaseBundleSuccessCount); + } + } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java new file mode 100644 index 000000000000..8556b48a10fc --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java @@ -0,0 
+1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.work.processing; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import java.util.Collections; +import java.util.Map; +import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; +import org.apache.commons.lang3.tuple.Pair; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatchers; + +@RunWith(JUnit4.class) +public class StreamingCommitFinalizerTest { + + private StreamingCommitFinalizer finalizer; + private BoundedQueueExecutor mockExecutor; + private Runnable backgroundCleanupRunnable; + private Thread backgroundThread; + + @Before + public void setUp() { + mockExecutor = 
mock(BoundedQueueExecutor.class); + ArgumentCaptor runnableCaptor = ArgumentCaptor.forClass(Runnable.class); + finalizer = StreamingCommitFinalizer.create(mockExecutor); + verify(mockExecutor).execute(runnableCaptor.capture(), ArgumentMatchers.eq(0L)); + backgroundCleanupRunnable = runnableCaptor.getValue(); + + backgroundThread = new Thread(backgroundCleanupRunnable); + backgroundThread.setDaemon(true); + backgroundThread.start(); + } + + @After + public void tearDown() throws Exception { + if (backgroundThread != null) { + backgroundThread.interrupt(); + backgroundThread.join(); + } + } + + @Test + public void testCreateAndInit() { + assertEquals(0, finalizer.cleanupQueueSize()); + } + + @Test + public void testCacheCommitFinalizer() { + Runnable callback = mock(Runnable.class); + finalizer.cacheCommitFinalizers( + ImmutableMap.of(1L, Pair.of(Instant.now().plus(Duration.standardHours(1)), callback))); + assertEquals(1, finalizer.cleanupQueueSize()); + } + + @Test + public void testThrowErrorOnDuplicateIds() { + Runnable callback1 = mock(Runnable.class); + Instant expiry = Instant.now().plus(Duration.standardHours(1)); + finalizer.cacheCommitFinalizers(ImmutableMap.of(1L, Pair.of(expiry, callback1))); + + Runnable callback2 = mock(Runnable.class); + Map> duplicateCallback = + ImmutableMap.of(1L, Pair.of(expiry, callback2)); + assertThrows( + IllegalStateException.class, () -> finalizer.cacheCommitFinalizers(duplicateCallback)); + } + + @Test + public void testFinalizeCommits() { + Runnable callback = mock(Runnable.class); + finalizer.cacheCommitFinalizers( + ImmutableMap.of(1L, Pair.of(Instant.now().plus(Duration.standardHours(1)), callback))); + finalizer.finalizeCommits(Collections.singletonList(1L)); + + verify(callback).run(); + assertEquals(0, finalizer.cleanupQueueSize()); + } + + @Test + public void testIgnoresUnknownIds() { + Runnable callback = mock(Runnable.class); + finalizer.cacheCommitFinalizers( + ImmutableMap.of(1L, 
Pair.of(Instant.now().plus(Duration.standardHours(1)), callback))); + finalizer.finalizeCommits(Collections.singletonList(2L)); + + verify(callback, never()).run(); + assertEquals(1, finalizer.cleanupQueueSize()); + } + + @Test + public void testCleanupOnExpiration() throws Exception { + Runnable callback = mock(Runnable.class); + finalizer.cacheCommitFinalizers( + ImmutableMap.of(1L, Pair.of(Instant.now().plus(Duration.standardHours(1)), callback))); + assertEquals(1, finalizer.cleanupQueueSize()); + + Runnable callback2 = mock(Runnable.class); + finalizer.cacheCommitFinalizers( + ImmutableMap.of(2L, Pair.of(Instant.now().plus(Duration.millis(100)), callback2))); + + Runnable callback3 = mock(Runnable.class); + finalizer.cacheCommitFinalizers( + ImmutableMap.of(3L, Pair.of(Instant.now().plus(Duration.millis(100)), callback3))); + + while (finalizer.cleanupQueueSize() > 1) { + // Wait until it expires + Thread.sleep(500); + } + // We can call finalize even though these were already cleaned up. + finalizer.finalizeCommits(ImmutableList.of(2L, 3L)); + verify(callback2, never()).run(); + verify(callback3, never()).run(); + + finalizer.finalizeCommits(Collections.singletonList(1L)); + verify(callback).run(); + assertEquals(0, finalizer.cleanupQueueSize()); + } +} diff --git a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto index a4b3df906dd9..97cf52ecef21 100644 --- a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto +++ b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto @@ -447,6 +447,10 @@ message WorkItem { // elements mapped to a single key to impact pipeline performance. When // present, this field includes metadata associated with any hot key. 
optional HotKeyInfo hot_key_info = 11; + + repeated int64 applied_finalize_ids = 16; + + reserved 12, 13, 14, 15; } message ComputationWorkItems { @@ -653,10 +657,16 @@ message WorkItemCommitRequest { // Collected work item processing state durations. repeated LatencyAttribution per_work_item_latency_attributions = 27; + // Ids that will be passed back as applied_finalize_ids in a subsequent + // GetWorkResponse once the state in this request has been persisted to disk + // successfully. This is best-effort; it is possible that state is persisted + // but the finalize ids are not sent to the worker. + repeated int64 finalize_ids = 19 [packed = true]; + // DEPRECATED repeated GlobalDataId global_data_id_requests = 9; - reserved 6, 19, 23; + reserved 6, 23; } message ComputationCommitWorkRequest { @@ -791,11 +801,14 @@ message StreamingGetWorkResponseChunk { // from other stream_ids may be interleaved on the physical stream. optional fixed64 stream_id = 4; + // Finalize ids associated with successfully applied work from this worker + repeated int64 applied_finalize_ids = 6 [packed = true]; + // Timing infos for the work item. Windmill Dispatcher and user worker should // propagate critical event timings if the list is not empty. repeated GetWorkStreamTimingInfo per_work_item_timing_infos = 8; - // reserved field 5 + reserved 5, 7; } message ComputationWorkItemMetadata { From 1ec825d2a3dfa93123ad3d5c16a45ac4894ca6f8 Mon Sep 17 00:00:00 2001 From: Andrew Crites Date: Thu, 26 Feb 2026 15:50:17 -0800 Subject: [PATCH 02/14] Removes check preventing stateful DoFn's with bundle finalizers from running on Dataflow streaming non-portable worker. 
--- .../org/apache/beam/runners/dataflow/DataflowRunner.java | 9 --------- 1 file changed, 9 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 775e7b91de93..c2e2b957d767 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -2736,15 +2736,6 @@ static void verifyDoFnSupported( "%s does not currently support @RequiresTimeSortedInput in streaming mode.", DataflowRunner.class.getSimpleName())); } - boolean isUnifiedWorker = useUnifiedWorker(options); - if (DoFnSignatures.usesBundleFinalizer(fn) && !isUnifiedWorker) { - throw new UnsupportedOperationException( - String.format( - "%s does not currently support %s when not using unified worker because it uses " - + "BundleFinalizers in its implementation. Set the `--experiments=use_runner_v2` " - + "option to use this DoFn.", - DataflowRunner.class.getSimpleName(), fn.getClass().getSimpleName())); - } } static void verifyStateSupportForWindowingStrategy(WindowingStrategy strategy) { From 4694de84cd1b44818505c1451b379d622d205fe5 Mon Sep 17 00:00:00 2001 From: Andrew Crites Date: Fri, 27 Feb 2026 14:44:08 -0800 Subject: [PATCH 03/14] Changes finalize method to only acquire the lock once. Runs finalize callbacks in the given executor. This required changing tests since the callbacks are now running asynchronously. 
--- .../processing/StreamingCommitFinalizer.java | 32 ++++++---- .../StreamingCommitFinalizerTest.java | 59 ++++++++++--------- 2 files changed, 51 insertions(+), 40 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java index 6811e69d997a..25f76f9fd7b1 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java @@ -20,8 +20,10 @@ import static com.google.common.base.Preconditions.checkState; import com.google.auto.value.AutoValue; +import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.PriorityQueue; import java.util.concurrent.TimeUnit; @@ -67,7 +69,10 @@ public static FinalizationInfo create(Long id, Instant expiryTime, Runnable call private final PriorityQueue cleanUpQueue = new PriorityQueue<>(11, Comparator.comparing(FinalizationInfo::getExpiryTime)); + private final BoundedQueueExecutor finalizationExecutor; + private StreamingCommitFinalizer(BoundedQueueExecutor finalizationCleanupExecutor) { + finalizationExecutor = finalizationCleanupExecutor; finalizationCleanupExecutor.execute(this::cleanupThreadBody, 0); } @@ -143,23 +148,24 @@ public void cacheCommitFinalizers(Map> callbacks) * is still cached it is invoked. 
*/ public void finalizeCommits(Iterable finalizeIds) { - for (long finalizeId : finalizeIds) { - @Nullable FinalizationInfo info; - lock.lock(); - try { - info = commitFinalizationCallbacks.remove(finalizeId); + List callbacksToExecute = new ArrayList<>(); + lock.lock(); + try { + for (long finalizeId : finalizeIds) { + @Nullable FinalizationInfo info = commitFinalizationCallbacks.remove(finalizeId); if (info != null) { checkState(cleanUpQueue.remove(info)); + callbacksToExecute.add(info.getCallback()); } - } finally { - lock.unlock(); } - if (info != null) { - try { - info.getCallback().run(); - } catch (Throwable t) { - LOG.error("Commit finalization failed:", t); - } + } finally { + lock.unlock(); + } + for (Runnable callback : callbacksToExecute) { + try { + finalizationExecutor.execute(callback, 0); + } catch (Throwable t) { + LOG.error("Commit finalization failed:", t); } } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java index 8556b48a10fc..f6cefd9927c0 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java @@ -27,45 +27,38 @@ import com.google.common.collect.ImmutableMap; import java.util.Collections; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.lang3.tuple.Pair; import org.joda.time.Duration; 
import org.joda.time.Instant; -import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.mockito.ArgumentCaptor; -import org.mockito.ArgumentMatchers; @RunWith(JUnit4.class) public class StreamingCommitFinalizerTest { private StreamingCommitFinalizer finalizer; - private BoundedQueueExecutor mockExecutor; - private Runnable backgroundCleanupRunnable; - private Thread backgroundThread; + private BoundedQueueExecutor executor; @Before public void setUp() { - mockExecutor = mock(BoundedQueueExecutor.class); - ArgumentCaptor runnableCaptor = ArgumentCaptor.forClass(Runnable.class); - finalizer = StreamingCommitFinalizer.create(mockExecutor); - verify(mockExecutor).execute(runnableCaptor.capture(), ArgumentMatchers.eq(0L)); - backgroundCleanupRunnable = runnableCaptor.getValue(); - - backgroundThread = new Thread(backgroundCleanupRunnable); - backgroundThread.setDaemon(true); - backgroundThread.start(); - } - - @After - public void tearDown() throws Exception { - if (backgroundThread != null) { - backgroundThread.interrupt(); - backgroundThread.join(); - } + executor = + new BoundedQueueExecutor( + 10, + 60, + TimeUnit.SECONDS, + 10, + 10000000, + new ThreadFactoryBuilder() + .setNameFormat("FinalizationCallback-%d") + .setDaemon(true) + .build(), + /*useFairMonitor=*/ false); + finalizer = StreamingCommitFinalizer.create(executor); } @Test @@ -95,23 +88,29 @@ public void testThrowErrorOnDuplicateIds() { } @Test - public void testFinalizeCommits() { + public void testFinalizeCommits() throws Exception { Runnable callback = mock(Runnable.class); finalizer.cacheCommitFinalizers( ImmutableMap.of(1L, Pair.of(Instant.now().plus(Duration.standardHours(1)), callback))); finalizer.finalizeCommits(Collections.singletonList(1L)); - + // The executor always has the cleanup thread running. So elementsOutstanding == 2 while we're + // waiting for the finalization callback to run. 
+ while (executor.elementsOutstanding() > 1) { + Thread.sleep(500); + } verify(callback).run(); assertEquals(0, finalizer.cleanupQueueSize()); } @Test - public void testIgnoresUnknownIds() { + public void testIgnoresUnknownIds() throws Exception { Runnable callback = mock(Runnable.class); finalizer.cacheCommitFinalizers( ImmutableMap.of(1L, Pair.of(Instant.now().plus(Duration.standardHours(1)), callback))); finalizer.finalizeCommits(Collections.singletonList(2L)); - + while (executor.elementsOutstanding() > 1) { + Thread.sleep(500); + } verify(callback, never()).run(); assertEquals(1, finalizer.cleanupQueueSize()); } @@ -137,10 +136,16 @@ public void testCleanupOnExpiration() throws Exception { } // We can call finalize even though these were already cleaned up. finalizer.finalizeCommits(ImmutableList.of(2L, 3L)); + while (executor.elementsOutstanding() > 1) { + Thread.sleep(500); + } verify(callback2, never()).run(); verify(callback3, never()).run(); finalizer.finalizeCommits(Collections.singletonList(1L)); + while (executor.elementsOutstanding() > 1) { + Thread.sleep(500); + } verify(callback).run(); assertEquals(0, finalizer.cleanupQueueSize()); } From 85701682ec0b2813c366ad649c1249f9583b10f8 Mon Sep 17 00:00:00 2001 From: Andrew Crites Date: Mon, 2 Mar 2026 14:04:34 -0800 Subject: [PATCH 04/14] Addresses PR review comments, mostly around improving tests. 
--- .../processing/StreamingCommitFinalizer.java | 3 +- .../dataflow/worker/SimpleParDoFnTest.java | 48 ++++++------- .../StreamingCommitFinalizerTest.java | 68 +++++++++++++------ 3 files changed, 69 insertions(+), 50 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java index 25f76f9fd7b1..c9b6bf8a131a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java @@ -170,8 +170,7 @@ public void finalizeCommits(Iterable finalizeIds) { } } - // Only exposed for tests. - public int cleanupQueueSize() { + int cleanupQueueSize() { lock.lock(); try { return cleanUpQueue.size(); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java index 5fd736c3aaa9..6864c5f3909d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java @@ -79,6 +79,7 @@ /** Tests for {@link SimpleParDoFn}. */ @RunWith(JUnit4.class) public class SimpleParDoFnTest { + @Rule public ExpectedException thrown = ExpectedException.none(); private PipelineOptions options; @@ -102,6 +103,7 @@ public void setUp() { // TODO: Replace TestDoFn usages with a mock DoFn to reduce boilerplate. 
static class TestDoFn extends DoFn { + enum State { UNSTARTED, SET_UP, @@ -163,6 +165,7 @@ public void teardown() { } static class TestErrorDoFn extends DoFn { + // Used to test nested stack traces. private void nestedFunctionBeta(String s) { throw new RuntimeException(s); @@ -189,6 +192,7 @@ public void finishBundle() { } static class TestReceiver implements Receiver { + List receivedElems = new ArrayList<>(); @Override @@ -570,10 +574,10 @@ public void testOutputsPerElementCounterDisabledViaExperiment() throws Exception * @param inputData Input elements to process. For each element X, the DoFn will output a string * repeated X times. * @return Delta counter updates extracted after execution. - * @throws Exception */ private List executeParDoFnCounterTest(int... inputData) throws Exception { class RepeaterDoFn extends DoFn { + /** Takes as input the number of times to output a message. */ @ProcessElement public void processElement(ProcessContext c) { @@ -623,6 +627,7 @@ public void processElement(ProcessContext c) { * conversion according to the {@link PCollectionView} and projection to a particular window. 
*/ private static class EmptySideInputReader implements SideInputReader { + private EmptySideInputReader() {} @Override @@ -643,7 +648,9 @@ public T get(PCollectionView view, final BoundedWindow window) { @Test public void testBundleFinalizer() throws Exception { - bundleSuccessCount.set(0); + startBundleCount.set(0); + processElementCount.set(0); + finishBundleCount.set(0); DoFnInfo fnInfo = DoFnInfo.forFn( new WithBundleFinalizerDoFn(), @@ -676,18 +683,6 @@ public void afterBundleCommit(Instant expiry, Callback callback) { Mockito.mock( DataflowStepContext.class, invocation -> { - if (invocation.getMethod().getName().equals("bundleFinalizer")) { - return new BundleFinalizer() { - @Override - public void afterBundleCommit(Instant expiry, Callback callback) { - try { - callback.onBundleSuccess(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }; - } if (invocation.getMethod().getName().equals("namespacedToUser")) { return userStepContext; } @@ -716,41 +711,36 @@ public void afterBundleCommit(Instant expiry, Callback callback) { parDoFn.finishBundle(); - // The counter increases by 1 in StartBundle, 5 in ProcessElement, and 1 in FinishBundle. - // Total should be 7. 
- assertThat(getBundleSuccessCount(), equalTo(7)); + assertThat(startBundleCount.get(), equalTo(1)); + assertThat(processElementCount.get(), equalTo(5)); + assertThat(finishBundleCount.get(), equalTo(1)); } - private static final AtomicInteger bundleSuccessCount = new AtomicInteger(0); - - static void increaseBundleSuccessCount() { - bundleSuccessCount.incrementAndGet(); - } - - static int getBundleSuccessCount() { - return bundleSuccessCount.get(); - } + private static final AtomicInteger startBundleCount = new AtomicInteger(0); + private static final AtomicInteger processElementCount = new AtomicInteger(0); + private static final AtomicInteger finishBundleCount = new AtomicInteger(0); static class WithBundleFinalizerDoFn extends DoFn { + @StartBundle public void startBundle(StartBundleContext context, BundleFinalizer bundleFinalizer) { bundleFinalizer.afterBundleCommit( Instant.now().plus(Duration.standardMinutes(5)), - SimpleParDoFnTest::increaseBundleSuccessCount); + () -> SimpleParDoFnTest.startBundleCount.incrementAndGet()); } @ProcessElement public void processElement(ProcessContext c, BundleFinalizer bundleFinalizer) { bundleFinalizer.afterBundleCommit( Instant.now().plus(Duration.standardMinutes(5)), - SimpleParDoFnTest::increaseBundleSuccessCount); + () -> SimpleParDoFnTest.processElementCount.incrementAndGet()); } @FinishBundle public void finishBundle(FinishBundleContext context, BundleFinalizer bundleFinalizer) { bundleFinalizer.afterBundleCommit( Instant.now().plus(Duration.standardMinutes(5)), - SimpleParDoFnTest::increaseBundleSuccessCount); + () -> SimpleParDoFnTest.finishBundleCount.incrementAndGet()); } } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java index 
f6cefd9927c0..2c5fe04256be 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -27,6 +28,7 @@ import com.google.common.collect.ImmutableMap; import java.util.Collections; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -72,6 +74,7 @@ public void testCacheCommitFinalizer() { finalizer.cacheCommitFinalizers( ImmutableMap.of(1L, Pair.of(Instant.now().plus(Duration.standardHours(1)), callback))); assertEquals(1, finalizer.cleanupQueueSize()); + verify(callback, never()).run(); } @Test @@ -89,16 +92,41 @@ public void testThrowErrorOnDuplicateIds() { @Test public void testFinalizeCommits() throws Exception { - Runnable callback = mock(Runnable.class); + CountDownLatch callbackExecuted = new CountDownLatch(1); finalizer.cacheCommitFinalizers( - ImmutableMap.of(1L, Pair.of(Instant.now().plus(Duration.standardHours(1)), callback))); + ImmutableMap.of( + 1L, + Pair.of( + Instant.now().plus(Duration.standardHours(1)), + () -> callbackExecuted.countDown()))); finalizer.finalizeCommits(Collections.singletonList(1L)); - // The executor always has the cleanup thread running. So elementsOutstanding == 2 while we're - // waiting for the finalization callback to run. 
- while (executor.elementsOutstanding() > 1) { - Thread.sleep(500); - } - verify(callback).run(); + assertTrue(callbackExecuted.await(30, TimeUnit.SECONDS)); + assertEquals(0, finalizer.cleanupQueueSize()); + } + + @Test + public void testMultipleCommits() throws Exception { + CountDownLatch callback1Executed = new CountDownLatch(1); + CountDownLatch callback2Executed = new CountDownLatch(1); + CountDownLatch callback3Executed = new CountDownLatch(1); + + Instant expiryTime = Instant.now().plus(Duration.standardHours(1)); + finalizer.cacheCommitFinalizers( + ImmutableMap.>builder() + .put(1L, Pair.of(expiryTime, () -> callback1Executed.countDown())) + .put(2L, Pair.of(expiryTime, () -> callback2Executed.countDown())) + .put(3L, Pair.of(expiryTime, () -> callback3Executed.countDown())) + .build()); + // Finalize commits one at a time (in different order from added). + finalizer.finalizeCommits(Collections.singletonList(2L)); + assertTrue(callback2Executed.await(30, TimeUnit.SECONDS)); + + finalizer.finalizeCommits(Collections.singletonList(3L)); + assertTrue(callback3Executed.await(30, TimeUnit.SECONDS)); + + finalizer.finalizeCommits(Collections.singletonList(1L)); + assertTrue(callback1Executed.await(30, TimeUnit.SECONDS)); + assertEquals(0, finalizer.cleanupQueueSize()); } @@ -117,21 +145,26 @@ public void testIgnoresUnknownIds() throws Exception { @Test public void testCleanupOnExpiration() throws Exception { - Runnable callback = mock(Runnable.class); + CountDownLatch callback1Executed = new CountDownLatch(1); finalizer.cacheCommitFinalizers( - ImmutableMap.of(1L, Pair.of(Instant.now().plus(Duration.standardHours(1)), callback))); + ImmutableMap.of( + 1L, + Pair.of( + Instant.now().plus(Duration.standardHours(1)), + () -> callback1Executed.countDown()))); assertEquals(1, finalizer.cleanupQueueSize()); Runnable callback2 = mock(Runnable.class); - finalizer.cacheCommitFinalizers( - ImmutableMap.of(2L, Pair.of(Instant.now().plus(Duration.millis(100)), 
callback2))); - Runnable callback3 = mock(Runnable.class); + Instant shortTimeout = Instant.now().plus(Duration.millis(100)); finalizer.cacheCommitFinalizers( - ImmutableMap.of(3L, Pair.of(Instant.now().plus(Duration.millis(100)), callback3))); + ImmutableMap.>builder() + .put(2L, Pair.of(shortTimeout, callback2)) + .put(3L, Pair.of(shortTimeout, callback3)) + .build()); while (finalizer.cleanupQueueSize() > 1) { - // Wait until it expires + // Wait until the two 100ms timeouts expire. Thread.sleep(500); } // We can call finalize even though these were already cleaned up. @@ -143,10 +176,7 @@ public void testCleanupOnExpiration() throws Exception { verify(callback3, never()).run(); finalizer.finalizeCommits(Collections.singletonList(1L)); - while (executor.elementsOutstanding() > 1) { - Thread.sleep(500); - } - verify(callback).run(); + assertTrue(callback1Executed.await(30, TimeUnit.SECONDS)); assertEquals(0, finalizer.cleanupQueueSize()); } } From 5e1642ce4bcbee6ad1e8a30a795045cf19414e7d Mon Sep 17 00:00:00 2001 From: Andrew Crites Date: Wed, 4 Mar 2026 11:25:27 -0800 Subject: [PATCH 05/14] Addresses minor code comments. Adds back in tests for batch, non-portable BundleFinalizers. 
--- ...va_ValidatesRunner_Dataflow_Streaming.json | 2 +- .../google-cloud-dataflow-java/build.gradle | 1 - .../beam/runners/dataflow/DataflowRunner.java | 9 +++++++ .../processing/StreamingCommitFinalizer.java | 2 ++ .../dataflow/worker/SimpleParDoFnTest.java | 25 +++++++++---------- .../StreamingCommitFinalizerTest.java | 6 ++--- 6 files changed, 27 insertions(+), 18 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json index e623d3373a93..50d17c108f2e 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run!", - "modification": 1, + "modification": 2, } diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 3792626a1fdf..43b948d0bfe3 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -205,7 +205,6 @@ def commonLegacyExcludeCategories = [ 'org.apache.beam.sdk.testing.UsesGaugeMetrics', 'org.apache.beam.sdk.testing.UsesTestStream', 'org.apache.beam.sdk.testing.UsesMetricsPusher', - 'org.apache.beam.sdk.testing.UsesBundleFinalizer', 'org.apache.beam.sdk.testing.UsesBoundedTrieMetrics', // Dataflow QM as of now does not support returning back BoundedTrie in metric result. 
] diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index c2e2b957d767..552ad5920bbf 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -2736,6 +2736,15 @@ static void verifyDoFnSupported( "%s does not currently support @RequiresTimeSortedInput in streaming mode.", DataflowRunner.class.getSimpleName())); } + boolean isUnifiedWorker = useUnifiedWorker(options); + if (DoFnSignatures.usesBundleFinalizer(fn) && !isUnifiedWorker && !streaming) { + throw new UnsupportedOperationException( + String.format( + "%s does not currently support %s in batch mode when not using unified worker because it " + + "uses BundleFinalizers in its implementation. Set the `--experiments=use_runner_v2` " + + "option to use this DoFn.", + DataflowRunner.class.getSimpleName(), fn.getClass().getSimpleName())); + } } static void verifyStateSupportForWindowingStrategy(WindowingStrategy strategy) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java index c9b6bf8a131a..0b721d2783bd 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java @@ -34,6 +34,7 @@ import javax.annotation.concurrent.ThreadSafe; import 
org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang3.tuple.Pair; import org.joda.time.Duration; import org.joda.time.Instant; @@ -170,6 +171,7 @@ public void finalizeCommits(Iterable finalizeIds) { } } + @VisibleForTesting int cleanupQueueSize() { lock.lock(); try { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java index 6864c5f3909d..9c9f5386f443 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java @@ -648,9 +648,9 @@ public T get(PCollectionView view, final BoundedWindow window) { @Test public void testBundleFinalizer() throws Exception { - startBundleCount.set(0); - processElementCount.set(0); - finishBundleCount.set(0); + WithBundleFinalizerDoFn.startBundleCount.set(0); + WithBundleFinalizerDoFn.processElementCount.set(0); + WithBundleFinalizerDoFn.finishBundleCount.set(0); DoFnInfo fnInfo = DoFnInfo.forFn( new WithBundleFinalizerDoFn(), @@ -711,36 +711,35 @@ public void afterBundleCommit(Instant expiry, Callback callback) { parDoFn.finishBundle(); - assertThat(startBundleCount.get(), equalTo(1)); - assertThat(processElementCount.get(), equalTo(5)); - assertThat(finishBundleCount.get(), equalTo(1)); + assertThat(WithBundleFinalizerDoFn.startBundleCount.get(), equalTo(1)); + assertThat(WithBundleFinalizerDoFn.processElementCount.get(), equalTo(5)); + assertThat(WithBundleFinalizerDoFn.finishBundleCount.get(), equalTo(1)); } - private static final AtomicInteger 
startBundleCount = new AtomicInteger(0); - private static final AtomicInteger processElementCount = new AtomicInteger(0); - private static final AtomicInteger finishBundleCount = new AtomicInteger(0); - static class WithBundleFinalizerDoFn extends DoFn { + private static final AtomicInteger startBundleCount = new AtomicInteger(0); + private static final AtomicInteger processElementCount = new AtomicInteger(0); + private static final AtomicInteger finishBundleCount = new AtomicInteger(0); @StartBundle public void startBundle(StartBundleContext context, BundleFinalizer bundleFinalizer) { bundleFinalizer.afterBundleCommit( Instant.now().plus(Duration.standardMinutes(5)), - () -> SimpleParDoFnTest.startBundleCount.incrementAndGet()); + () -> startBundleCount.incrementAndGet()); } @ProcessElement public void processElement(ProcessContext c, BundleFinalizer bundleFinalizer) { bundleFinalizer.afterBundleCommit( Instant.now().plus(Duration.standardMinutes(5)), - () -> SimpleParDoFnTest.processElementCount.incrementAndGet()); + () -> processElementCount.incrementAndGet()); } @FinishBundle public void finishBundle(FinishBundleContext context, BundleFinalizer bundleFinalizer) { bundleFinalizer.afterBundleCommit( Instant.now().plus(Duration.standardMinutes(5)), - () -> SimpleParDoFnTest.finishBundleCount.incrementAndGet()); + () -> finishBundleCount.incrementAndGet()); } } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java index 2c5fe04256be..6f3b55c8fc94 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java +++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java @@ -137,7 +137,7 @@ public void testIgnoresUnknownIds() throws Exception { ImmutableMap.of(1L, Pair.of(Instant.now().plus(Duration.standardHours(1)), callback))); finalizer.finalizeCommits(Collections.singletonList(2L)); while (executor.elementsOutstanding() > 1) { - Thread.sleep(500); + Thread.sleep(200); } verify(callback, never()).run(); assertEquals(1, finalizer.cleanupQueueSize()); @@ -165,12 +165,12 @@ public void testCleanupOnExpiration() throws Exception { while (finalizer.cleanupQueueSize() > 1) { // Wait until the two 100ms timeouts expire. - Thread.sleep(500); + Thread.sleep(200); } // We can call finalize even though these were already cleaned up. finalizer.finalizeCommits(ImmutableList.of(2L, 3L)); while (executor.elementsOutstanding() > 1) { - Thread.sleep(500); + Thread.sleep(200); } verify(callback2, never()).run(); verify(callback3, never()).run(); From 6dcafc40ba5ddd88397304252af94fc9cfa16ff6 Mon Sep 17 00:00:00 2001 From: Andrew Crites Date: Wed, 4 Mar 2026 11:47:37 -0800 Subject: [PATCH 06/14] Ran spotlessApply --- .../apache/beam/runners/dataflow/DataflowRunner.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 552ad5920bbf..8c2a6c6398b3 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -2739,11 +2739,11 @@ static void verifyDoFnSupported( boolean isUnifiedWorker = useUnifiedWorker(options); if (DoFnSignatures.usesBundleFinalizer(fn) && !isUnifiedWorker && !streaming) { throw new 
UnsupportedOperationException( - String.format( - "%s does not currently support %s in batch mode when not using unified worker because it " - + "uses BundleFinalizers in its implementation. Set the `--experiments=use_runner_v2` " - + "option to use this DoFn.", - DataflowRunner.class.getSimpleName(), fn.getClass().getSimpleName())); + String.format( + "%s does not currently support %s in batch mode when not using unified worker because it " + + "uses BundleFinalizers in its implementation. Set the `--experiments=use_runner_v2` " + + "option to use this DoFn.", + DataflowRunner.class.getSimpleName(), fn.getClass().getSimpleName())); } } From 741300185c4f71a750fdfb33343bd17e5d523c57 Mon Sep 17 00:00:00 2001 From: Andrew Crites Date: Fri, 6 Mar 2026 14:54:01 -0800 Subject: [PATCH 07/14] Addresses more PR review comments. Triggering validates runner tests. --- ...tCommit_Java_ValidatesRunner_Dataflow.json | 4 +-- .../google-cloud-dataflow-java/build.gradle | 4 +++ .../worker/StreamingModeExecutionContext.java | 27 +++++++++---------- .../grpc/GetWorkResponseChunkAssembler.java | 10 +++---- .../processing/StreamingCommitFinalizer.java | 8 ++++-- .../StreamingCommitFinalizerTest.java | 9 ++----- 6 files changed, 30 insertions(+), 32 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json index a89f7adb4ce8..8144784f5f02 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json @@ -1,5 +1,5 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run!", - "modification": 3, + "modification": 2, } - + diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index f9550a1425b2..d61e1d201fe2 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ 
b/runners/google-cloud-dataflow-java/build.gradle @@ -453,6 +453,10 @@ task validatesRunner { 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetupStateful', 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle', 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful', + + // Batch legacy worker does not support bundle finalization. + 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testBundleFinalizationOccursOnBoundedSplittableDoFn', + 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testBundleFinalizationOccursOnUnboundedSplittableDoFn', ] )) } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index f6946200425f..2a8d0ba301a5 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -84,8 +84,10 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.HashBasedTable; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterators; 
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.PeekingIterator; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; @@ -455,8 +457,7 @@ public Map> flushState() { List> bundleFinalizers = new ArrayList<>(); for (StepContext stepContext : getAllStepContexts()) { stepContext.flushState(); - bundleFinalizers.addAll(stepContext.getBundleFinalizerCallbacks()); - stepContext.clearBundleFinalizerCallbacks(); + bundleFinalizers.addAll(stepContext.flushBundleFinalizerCallbacks()); } for (Pair bundleFinalizer : bundleFinalizers) { long id = ThreadLocalRandom.current().nextLong(); @@ -1082,26 +1083,22 @@ public TimerInternals userTimerInternals() { return checkNotNull(userTimerInternals); } - public List> getBundleFinalizerCallbacks() { - return bundleFinalizer.getCallbacks(); - } - - public void clearBundleFinalizerCallbacks() { - bundleFinalizer.clearCallbacks(); + public ImmutableList> flushBundleFinalizerCallbacks() { + return bundleFinalizer.flushCallbacks(); } } private static class WindmillBundleFinalizer implements BundleFinalizer { - private List> callbacks = new ArrayList<>(); + private ImmutableList.Builder> callbacks = ImmutableList.builder(); private WindmillBundleFinalizer() {} - private List> getCallbacks() { - return callbacks; - } - - private void clearCallbacks() { - callbacks.clear(); + private ImmutableList> flushCallbacks() { + ImmutableList> flushedCallbacks = callbacks.build(); + if (!Iterables.isEmpty(flushedCallbacks)) { + callbacks = ImmutableList.builder(); + } + return flushedCallbacks; } @Override diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java index 2fb70ad0111b..4afee8a70df1 100644 --- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java @@ -66,7 +66,7 @@ final class GetWorkResponseChunkAssembler { } /** - * Appends the response chunk bytes to the {@link #data }byte buffer. Return the assembled + * Appends the response chunk bytes to the {@link #data} byte buffer. Return the assembled * WorkItem if all response chunks for a WorkItem have been received. */ List append(Windmill.StreamingGetWorkResponseChunk chunk) { @@ -100,13 +100,13 @@ List append(Windmill.StreamingGetWorkResponseChunk chunk) { private Optional flushToWorkItem() { try { workItemBuilder.mergeFrom(data); + workItemBuilder.addAllAppliedFinalizeIds(appliedFinalizeIds); return Optional.of( AssembledWorkItem.create( workItemBuilder.build(), Preconditions.checkNotNull(metadata), workTimingInfosTracker.getLatencyAttributions(), - bufferedSize, - appliedFinalizeIds)); + bufferedSize)); } catch (IOException e) { LOG.error("Failed to parse work item from stream: ", e); } finally { @@ -149,9 +149,7 @@ private static AssembledWorkItem create( WorkItem workItem, ComputationMetadata computationMetadata, ImmutableList latencyAttributions, - long size, - List appliedFinalizeIds) { - workItem = workItem.toBuilder().addAllAppliedFinalizeIds(appliedFinalizeIds).build(); + long size) { return new AutoValue_GetWorkResponseChunkAssembler_AssembledWorkItem( workItem, computationMetadata, latencyAttributions, size); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java index 
d52ea56f5af5..9731bcd36287 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java @@ -17,7 +17,7 @@ */ package org.apache.beam.runners.dataflow.worker.windmill.work.processing; -import static com.google.common.base.Preconditions.checkState; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; import com.google.auto.value.AutoValue; import java.util.ArrayList; @@ -35,6 +35,7 @@ import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.commons.lang3.tuple.Pair; import org.joda.time.Duration; import org.joda.time.Instant; @@ -149,6 +150,9 @@ public void cacheCommitFinalizers(Map> callbacks) * is still cached it is invoked. 
*/ public void finalizeCommits(Iterable finalizeIds) { + if (Iterables.isEmpty(finalizeIds)) { + return; + } List callbacksToExecute = new ArrayList<>(); lock.lock(); try { @@ -164,7 +168,7 @@ public void finalizeCommits(Iterable finalizeIds) { } for (Runnable callback : callbacksToExecute) { try { - finalizationExecutor.execute(callback, 0); + finalizationExecutor.forceExecute(callback, 0); } catch (OutOfMemoryError oom) { throw oom; } catch (Throwable t) { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java index 6f3b55c8fc94..45d55df9100e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java @@ -136,9 +136,7 @@ public void testIgnoresUnknownIds() throws Exception { finalizer.cacheCommitFinalizers( ImmutableMap.of(1L, Pair.of(Instant.now().plus(Duration.standardHours(1)), callback))); finalizer.finalizeCommits(Collections.singletonList(2L)); - while (executor.elementsOutstanding() > 1) { - Thread.sleep(200); - } + assertEquals(1, executor.elementsOutstanding()); verify(callback, never()).run(); assertEquals(1, finalizer.cleanupQueueSize()); } @@ -167,11 +165,8 @@ public void testCleanupOnExpiration() throws Exception { // Wait until the two 100ms timeouts expire. Thread.sleep(200); } - // We can call finalize even though these were already cleaned up. 
+ assertEquals(1, executor.elementsOutstanding()); finalizer.finalizeCommits(ImmutableList.of(2L, 3L)); - while (executor.elementsOutstanding() > 1) { - Thread.sleep(200); - } verify(callback2, never()).run(); verify(callback3, never()).run(); From 5b24e50d69cfb5f1132bc5ed72ff6356bb8ccd22 Mon Sep 17 00:00:00 2001 From: Andrew Crites Date: Mon, 9 Mar 2026 13:44:57 -0700 Subject: [PATCH 08/14] Adds finalization callbacks directly when flushing instead of copying. Adds missing bundleFinalizer from SDF. Retriggering ValidatesRunner tests. --- ...tCommit_Java_ValidatesRunner_Dataflow.json | 2 +- ...va_ValidatesRunner_Dataflow_Streaming.json | 2 +- .../google-cloud-dataflow-java/build.gradle | 8 ++--- .../worker/SplittableProcessFnFactory.java | 5 +-- .../worker/StreamingModeExecutionContext.java | 32 +++++++++---------- 5 files changed, 21 insertions(+), 28 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json index 8144784f5f02..e9d869cc508c 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json @@ -1,5 +1,5 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run!", - "modification": 2, + "modification": 1, } diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json index 50d17c108f2e..e623d3373a93 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run!", - "modification": 2, + "modification": 1, } diff --git a/runners/google-cloud-dataflow-java/build.gradle 
b/runners/google-cloud-dataflow-java/build.gradle index d61e1d201fe2..828505984ce7 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -453,11 +453,9 @@ task validatesRunner { 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetupStateful', 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle', 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful', - - // Batch legacy worker does not support bundle finalization. - 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testBundleFinalizationOccursOnBoundedSplittableDoFn', - 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testBundleFinalizationOccursOnUnboundedSplittableDoFn', - ] + ], + // Batch legacy worker does not support bundle finalization. + excludedCategories: [ 'org.apache.beam.sdk.testing.UsesBundleFinalizer', ], )) } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java index 4473dff8e94a..93c288fea9ea 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java @@ -156,10 +156,7 @@ public DoFnRunner>, OutputT> crea // in the event of a crash. 
10000, Duration.standardSeconds(10), - () -> { - throw new UnsupportedOperationException( - "BundleFinalizer unsupported by non-portable Dataflow."); - })); + stepContext::bundleFinalizer)); DoFnRunner>, OutputT> simpleRunner = new SimpleDoFnRunner<>( options, diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index 2a8d0ba301a5..b264382091ed 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -454,25 +454,23 @@ public void invalidateCache() { public Map> flushState() { Map> callbacks = new HashMap<>(); - List> bundleFinalizers = new ArrayList<>(); for (StepContext stepContext : getAllStepContexts()) { stepContext.flushState(); - bundleFinalizers.addAll(stepContext.flushBundleFinalizerCallbacks()); - } - for (Pair bundleFinalizer : bundleFinalizers) { - long id = ThreadLocalRandom.current().nextLong(); - callbacks.put( - id, - Pair.of( - bundleFinalizer.getLeft(), - () -> { - try { - bundleFinalizer.getRight().onBundleSuccess(); - } catch (Exception e) { - throw new RuntimeException("Exception while running bundle finalizer", e); - } - })); - outputBuilder.addFinalizeIds(id); + for (Pair bundleFinalizer : stepContext.flushBundleFinalizerCallbacks()) { + long id = ThreadLocalRandom.current().nextLong(); + callbacks.put( + id, + Pair.of( + bundleFinalizer.getLeft(), + () -> { + try { + bundleFinalizer.getRight().onBundleSuccess(); + } catch (Exception e) { + throw new RuntimeException("Exception while running bundle finalizer", e); + } + })); + outputBuilder.addFinalizeIds(id); + } } if (activeReader != null) { From 
4f0176167ed11e889763bee01d532e47f2a330b5 Mon Sep 17 00:00:00 2001 From: Andrew Crites Date: Mon, 9 Mar 2026 16:08:09 -0700 Subject: [PATCH 09/14] Ran spotlessApply and retriggering streaming ValidatesRunner tests. --- ...am_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json | 2 +- .../dataflow/worker/StreamingModeExecutionContext.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json index e623d3373a93..e1b083e439cc 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run!", - "modification": 1, + "modification": 3, } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index b264382091ed..e136def3ca33 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -24,7 +24,6 @@ import com.google.api.services.dataflow.model.SideInputInfo; import java.io.Closeable; import java.io.IOException; -import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; @@ -456,7 +455,8 @@ public Map> flushState() { for (StepContext stepContext : getAllStepContexts()) { stepContext.flushState(); - for (Pair bundleFinalizer : stepContext.flushBundleFinalizerCallbacks()) { + for (Pair 
bundleFinalizer : + stepContext.flushBundleFinalizerCallbacks()) { long id = ThreadLocalRandom.current().nextLong(); callbacks.put( id, From 591aeaaa346a3a8e9917ef3e90b7105e1d039427 Mon Sep 17 00:00:00 2001 From: Andrew Crites Date: Fri, 13 Mar 2026 11:04:27 -0700 Subject: [PATCH 10/14] Retriggering validates runner tests. --- ...beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json index e1b083e439cc..bd1cc3348ac8 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run!", - "modification": 3, + "modification": 4, } From d7df55217068ecc242f4e3e72afd083828f21219 Mon Sep 17 00:00:00 2001 From: Andrew Crites Date: Fri, 13 Mar 2026 12:39:02 -0700 Subject: [PATCH 11/14] Changes Long to long for build error. 
--- .../windmill/work/processing/StreamingCommitFinalizer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java index 9731bcd36287..12292c1c631a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java @@ -50,7 +50,7 @@ final class StreamingCommitFinalizer { /** A {@link Runnable} and expiry time pair. */ @AutoValue public abstract static class FinalizationInfo { - public abstract Long getId(); + public abstract long getId(); public abstract Instant getExpiryTime(); From a50b859c2711c36f5b7ad652e92abef1dcf13c22 Mon Sep 17 00:00:00 2001 From: Andrew Crites Date: Mon, 16 Mar 2026 11:29:05 -0700 Subject: [PATCH 12/14] Starts finalization callback cleanup thread lazily. This is for tests and pipelines that don't have any callbacks. 
--- .../work/processing/StreamingCommitFinalizer.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java index 12292c1c631a..098b4021ff8e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java @@ -71,11 +71,13 @@ public static FinalizationInfo create(Long id, Instant expiryTime, Runnable call private final PriorityQueue cleanUpQueue = new PriorityQueue<>(11, Comparator.comparing(FinalizationInfo::getExpiryTime)); + @GuardedBy("lock") + private boolean cleanUpThreadStarted = false; + private final BoundedQueueExecutor finalizationExecutor; private StreamingCommitFinalizer(BoundedQueueExecutor finalizationCleanupExecutor) { finalizationExecutor = finalizationCleanupExecutor; - finalizationCleanupExecutor.execute(this::cleanupThreadBody, 0); } private void cleanupThreadBody() { @@ -132,6 +134,12 @@ public void cacheCommitFinalizers(Map> callbacks) + " but had " + existingInfo); } + if (!cleanUpThreadStarted) { + // Start the cleanup thread lazily for pipelines that don't use finalization callbacks + // and some tests. 
+ cleanUpThreadStarted = true; + finalizationExecutor.execute(this::cleanupThreadBody, 0); + } cleanUpQueue.add(info); @SuppressWarnings("ReferenceEquality") boolean newMin = cleanUpQueue.peek() == info; From 7c62f6fefe93592ecfd1eca8e0253864fa032a7c Mon Sep 17 00:00:00 2001 From: Andrew Crites Date: Tue, 17 Mar 2026 13:36:11 -0700 Subject: [PATCH 13/14] Excludes bundle finalizer tests for appliance only and retriggers validates runner tests. --- ...beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json | 2 +- runners/google-cloud-dataflow-java/build.gradle | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json index bd1cc3348ac8..40966050cf55 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run!", - "modification": 4, + "modification": 5, } diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 321c5304054f..e110fa3bd52a 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -491,6 +491,8 @@ task validatesRunnerStreaming { description "Validates Dataflow runner forcing streaming mode" dependsOn(createLegacyWorkerValidatesRunnerTest(validatesRunnerStreamingConfig + [ name: 'validatesRunnerLegacyWorkerTestStreaming', + // Streaming appliance currently fails bundle finalizer tests. 
+ excludedCategories: [ 'org.apache.beam.sdk.testing.UsesBundleFinalizer', ], ])) } From 24ce8f6539d5d337c9992361ba30cdb628f5cfcf Mon Sep 17 00:00:00 2001 From: Andrew Crites Date: Wed, 18 Mar 2026 14:58:09 -0700 Subject: [PATCH 14/14] Switches which Pair library is being imported since it was causing undeclaredArtifact build errors and fixes bug in ValidatesRunner streaming test which was not excluding all categories. Retriggering post submits. --- ...beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json | 2 +- runners/google-cloud-dataflow-java/build.gradle | 2 +- .../runners/dataflow/worker/StreamingModeExecutionContext.java | 2 +- .../windmill/work/processing/StreamingCommitFinalizer.java | 2 +- .../windmill/work/processing/StreamingCommitFinalizerTest.java | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json index 40966050cf55..0c41d2bcf2fe 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run!", - "modification": 5, + "modification": 6, } diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index e110fa3bd52a..652c72c323ef 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -492,7 +492,7 @@ task validatesRunnerStreaming { dependsOn(createLegacyWorkerValidatesRunnerTest(validatesRunnerStreamingConfig + [ name: 'validatesRunnerLegacyWorkerTestStreaming', // Streaming appliance currently fails bundle finalizer tests.
- excludedCategories: [ 'org.apache.beam.sdk.testing.UsesBundleFinalizer', ], + excludedCategories: validatesRunnerStreamingConfig.excludedCategories + [ 'org.apache.beam.sdk.testing.UsesBundleFinalizer', ], ])) } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index e136def3ca33..f75d452b211b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -35,6 +35,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.concurrent.NotThreadSafe; +import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair; import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateNamespace; @@ -91,7 +92,6 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.PeekingIterator; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Table; -import org.apache.commons.lang3.tuple.Pair; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.joda.time.Instant; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java index 098b4021ff8e..4266f11f50c9 100644 --- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java @@ -32,11 +32,11 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; -import org.apache.commons.lang3.tuple.Pair; import org.joda.time.Duration; import org.joda.time.Instant; import org.slf4j.Logger; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java index 45d55df9100e..7361d0be2cd0 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java @@ -30,9 +30,9 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.commons.lang3.tuple.Pair; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Before;