From 659068d921ebeedcc982786b3ed8487fc6f40400 Mon Sep 17 00:00:00 2001 From: Sam Whittle Date: Wed, 25 Mar 2026 15:40:40 +0100 Subject: [PATCH 1/3] [Java] Add ByteStringOutputStream.isEmpty() --- .../beam/runners/dataflow/worker/PubsubSink.java | 9 ++++----- .../beam/runners/dataflow/worker/WindmillSink.java | 2 +- .../sdk/fn/data/BeamFnDataOutboundAggregator.java | 13 +++++++------ .../org/apache/beam/sdk/fn/stream/DataStreams.java | 8 +++++--- .../beam/sdk/util/ByteStringOutputStream.java | 12 ++++++++---- .../fn/data/BeamFnDataOutboundAggregatorTest.java | 5 +++-- .../beam/sdk/util/ByteStringOutputStreamTest.java | 8 ++++++++ .../apache/beam/fn/harness/state/BagUserState.java | 2 +- 8 files changed, 37 insertions(+), 22 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubSink.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubSink.java index 0616f08ae25f..2f4b26b89ab4 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubSink.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubSink.java @@ -19,7 +19,6 @@ import static org.apache.beam.runners.dataflow.util.Structs.getBytes; import static org.apache.beam.runners.dataflow.util.Structs.getString; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; import com.google.auto.service.AutoService; import java.io.IOException; @@ -156,10 +155,10 @@ private PubsubWriter(String topic) { @Override public long add(WindowedValue data) throws IOException { - checkState( - stream.size() == 0, - "Expected output stream to be empty but had %s", - stream.toByteString()); + if (!stream.isEmpty()) { + throw new IllegalStateException( + "Expected output stream to be empty but was of size " + stream.size()); + } ByteString byteString = 
null; try { if (formatFn != null) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java index abee9a33df2d..01be85a048e8 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java @@ -196,7 +196,7 @@ private WindmillStreamWriter(String destinationName) { } private ByteString encode(Coder coder, EncodeT object) throws IOException { - if (stream.size() != 0) { + if (!stream.isEmpty()) { throw new IllegalStateException( "Expected output stream to be empty but had " + stream.toByteString()); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java index 109a057585ad..1dd2b3d80ca2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java @@ -237,7 +237,7 @@ public void discard() { private Elements.Builder convertBufferForTransmission() { Elements.Builder bufferedElements = Elements.newBuilder(); for (Map.Entry> entry : outputDataReceivers.entrySet()) { - if (entry.getValue().bufferedSize() == 0) { + if (!entry.getValue().hasBufferedOutput()) { continue; } ByteString bytes = entry.getValue().toByteStringAndResetBuffer(); @@ -248,7 +248,7 @@ private Elements.Builder convertBufferForTransmission() { .setData(bytes); } for (Map.Entry> entry : outputTimersReceivers.entrySet()) { - if (entry.getValue().bufferedSize() == 0) { + if (!entry.getValue().hasBufferedOutput()) { continue; } ByteString bytes = 
entry.getValue().toByteStringAndResetBuffer(); @@ -340,10 +340,11 @@ public Receiver(Coder coder) { public void accept(T input) throws Exception { int size = output.size(); coder.encode(input, output); - if (output.size() - size == 0) { + long delta = (long) output.size() - size; + if (delta == 0) { output.write(0); + delta = 1; } - final long delta = (long) output.size() - size; bytesWrittenSinceFlush += delta; perBundleByteCount += delta; perBundleElementCount += 1; @@ -360,8 +361,8 @@ public long getElementCount() { return perBundleElementCount; } - public int bufferedSize() { - return output.size(); + public boolean hasBufferedOutput() { + return output.isEmpty(); } public ByteString toByteStringAndResetBuffer() { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java index 15401a49bd98..d8f7a00c51a0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java @@ -99,10 +99,12 @@ public ElementDelimitedOutputStream( public void delimitElement() throws IOException { // If the previous encoding was exactly zero bytes, output a single marker byte as per // https://s.apache.org/beam-fn-api-send-and-receive-data - if (previousPosition == output.size()) { + int newPosition = output.size(); + if (previousPosition == newPosition) { write(0); + ++newPosition; } - previousPosition = output.size(); + previousPosition = newPosition; } @Override @@ -136,7 +138,7 @@ public void write(byte[] b, int offset, int length) throws IOException { @Override public void close() throws IOException { - if (output.size() > 0) { + if (!output.isEmpty()) { consumer.read(output.toByteString()); } output.close(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteStringOutputStream.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteStringOutputStream.java index ade84f7a6436..ec5aafd3f637 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteStringOutputStream.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteStringOutputStream.java @@ -162,10 +162,8 @@ public ByteString toByteStringAndReset() { * Resets the output stream to be re-used possibly re-using any existing buffers. */ public void reset() { - if (size() == 0) { - return; - } - toByteStringAndReset(); + bufferPos = 0; + result = ByteString.EMPTY; } /** @@ -216,6 +214,12 @@ public int size() { return result.size() + bufferPos; } + /** Returns if the output stream is currently empty. */ + @SuppressWarnings("ReferenceEquality") + public boolean isEmpty() { + return bufferPos == 0 && result == ByteString.EMPTY; + } + @Override public Appendable append(@Nullable CharSequence csq) throws IOException { write(Preconditions.checkNotNull(csq).toString().getBytes(StandardCharsets.UTF_8)); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregatorTest.java index 529add84152d..092ba200c94b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregatorTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregatorTest.java @@ -19,6 +19,7 @@ import static org.hamcrest.Matchers.empty; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; import java.io.IOException; @@ -146,7 +147,7 @@ public void testConfiguredBufferLimit() throws Exception { } else { receiver = Iterables.getOnlyElement(aggregator.outputDataReceivers.values()); } - assertEquals(0L, receiver.bufferedSize()); + assertFalse(receiver.hasBufferedOutput()); assertEquals(102L, receiver.getByteCount()); assertEquals(2L, 
receiver.getElementCount()); @@ -156,7 +157,7 @@ public void testConfiguredBufferLimit() throws Exception { aggregator.sendOrCollectBufferedDataAndFinishOutboundStreams(); // Test that receiver stats have been reset after // sendOrCollectBufferedDataAndFinishOutboundStreams. - assertEquals(0L, receiver.bufferedSize()); + assertFalse(receiver.hasBufferedOutput()); assertEquals(0L, receiver.getByteCount()); assertEquals(0L, receiver.getElementCount()); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ByteStringOutputStreamTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ByteStringOutputStreamTest.java index 605d341d476f..c4b468a118fe 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ByteStringOutputStreamTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ByteStringOutputStreamTest.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; @@ -145,6 +146,7 @@ public void testWriteBytesWithZeroInitialCapacity() throws Exception { ByteStringOutputStream out = new ByteStringOutputStream(0); assertEquals(0, out.size()); + assertTrue(out.isEmpty()); for (int pos = 0; pos < testBuffer.length; ) { if (testBuffer[pos] == 0) { @@ -157,8 +159,14 @@ public void testWriteBytesWithZeroInitialCapacity() throws Exception { } assertEquals(pos, out.size()); } + assertEquals(testBuffer.length == 0, out.isEmpty()); + assertEquals(testBuffer.length, out.size()); assertEquals(UnsafeByteOperations.unsafeWrap(testBuffer), out.toByteString()); + assertEquals(testBuffer.length == 0, out.isEmpty()); + assertEquals(testBuffer.length, out.size()); assertEquals(UnsafeByteOperations.unsafeWrap(testBuffer), out.toByteStringAndReset()); + assertTrue(out.isEmpty()); + assertEquals(0, out.size()); } } diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java index ba56c6d656ca..de5031085b8c 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java @@ -155,7 +155,7 @@ public void asyncClose() throws Exception { .setAppend(StateAppendRequest.newBuilder().setData(out.toByteStringAndReset()))); } } - if (out.size() > 0) { + if (!out.isEmpty()) { beamFnStateClient.handle( request .toBuilder() From 06fb68af9b1bb0310f0fe254f26a822ceda2f275 Mon Sep 17 00:00:00 2001 From: Sam Whittle Date: Thu, 26 Mar 2026 09:39:05 +0100 Subject: [PATCH 2/3] Fix inverted hasBufferedOutput() check in BeamFnDataOutboundAggregator --- .../apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java index 1dd2b3d80ca2..9b9603706b48 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java @@ -362,7 +362,7 @@ public long getElementCount() { } public boolean hasBufferedOutput() { - return output.isEmpty(); + return !output.isEmpty(); } public ByteString toByteStringAndResetBuffer() { From 92b3b90e223521ceb62d426912d64933db5b6fa1 Mon Sep 17 00:00:00 2001 From: Sam Whittle Date: Fri, 27 Mar 2026 13:01:37 +0100 Subject: [PATCH 3/3] revert DataStreams change as it may use other OutputStreams for which size() behaves differently --- .../java/org/apache/beam/sdk/fn/stream/DataStreams.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java index d8f7a00c51a0..15401a49bd98 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java @@ -99,12 +99,10 @@ public ElementDelimitedOutputStream( public void delimitElement() throws IOException { // If the previous encoding was exactly zero bytes, output a single marker byte as per // https://s.apache.org/beam-fn-api-send-and-receive-data - int newPosition = output.size(); - if (previousPosition == newPosition) { + if (previousPosition == output.size()) { write(0); - ++newPosition; } - previousPosition = newPosition; + previousPosition = output.size(); } @Override @@ -138,7 +136,7 @@ public void write(byte[] b, int offset, int length) throws IOException { @Override public void close() throws IOException { - if (!output.isEmpty()) { + if (output.size() > 0) { consumer.read(output.toByteString()); } output.close();