diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubSink.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubSink.java index 0616f08ae25f..2f4b26b89ab4 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubSink.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubSink.java @@ -19,7 +19,6 @@ import static org.apache.beam.runners.dataflow.util.Structs.getBytes; import static org.apache.beam.runners.dataflow.util.Structs.getString; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; import com.google.auto.service.AutoService; import java.io.IOException; @@ -156,10 +155,10 @@ private PubsubWriter(String topic) { @Override public long add(WindowedValue data) throws IOException { - checkState( - stream.size() == 0, - "Expected output stream to be empty but had %s", - stream.toByteString()); + if (!stream.isEmpty()) { + throw new IllegalStateException( + "Expected output stream to be empty but was of size " + stream.size()); + } ByteString byteString = null; try { if (formatFn != null) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java index abee9a33df2d..01be85a048e8 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java @@ -196,7 +196,7 @@ private WindmillStreamWriter(String destinationName) { } private ByteString encode(Coder coder, EncodeT object) throws IOException { - if (stream.size() != 0) { + if (!stream.isEmpty()) { throw new IllegalStateException( "Expected output stream to be empty but had " + stream.toByteString()); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java index 109a057585ad..9b9603706b48 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java @@ -237,7 +237,7 @@ public void discard() { private Elements.Builder convertBufferForTransmission() { Elements.Builder bufferedElements = Elements.newBuilder(); for (Map.Entry> entry : outputDataReceivers.entrySet()) { - if (entry.getValue().bufferedSize() == 0) { + if (!entry.getValue().hasBufferedOutput()) { continue; } ByteString bytes = entry.getValue().toByteStringAndResetBuffer(); @@ -248,7 +248,7 @@ private Elements.Builder convertBufferForTransmission() { .setData(bytes); } for (Map.Entry> entry : outputTimersReceivers.entrySet()) { - if (entry.getValue().bufferedSize() == 0) { + if (!entry.getValue().hasBufferedOutput()) { continue; } ByteString bytes = entry.getValue().toByteStringAndResetBuffer(); @@ -340,10 +340,11 @@ public Receiver(Coder coder) { public void accept(T input) throws Exception { int size = output.size(); coder.encode(input, output); - if (output.size() - size == 0) { + long delta = (long) output.size() - size; + if (delta == 0) { output.write(0); + delta = 1; } - final long delta = (long) output.size() - size; bytesWrittenSinceFlush += delta; perBundleByteCount += delta; perBundleElementCount += 1; @@ -360,8 +361,8 @@ public long getElementCount() { return perBundleElementCount; } - public int bufferedSize() { - return output.size(); + public boolean hasBufferedOutput() { + return !output.isEmpty(); } public ByteString toByteStringAndResetBuffer() { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteStringOutputStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteStringOutputStream.java index ade84f7a6436..ec5aafd3f637 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteStringOutputStream.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteStringOutputStream.java @@ -162,10 +162,8 @@ public ByteString toByteStringAndReset() { * Resets the output stream to be re-used possibly re-using any existing buffers. */ public void reset() { - if (size() == 0) { - return; - } - toByteStringAndReset(); + bufferPos = 0; + result = ByteString.EMPTY; } /** @@ -216,6 +214,12 @@ public int size() { return result.size() + bufferPos; } + /** Returns if the output stream is currently empty. */ + @SuppressWarnings("ReferenceEquality") + public boolean isEmpty() { + return bufferPos == 0 && result == ByteString.EMPTY; + } + @Override public Appendable append(@Nullable CharSequence csq) throws IOException { write(Preconditions.checkNotNull(csq).toString().getBytes(StandardCharsets.UTF_8)); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregatorTest.java index 529add84152d..092ba200c94b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregatorTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregatorTest.java @@ -19,6 +19,7 @@ import static org.hamcrest.Matchers.empty; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; import java.io.IOException; @@ -146,7 +147,7 @@ public void testConfiguredBufferLimit() throws Exception { } else { receiver = Iterables.getOnlyElement(aggregator.outputDataReceivers.values()); } - assertEquals(0L, receiver.bufferedSize()); + assertFalse(receiver.hasBufferedOutput()); assertEquals(102L, receiver.getByteCount()); assertEquals(2L, receiver.getElementCount()); @@ -156,7 +157,7 @@ public void testConfiguredBufferLimit() throws Exception { aggregator.sendOrCollectBufferedDataAndFinishOutboundStreams(); // Test that receiver stats have been reset after // sendOrCollectBufferedDataAndFinishOutboundStreams. - assertEquals(0L, receiver.bufferedSize()); + assertFalse(receiver.hasBufferedOutput()); assertEquals(0L, receiver.getByteCount()); assertEquals(0L, receiver.getElementCount()); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ByteStringOutputStreamTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ByteStringOutputStreamTest.java index 605d341d476f..c4b468a118fe 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ByteStringOutputStreamTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ByteStringOutputStreamTest.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; @@ -145,6 +146,7 @@ public void testWriteBytesWithZeroInitialCapacity() throws Exception { ByteStringOutputStream out = new ByteStringOutputStream(0); assertEquals(0, out.size()); + assertTrue(out.isEmpty()); for (int pos = 0; pos < testBuffer.length; ) { if (testBuffer[pos] == 0) { @@ -157,8 +159,14 @@ public void testWriteBytesWithZeroInitialCapacity() throws Exception { } assertEquals(pos, out.size()); } + assertEquals(testBuffer.length == 0, out.isEmpty()); + assertEquals(testBuffer.length, out.size()); assertEquals(UnsafeByteOperations.unsafeWrap(testBuffer), out.toByteString()); + assertEquals(testBuffer.length == 0, out.isEmpty()); + assertEquals(testBuffer.length, out.size()); assertEquals(UnsafeByteOperations.unsafeWrap(testBuffer), out.toByteStringAndReset()); + assertTrue(out.isEmpty()); + assertEquals(0, out.size()); } } diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java index ba56c6d656ca..de5031085b8c 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java @@ -155,7 +155,7 @@ public void asyncClose() throws Exception { .setAppend(StateAppendRequest.newBuilder().setData(out.toByteStringAndReset()))); } } - if (out.size() > 0) { + if (!out.isEmpty()) { beamFnStateClient.handle( request .toBuilder()